#!/usr/bin/env python
"""
stream changes in mysql (on the torrents and statistics tables) into
elasticsearch as they happen on the binlog. This keeps elasticsearch in sync
with whatever you do to the database, including stuff like admin queries.
Also, because mysql keeps the binlog around for N days before deleting old
stuff, you can survive a hiccup of elasticsearch or this script dying and
pick up where you left off.

For that "picking up" part, this script depends on one piece of external
state: its last known binlog filename and position. This is saved off
periodically as a JSON file to a configurable location on the filesystem. If
the file is not present, you can initialize it with the values from
`SHOW MASTER STATUS` in the mysql shell, which will start the sync from the
current state.

In the case of a catastrophic elasticsearch meltdown where you need to
reconstruct the index, you'll want to be a bit careful about coordinating the
sync_es and import_to_es scripts. If you run import_to_es first and then run
sync_es against SHOW MASTER STATUS, anything that changed the database
between when import_to_es ran and when sync_es started will be lost. Instead,
run SHOW MASTER STATUS _before_ you run import_to_es. That way you'll
definitely pick up any changes that happen while the import_to_es script is
dumping stuff from the database into es, at the expense of redoing a (small)
amount of indexing.
"""
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent
from datetime import datetime
from nyaa.models import TorrentFlags

import sys
import json
import time
import logging
from statsd import StatsClient

logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s - %(message)s')
log = logging.getLogger('sync_es')
log.setLevel(logging.INFO)

# goes to netdata or other statsd listener
stats = StatsClient('localhost', 8125, prefix="sync_es")

#logging.getLogger('elasticsearch').setLevel(logging.DEBUG)

# in prod want in /var/lib somewhere probably
SAVE_LOC = "/var/lib/sync_es_position.json"
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'test'
MYSQL_PW = 'test123'
NT_DB = 'nyaav2'
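# The save file is a tiny JSON blob holding the binlog coordinates, matching
# what the loop below writes with json.dump(). If it doesn't exist yet, seed
# it by hand from the File/Position columns of `SHOW MASTER STATUS`; the
# values here are examples only, a sketch -- use whatever your server reports:
#
#     {"log_file": "mysql-bin.000042", "log_pos": 1234}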
with open(SAVE_LOC) as f:
    pos = json.load(f)

es = Elasticsearch(timeout=30)

stream = BinLogStreamReader(
        # TODO parse out from config.py or something
        connection_settings={
            'host': MYSQL_HOST,
            'port': MYSQL_PORT,
            'user': MYSQL_USER,
            'passwd': MYSQL_PW},
        server_id=10,  # arbitrary
        # only care about this database currently
        only_schemas=[NT_DB],
        # these tables in the database
        only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"],
        # from our save file
        resume_stream=True,
        log_file=pos['log_file'],
        log_pos=pos['log_pos'],
        # skip the other stuff like table mapping
        only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent],
        # if we're at the head of the log, block until something happens
        # note it'd be nice to block async-style instead, but the mainline
        # binlogreader is synchronous. there is an (unmaintained?) fork
        # using aiomysql if anybody wants to revive that.
        blocking=True)

def reindex_torrent(t, index_name):
    # XXX annoyingly different from import_to_es, and
    # you need to keep them in sync manually.
    f = t['flags']
    doc = {
        "id": t['id'],
        "display_name": t['display_name'],
        "created_time": t['created_time'],
        "updated_time": t['updated_time'],
        "description": t['description'],
        # not analyzed but included so we can render magnet links
        # without querying sql again.
        "info_hash": t['info_hash'].hex(),
        "filesize": t['filesize'],
        "uploader_id": t['uploader_id'],
        "main_category_id": t['main_category_id'],
        "sub_category_id": t['sub_category_id'],
        # XXX all the bitflags are numbers
        "anonymous": bool(f & TorrentFlags.ANONYMOUS),
        "trusted": bool(f & TorrentFlags.TRUSTED),
        "remake": bool(f & TorrentFlags.REMAKE),
        "complete": bool(f & TorrentFlags.COMPLETE),
        # TODO instead of indexing and filtering later
        # could delete from es entirely. Probably won't matter
        # for at least a few months.
        "hidden": bool(f & TorrentFlags.HIDDEN),
        "deleted": bool(f & TorrentFlags.DELETED),
        "has_torrent": bool(t['has_torrent']),
    }
    # update, so we don't delete the stats if present
    return {
        '_op_type': 'update',
        '_index': index_name,
        '_type': 'torrent',
        '_id': str(t['id']),
        "doc": doc,
        "doc_as_upsert": True}

def reindex_stats(s, index_name):
    # update the torrent at torrent_id, assumed to exist;
    # this will always be the case if you're reading the binlog
    # in order; the foreign key constraint on torrent_id prevents
    # the stats row from existing if the torrent isn't around.
    return {
        '_op_type': 'update',
        '_index': index_name,
        '_type': 'torrent',
        '_id': str(s['torrent_id']),
        "doc": {
            "stats_last_updated": s["last_updated"],
            "download_count": s["download_count"],
            "leech_count": s['leech_count'],
            "seed_count": s['seed_count'],
        }}

def delet_this(row, index_name):
    return {
        "_op_type": 'delete',
        '_index': index_name,
        '_type': 'torrent',
        '_id': str(row['values']['id'])}
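# For reference, each helper above yields one action for elasticsearch's bulk
# helper. reindex_torrent and reindex_stats emit partial "update" actions with
# doc_as_upsert, so the document is created if it doesn't exist yet and any
# other fields already indexed are left alone. With made-up values (this is
# illustrative only, not captured from a real run), a torrent action looks
# roughly like:
#
#     {'_op_type': 'update', '_index': 'nyaa', '_type': 'torrent',
#      '_id': '123456', 'doc': {'id': 123456, 'display_name': '...', ...},
#      'doc_as_upsert': True}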
last_save = time.time()
since_last = 0

# get POST latency
@stats.timer('post_bulk')
def timed_bulk(events, **kwargs):
    bulk(es, events, **kwargs)

log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}")

for event in stream:
    with stats.pipeline() as s:
        s.incr('total_events')
        s.incr(f"event.{event.table}.{type(event).__name__}")
        s.incr('total_rows', len(event.rows))
        s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows))
        # XXX not a "timer", but we get a histogram out of it
        s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows))

    if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
        if event.table == "nyaa_torrents":
            index_name = "nyaa"
        else:
            index_name = "sukebei"
        if type(event) is WriteRowsEvent:
            timed_bulk(reindex_torrent(row['values'], index_name) for row in event.rows)
        elif type(event) is UpdateRowsEvent:
            # UpdateRowsEvent includes the old values too, but we don't care
            timed_bulk(reindex_torrent(row['after_values'], index_name) for row in event.rows)
        elif type(event) is DeleteRowsEvent:
            # ok, bye
            timed_bulk(delet_this(row, index_name) for row in event.rows)
        else:
            raise Exception(f"unknown event {type(event)}")
    elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
        if event.table == "nyaa_statistics":
            index_name = "nyaa"
        else:
            index_name = "sukebei"
        if type(event) is WriteRowsEvent:
            timed_bulk(
                (reindex_stats(row['values'], index_name) for row in event.rows),
                # statistics updates are pretty low priority, plus in certain
                # cases where we're really out of sync, the torrent could be
                # missing, causing a "document missing" error from es, with no
                # way to suppress that server-side. Thus ignore all errors, oh well.
                raise_on_error=False, stats_only=True)
        elif type(event) is UpdateRowsEvent:
            timed_bulk(
                (reindex_stats(row['after_values'], index_name) for row in event.rows),
                raise_on_error=False, stats_only=True)
        elif type(event) is DeleteRowsEvent:
            # uh ok. Assume that the torrent row will get deleted later,
            # which will clean up the entire es "torrent" document
            pass
        else:
            raise Exception(f"unknown event {type(event)}")
    else:
        raise Exception(f"unknown table {event.table}")

    # how far we're behind, wall clock
    stats.gauge('process_latency', int((time.time() - event.timestamp) * 1000))

    since_last += 1
    if since_last >= 10000 or (time.time() - last_save) > 10:
        log.info(f"saving position {stream.log_file}/{stream.log_pos}, {time.time() - event.timestamp:,.3} seconds behind")
        with stats.timer('save_pos'):
            with open(SAVE_LOC, 'w') as f:
                json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f)
        last_save = time.time()
        since_last = 0
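# Metrics emitted via statsd (prefix "sync_es"), for whoever wires up the
# netdata (or other statsd listener) side:
#   counters: total_events, total_rows,
#             event.<table>.<EventType>, rows.<table>.<EventType>
#   timings:  rows_per_event.<table>.<EventType> (rows-per-event histogram),
#             post_bulk (bulk POST latency), save_pos (position-save latency)
#   gauge:    process_latency (milliseconds behind the binlog, wall clock)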