From ea2160a49d87fd64ce7097607667167eb104aa2a Mon Sep 17 00:00:00 2001 From: queue Date: Sun, 21 May 2017 00:55:19 -0600 Subject: [PATCH] sync_es: move io to separate threads, config json throughput is definitely massively improved, testing locally. hopefully it'll be enough. config moved a separate file by ops request. lazy lazy --- config_es_sync.json | 11 ++ sync_es.py | 290 +++++++++++++++++++++++++++++--------------- 2 files changed, 201 insertions(+), 100 deletions(-) create mode 100644 config_es_sync.json diff --git a/config_es_sync.json b/config_es_sync.json new file mode 100644 index 0000000..d2cb889 --- /dev/null +++ b/config_es_sync.json @@ -0,0 +1,11 @@ +{ +"save_loc": "/tmp/pos.json", +"mysql_host": "127.0.0.1", +"mysql_port": 13306, +"mysql_user": "root", +"mysql_password": "dunnolol", +"database": "nyaav2", +"internal_queue_depth": 10000, +"es_chunk_size": 10000, +"flush_interval": 5 +} diff --git a/sync_es.py b/sync_es.py index d0e9be5..cafafb5 100644 --- a/sync_es.py +++ b/sync_es.py @@ -21,9 +21,12 @@ when import_to_es and sync_es will be lost. Instead, you can run SHOW MASTER STATUS _before_ you run import_to_es. That way you'll definitely pick up any changes that happen while the import_to_es script is dumping stuff from the database into es, at the expense of redoing a (small) amount of indexing. + +This uses multithreading so we don't have to block on socket io (both binlog +reading and es POSTing). asyncio soon™ """ from elasticsearch import Elasticsearch -from elasticsearch.helpers import bulk +from elasticsearch.helpers import bulk, BulkIndexError from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent from datetime import datetime @@ -33,54 +36,38 @@ import json import time import logging from statsd import StatsClient +from threading import Thread +from queue import Queue, Empty logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s - %(message)s') log = logging.getLogger('sync_es') log.setLevel(logging.INFO) +# config in json, 2lazy to argparse +if len(sys.argv) != 2: + print("need config.json location", file=sys.stderr) + sys.exit(-1) +with open(sys.argv[1]) as f: + config = json.load(f) + # goes to netdata or other statsd listener stats = StatsClient('localhost', 8125, prefix="sync_es") #logging.getLogger('elasticsearch').setLevel(logging.DEBUG) # in prod want in /var/lib somewhere probably -SAVE_LOC = "/var/lib/sync_es_position.json" -MYSQL_HOST = '127.0.0.1' -MYSQL_PORT = 3306 -MYSQL_USER = 'test' -MYSQL_PW = 'test123' -NT_DB = 'nyaav2' - -with open(SAVE_LOC) as f: - pos = json.load(f) - -es = Elasticsearch(timeout=30) - -stream = BinLogStreamReader( - # TODO parse out from config.py or something - connection_settings = { - 'host': MYSQL_HOST, - 'port': MYSQL_PORT, - 'user': MYSQL_USER, - 'passwd': MYSQL_PW - }, - server_id=10, # arbitrary - # only care about this database currently - only_schemas=[NT_DB], - # these tables in the database - only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"], - # from our save file - resume_stream=True, - log_file=pos['log_file'], - log_pos=pos['log_pos'], - # skip the other stuff like table mapping - only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent], - # if we're at the head of the log, block until something happens - # note it'd be nice to block async-style instead, but the mainline - # binlogreader is synchronous. there is an (unmaintained?) 
fork - # using aiomysql if anybody wants to revive that. - blocking=True) +SAVE_LOC = config.get('save_loc', "/tmp/pos.json") +MYSQL_HOST = config.get('mysql_host', '127.0.0.1') +MYSQL_PORT = config.get('mysql_port', 3306) +MYSQL_USER = config.get('mysql_user', 'root') +MYSQL_PW = config.get('mysql_password', 'dunnolol') +NT_DB = config.get('database', 'nyaav2') +INTERNAL_QUEUE_DEPTH = config.get('internal_queue_depth', 10000) +ES_CHUNK_SIZE = config.get('es_chunk_size', 10000) +# seconds since no events happening to flush to es. remember this also +# interacts with es' refresh_interval setting. +FLUSH_INTERVAL = config.get('flush_interval', 5) def reindex_torrent(t, index_name): # XXX annoyingly different from import_to_es, and @@ -145,73 +132,176 @@ def delet_this(row, index_name): '_type': 'torrent', '_id': str(row['values']['id'])} -last_save = time.time() -since_last = 0 -# get POST latency -@stats.timer('post_bulk') -def timed_bulk(events, **kwargs): - bulk(es, events, **kwargs) +class BinlogReader(Thread): + # write_buf is the Queue we communicate with + def __init__(self, write_buf): + Thread.__init__(self) + self.write_buf = write_buf -log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}") + def run(self): + with open(SAVE_LOC) as f: + pos = json.load(f) -for event in stream: - with stats.pipeline() as s: - s.incr('total_events') - s.incr(f"event.{event.table}.{type(event).__name__}") - s.incr('total_rows', len(event.rows)) - s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows)) - # XXX not a "timer", but we get a histogram out of it - s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows)) - if event.table == "nyaa_torrents" or event.table == "sukebei_torrents": - if event.table == "nyaa_torrents": - index_name = "nyaa" - else: - index_name = "sukebei" - if type(event) is WriteRowsEvent: - timed_bulk(reindex_torrent(row['values'], index_name) for row in event.rows) - elif type(event) is UpdateRowsEvent: - # UpdateRowsEvent includes the old values too, but we don't care - timed_bulk(reindex_torrent(row['after_values'], index_name) for row in event.rows) - elif type(event) is DeleteRowsEvent: - # ok, bye - timed_bulk(delet_this(row, index_name) for row in event.rows) - else: - raise Exception(f"unknown event {type(event)}") - elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics": - if event.table == "nyaa_statistics": - index_name = "nyaa" - else: - index_name = "sukebei" - if type(event) is WriteRowsEvent: - timed_bulk( - (reindex_stats(row['values'], index_name) for row in event.rows), - # statistics updates are pretty low priority, plus in certain - # cases where we're really out of sync, the torrent could be missing, - # causing a "document missing" error from es, with no way to suppress - # that server-side. Thus ignore all errors, oh well. - raise_on_error=False, stats_only=True) - elif type(event) is UpdateRowsEvent: - timed_bulk( - (reindex_stats(row['after_values'], index_name) for row in event.rows), - raise_on_error=False, stats_only=True) - elif type(event) is DeleteRowsEvent: - # uh ok. 
Assume that the torrent row will get deleted later, - # which will clean up the entire es "torrent" document - pass - else: - raise Exception(f"unknown event {type(event)}") - else: - raise Exception(f"unknown table {s.table}") + stream = BinLogStreamReader( + # TODO parse out from config.py or something + connection_settings = { + 'host': MYSQL_HOST, + 'port': MYSQL_PORT, + 'user': MYSQL_USER, + 'passwd': MYSQL_PW + }, + server_id=10, # arbitrary + # only care about this database currently + only_schemas=[NT_DB], + # these tables in the database + only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"], + # from our save file + resume_stream=True, + log_file=pos['log_file'], + log_pos=pos['log_pos'], + # skip the other stuff like table mapping + only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent], + # if we're at the head of the log, block until something happens + # note it'd be nice to block async-style instead, but the mainline + # binlogreader is synchronous. there is an (unmaintained?) fork + # using aiomysql if anybody wants to revive that. + blocking=True) - # how far we're behind, wall clock - stats.gauge('process_latency', int((time.time() - event.timestamp) * 1000)) + log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}") + + for event in stream: + # save the pos of the stream and timestamp with each message, so we + # can commit in the other thread. and keep track of process latency + pos = (stream.log_file, stream.log_pos, event.timestamp) + with stats.pipeline() as s: + s.incr('total_events') + s.incr(f"event.{event.table}.{type(event).__name__}") + s.incr('total_rows', len(event.rows)) + s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows)) + # XXX not a "timer", but we get a histogram out of it + s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows)) + + if event.table == "nyaa_torrents" or event.table == "sukebei_torrents": + if event.table == "nyaa_torrents": + index_name = "nyaa" + else: + index_name = "sukebei" + if type(event) is WriteRowsEvent: + for row in event.rows: + self.write_buf.put( + (pos, reindex_torrent(row['values'], index_name)), + block=True) + elif type(event) is UpdateRowsEvent: + # UpdateRowsEvent includes the old values too, but we don't care + for row in event.rows: + self.write_buf.put( + (pos, reindex_torrent(row['after_values'], index_name)), + block=True) + elif type(event) is DeleteRowsEvent: + # ok, bye + for row in event.rows: + self.write_buf.put((pos, delet_this(row, index_name)), block=True) + else: + raise Exception(f"unknown event {type(event)}") + elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics": + if event.table == "nyaa_statistics": + index_name = "nyaa" + else: + index_name = "sukebei" + if type(event) is WriteRowsEvent: + for row in event.rows: + self.write_buf.put( + (pos, reindex_stats(row['values'], index_name)), + block=True) + elif type(event) is UpdateRowsEvent: + for row in event.rows: + self.write_buf.put( + (pos, reindex_stats(row['after_values'], index_name)), + block=True) + elif type(event) is DeleteRowsEvent: + # uh ok. 
Assume that the torrent row will get deleted later,
+                    # which will clean up the entire es "torrent" document
+                    pass
+                else:
+                    raise Exception(f"unknown event {type(event)}")
+            else:
+                raise Exception(f"unknown table {event.table}")
+
+class EsPoster(Thread):
+    # read_buf is the queue of stuff to bulk post
+    def __init__(self, read_buf, chunk_size=1000, flush_interval=5):
+        Thread.__init__(self)
+        self.read_buf = read_buf
+        self.chunk_size = chunk_size
+        self.flush_interval = flush_interval
+
+    def run(self):
+        es = Elasticsearch(timeout=30)
 
-    since_last += 1
-    if since_last >= 10000 or (time.time() - last_save) > 10:
-        log.info(f"saving position {stream.log_file}/{stream.log_pos}, {time.time() - event.timestamp:,.3} seconds behind")
-        with stats.timer('save_pos'):
-            with open(SAVE_LOC, 'w') as f:
-                json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f)
         last_save = time.time()
         since_last = 0
+
+        while True:
+            actions = []
+            while len(actions) < self.chunk_size:
+                try:
+                    # grab the next action; its position metadata (log_file,
+                    # log_pos, timestamp) survives this loop for use below
+                    ((log_file, log_pos, timestamp), action) = \
+                        self.read_buf.get(block=True, timeout=self.flush_interval)
+                    actions.append(action)
+                except Empty:
+                    # nothing new for the whole interval
+                    break
+
+            if not actions:
+                # nothing to post
+                log.debug("no changes...")
+                continue
+
+            # XXX "time" to get a histogram of the number of actions per bulk
+            stats.timing('actions_per_bulk', len(actions))
+
+            try:
+                with stats.timer('post_bulk'):
+                    bulk(es, actions, chunk_size=self.chunk_size)
+            except BulkIndexError as bie:
+                # in certain cases where we're really out of sync, we may update
+                # stats for a torrent doc that's already gone, causing a "document
+                # missing" error from es, with no way to suppress that server-side.
+                # Thus ignore that type of error if it's the only problem
+                for e in bie.errors:
+                    try:
+                        if e['update']['error']['type'] != 'document_missing_exception':
+                            raise bie
+                    except KeyError:
+                        raise bie
+
+            # how far we're behind, wall clock
+            stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
+
+            since_last += len(actions)
+            if since_last >= 10000 or (time.time() - last_save) > 10:
+                log.info(f"saving position {log_file}/{log_pos}, {time.time() - timestamp:,.3f} seconds behind")
+                with stats.timer('save_pos'):
+                    with open(SAVE_LOC, 'w') as f:
+                        json.dump({"log_file": log_file, "log_pos": log_pos}, f)
+                last_save = time.time()
+                since_last = 0
+
+# in-memory queue between binlog and es. The bigger it is, the more events we
+# can parse in memory while waiting for es to catch up, at the expense of heap.
+buf = Queue(maxsize=INTERNAL_QUEUE_DEPTH)
+
+reader = BinlogReader(buf)
+reader.daemon = True
+writer = EsPoster(buf, chunk_size=ES_CHUNK_SIZE, flush_interval=FLUSH_INTERVAL)
+writer.daemon = True
+reader.start()
+writer.start()
+
+# on the main thread, poll the queue size for monitoring
+while True:
+    stats.gauge('queue_depth', buf.qsize())
+    time.sleep(1)
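
For reference: with this change the script expects the JSON config path as its
only argument and bails out otherwise, so it would be started with something
like (assuming a Python 3.6+ interpreter and the repo root as working dir):

    python3 sync_es.py config_es_sync.json

Any key left out of the JSON falls back to the config.get(...) default shown in
the diff above.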
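
A minimal, self-contained sketch of the threading pattern the patch adopts: a
bounded queue between a reader thread and a poster thread, with a chunk-size
cap and a flush timeout. Everything below (names, numbers, the print standing
in for the es bulk post) is illustrative only, not the real module:

    from queue import Queue, Empty
    from threading import Thread
    import time

    QUEUE_DEPTH = 100     # cf. internal_queue_depth
    CHUNK_SIZE = 10       # cf. es_chunk_size
    FLUSH_INTERVAL = 1.0  # cf. flush_interval (seconds)

    def producer(buf):
        # stands in for BinlogReader: put() blocks when the queue is full, so a
        # slow consumer applies backpressure instead of memory growing unbounded
        for i in range(50):
            buf.put((("binlog.000001", i), {"doc": i}), block=True)
            time.sleep(0.01)

    def consumer(buf):
        # stands in for EsPoster: collect up to CHUNK_SIZE actions, or flush
        # early when nothing arrives for FLUSH_INTERVAL seconds
        while True:
            actions, last_pos = [], None
            while len(actions) < CHUNK_SIZE:
                try:
                    last_pos, action = buf.get(block=True, timeout=FLUSH_INTERVAL)
                    actions.append(action)
                except Empty:
                    break
            if not actions:
                continue
            # the real script bulk-posts to es here, then persists last_pos so a
            # restart resumes from the newest position that actually got indexed
            print(f"flushing {len(actions)} actions, up to position {last_pos}")

    buf = Queue(maxsize=QUEUE_DEPTH)
    Thread(target=consumer, args=(buf,), daemon=True).start()
    producer(buf)
    time.sleep(2 * FLUSH_INTERVAL)  # let the consumer drain before the process exits

The maxsize on the queue is what provides the backpressure: when es falls
behind, put() blocks the binlog reader instead of letting the process grow
without bound.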