From 6a4ad827c10cb3327b4d7b83c2cdc8bf916b2f03 Mon Sep 17 00:00:00 2001 From: queue Date: Sat, 20 May 2017 23:19:35 -0600 Subject: [PATCH 1/2] sync_es: instrument with statsd, improve logging also fixed the save time loop and spaced it out to 10k events instead of 100. Notably, the event no. of rows caps out at around 5 by default because of default -binlog-row-event-max-size=8192 in mysql; that's how many (torrent) rows fit into a single event. We could increase that, but instead I think it's finally time to finally multithread this thing; both the binlog read and the ES POST shouldn't use the GIL so it'll actually work. --- sync_es.py | 62 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/sync_es.py b/sync_es.py index 4cbd9f2..d0e9be5 100644 --- a/sync_es.py +++ b/sync_es.py @@ -32,12 +32,16 @@ import sys import json import time import logging +from statsd import StatsClient -logging.basicConfig() +logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s - %(message)s') log = logging.getLogger('sync_es') log.setLevel(logging.INFO) +# goes to netdata or other statsd listener +stats = StatsClient('localhost', 8125, prefix="sync_es") + #logging.getLogger('elasticsearch').setLevel(logging.DEBUG) # in prod want in /var/lib somewhere probably @@ -120,8 +124,8 @@ def reindex_torrent(t, index_name): def reindex_stats(s, index_name): # update the torrent at torrent_id, assumed to exist; # this will always be the case if you're reading the binlog - # in order; the foreign key constraint on torrrent_id prevents - # the stats row rom existing if the torrent isn't around. + # in order; the foreign key constraint on torrent_id prevents + # the stats row from existing if the torrent isn't around. return { '_op_type': 'update', '_index': index_name, @@ -141,23 +145,37 @@ def delet_this(row, index_name): '_type': 'torrent', '_id': str(row['values']['id'])} -n = 0 last_save = time.time() +since_last = 0 + +# get POST latency +@stats.timer('post_bulk') +def timed_bulk(events, **kwargs): + bulk(es, events, **kwargs) + +log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}") for event in stream: + with stats.pipeline() as s: + s.incr('total_events') + s.incr(f"event.{event.table}.{type(event).__name__}") + s.incr('total_rows', len(event.rows)) + s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows)) + # XXX not a "timer", but we get a histogram out of it + s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows)) if event.table == "nyaa_torrents" or event.table == "sukebei_torrents": if event.table == "nyaa_torrents": index_name = "nyaa" else: index_name = "sukebei" if type(event) is WriteRowsEvent: - bulk(es, (reindex_torrent(row['values'], index_name) for row in event.rows)) + timed_bulk(reindex_torrent(row['values'], index_name) for row in event.rows) elif type(event) is UpdateRowsEvent: # UpdateRowsEvent includes the old values too, but we don't care - bulk(es, (reindex_torrent(row['after_values'], index_name) for row in event.rows)) + timed_bulk(reindex_torrent(row['after_values'], index_name) for row in event.rows) elif type(event) is DeleteRowsEvent: # ok, bye - bulk(es, (delet_this(row, index_name) for row in event.rows)) + timed_bulk(delet_this(row, index_name) for row in event.rows) else: raise Exception(f"unknown event {type(event)}") elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics": @@ -166,11 +184,19 @@ for event in stream: else: index_name = "sukebei" if type(event) is WriteRowsEvent: - bulk(es, (reindex_stats(row['values'], index_name) for row in event.rows)) + timed_bulk( + (reindex_stats(row['values'], index_name) for row in event.rows), + # statistics updates are pretty low priority, plus in certain + # cases where we're really out of sync, the torrent could be missing, + # causing a "document missing" error from es, with no way to suppress + # that server-side. Thus ignore all errors, oh well. + raise_on_error=False, stats_only=True) elif type(event) is UpdateRowsEvent: - bulk(es, (reindex_stats(row['after_values'], index_name) for row in event.rows)) + timed_bulk( + (reindex_stats(row['after_values'], index_name) for row in event.rows), + raise_on_error=False, stats_only=True) elif type(event) is DeleteRowsEvent: - # uh ok. assume that the torrent row will get deleted later, + # uh ok. Assume that the torrent row will get deleted later, # which will clean up the entire es "torrent" document pass else: @@ -178,8 +204,14 @@ for event in stream: else: raise Exception(f"unknown table {s.table}") - n += 1 - if n % 100 == 0 or time.time() - last_save > 30: - log.info(f"saving position {stream.log_file}/{stream.log_pos}") - with open(SAVE_LOC, 'w') as f: - json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f) + # how far we're behind, wall clock + stats.gauge('process_latency', int((time.time() - event.timestamp) * 1000)) + + since_last += 1 + if since_last >= 10000 or (time.time() - last_save) > 10: + log.info(f"saving position {stream.log_file}/{stream.log_pos}, {time.time() - event.timestamp:,.3} seconds behind") + with stats.timer('save_pos'): + with open(SAVE_LOC, 'w') as f: + json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f) + last_save = time.time() + since_last = 0 From ea2160a49d87fd64ce7097607667167eb104aa2a Mon Sep 17 00:00:00 2001 From: queue Date: Sun, 21 May 2017 00:55:19 -0600 Subject: [PATCH 2/2] sync_es: move io to separate threads, config json throughput is definitely massively improved, testing locally. hopefully it'll be enough. config moved a separate file by ops request. lazy lazy --- config_es_sync.json | 11 ++ sync_es.py | 290 +++++++++++++++++++++++++++++--------------- 2 files changed, 201 insertions(+), 100 deletions(-) create mode 100644 config_es_sync.json diff --git a/config_es_sync.json b/config_es_sync.json new file mode 100644 index 0000000..d2cb889 --- /dev/null +++ b/config_es_sync.json @@ -0,0 +1,11 @@ +{ +"save_loc": "/tmp/pos.json", +"mysql_host": "127.0.0.1", +"mysql_port": 13306, +"mysql_user": "root", +"mysql_password": "dunnolol", +"database": "nyaav2", +"internal_queue_depth": 10000, +"es_chunk_size": 10000, +"flush_interval": 5 +} diff --git a/sync_es.py b/sync_es.py index d0e9be5..cafafb5 100644 --- a/sync_es.py +++ b/sync_es.py @@ -21,9 +21,12 @@ when import_to_es and sync_es will be lost. Instead, you can run SHOW MASTER STATUS _before_ you run import_to_es. That way you'll definitely pick up any changes that happen while the import_to_es script is dumping stuff from the database into es, at the expense of redoing a (small) amount of indexing. + +This uses multithreading so we don't have to block on socket io (both binlog +reading and es POSTing). asyncio soon™ """ from elasticsearch import Elasticsearch -from elasticsearch.helpers import bulk +from elasticsearch.helpers import bulk, BulkIndexError from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent from datetime import datetime @@ -33,54 +36,38 @@ import json import time import logging from statsd import StatsClient +from threading import Thread +from queue import Queue, Empty logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s - %(message)s') log = logging.getLogger('sync_es') log.setLevel(logging.INFO) +# config in json, 2lazy to argparse +if len(sys.argv) != 2: + print("need config.json location", file=sys.stderr) + sys.exit(-1) +with open(sys.argv[1]) as f: + config = json.load(f) + # goes to netdata or other statsd listener stats = StatsClient('localhost', 8125, prefix="sync_es") #logging.getLogger('elasticsearch').setLevel(logging.DEBUG) # in prod want in /var/lib somewhere probably -SAVE_LOC = "/var/lib/sync_es_position.json" -MYSQL_HOST = '127.0.0.1' -MYSQL_PORT = 3306 -MYSQL_USER = 'test' -MYSQL_PW = 'test123' -NT_DB = 'nyaav2' - -with open(SAVE_LOC) as f: - pos = json.load(f) - -es = Elasticsearch(timeout=30) - -stream = BinLogStreamReader( - # TODO parse out from config.py or something - connection_settings = { - 'host': MYSQL_HOST, - 'port': MYSQL_PORT, - 'user': MYSQL_USER, - 'passwd': MYSQL_PW - }, - server_id=10, # arbitrary - # only care about this database currently - only_schemas=[NT_DB], - # these tables in the database - only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"], - # from our save file - resume_stream=True, - log_file=pos['log_file'], - log_pos=pos['log_pos'], - # skip the other stuff like table mapping - only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent], - # if we're at the head of the log, block until something happens - # note it'd be nice to block async-style instead, but the mainline - # binlogreader is synchronous. there is an (unmaintained?) fork - # using aiomysql if anybody wants to revive that. - blocking=True) +SAVE_LOC = config.get('save_loc', "/tmp/pos.json") +MYSQL_HOST = config.get('mysql_host', '127.0.0.1') +MYSQL_PORT = config.get('mysql_port', 3306) +MYSQL_USER = config.get('mysql_user', 'root') +MYSQL_PW = config.get('mysql_password', 'dunnolol') +NT_DB = config.get('database', 'nyaav2') +INTERNAL_QUEUE_DEPTH = config.get('internal_queue_depth', 10000) +ES_CHUNK_SIZE = config.get('es_chunk_size', 10000) +# seconds since no events happening to flush to es. remember this also +# interacts with es' refresh_interval setting. +FLUSH_INTERVAL = config.get('flush_interval', 5) def reindex_torrent(t, index_name): # XXX annoyingly different from import_to_es, and @@ -145,73 +132,176 @@ def delet_this(row, index_name): '_type': 'torrent', '_id': str(row['values']['id'])} -last_save = time.time() -since_last = 0 -# get POST latency -@stats.timer('post_bulk') -def timed_bulk(events, **kwargs): - bulk(es, events, **kwargs) +class BinlogReader(Thread): + # write_buf is the Queue we communicate with + def __init__(self, write_buf): + Thread.__init__(self) + self.write_buf = write_buf -log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}") + def run(self): + with open(SAVE_LOC) as f: + pos = json.load(f) -for event in stream: - with stats.pipeline() as s: - s.incr('total_events') - s.incr(f"event.{event.table}.{type(event).__name__}") - s.incr('total_rows', len(event.rows)) - s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows)) - # XXX not a "timer", but we get a histogram out of it - s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows)) - if event.table == "nyaa_torrents" or event.table == "sukebei_torrents": - if event.table == "nyaa_torrents": - index_name = "nyaa" - else: - index_name = "sukebei" - if type(event) is WriteRowsEvent: - timed_bulk(reindex_torrent(row['values'], index_name) for row in event.rows) - elif type(event) is UpdateRowsEvent: - # UpdateRowsEvent includes the old values too, but we don't care - timed_bulk(reindex_torrent(row['after_values'], index_name) for row in event.rows) - elif type(event) is DeleteRowsEvent: - # ok, bye - timed_bulk(delet_this(row, index_name) for row in event.rows) - else: - raise Exception(f"unknown event {type(event)}") - elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics": - if event.table == "nyaa_statistics": - index_name = "nyaa" - else: - index_name = "sukebei" - if type(event) is WriteRowsEvent: - timed_bulk( - (reindex_stats(row['values'], index_name) for row in event.rows), - # statistics updates are pretty low priority, plus in certain - # cases where we're really out of sync, the torrent could be missing, - # causing a "document missing" error from es, with no way to suppress - # that server-side. Thus ignore all errors, oh well. - raise_on_error=False, stats_only=True) - elif type(event) is UpdateRowsEvent: - timed_bulk( - (reindex_stats(row['after_values'], index_name) for row in event.rows), - raise_on_error=False, stats_only=True) - elif type(event) is DeleteRowsEvent: - # uh ok. Assume that the torrent row will get deleted later, - # which will clean up the entire es "torrent" document - pass - else: - raise Exception(f"unknown event {type(event)}") - else: - raise Exception(f"unknown table {s.table}") + stream = BinLogStreamReader( + # TODO parse out from config.py or something + connection_settings = { + 'host': MYSQL_HOST, + 'port': MYSQL_PORT, + 'user': MYSQL_USER, + 'passwd': MYSQL_PW + }, + server_id=10, # arbitrary + # only care about this database currently + only_schemas=[NT_DB], + # these tables in the database + only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"], + # from our save file + resume_stream=True, + log_file=pos['log_file'], + log_pos=pos['log_pos'], + # skip the other stuff like table mapping + only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent], + # if we're at the head of the log, block until something happens + # note it'd be nice to block async-style instead, but the mainline + # binlogreader is synchronous. there is an (unmaintained?) fork + # using aiomysql if anybody wants to revive that. + blocking=True) - # how far we're behind, wall clock - stats.gauge('process_latency', int((time.time() - event.timestamp) * 1000)) + log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}") + + for event in stream: + # save the pos of the stream and timestamp with each message, so we + # can commit in the other thread. and keep track of process latency + pos = (stream.log_file, stream.log_pos, event.timestamp) + with stats.pipeline() as s: + s.incr('total_events') + s.incr(f"event.{event.table}.{type(event).__name__}") + s.incr('total_rows', len(event.rows)) + s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows)) + # XXX not a "timer", but we get a histogram out of it + s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows)) + + if event.table == "nyaa_torrents" or event.table == "sukebei_torrents": + if event.table == "nyaa_torrents": + index_name = "nyaa" + else: + index_name = "sukebei" + if type(event) is WriteRowsEvent: + for row in event.rows: + self.write_buf.put( + (pos, reindex_torrent(row['values'], index_name)), + block=True) + elif type(event) is UpdateRowsEvent: + # UpdateRowsEvent includes the old values too, but we don't care + for row in event.rows: + self.write_buf.put( + (pos, reindex_torrent(row['after_values'], index_name)), + block=True) + elif type(event) is DeleteRowsEvent: + # ok, bye + for row in event.rows: + self.write_buf.put((pos, delet_this(row, index_name)), block=True) + else: + raise Exception(f"unknown event {type(event)}") + elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics": + if event.table == "nyaa_statistics": + index_name = "nyaa" + else: + index_name = "sukebei" + if type(event) is WriteRowsEvent: + for row in event.rows: + self.write_buf.put( + (pos, reindex_stats(row['values'], index_name)), + block=True) + elif type(event) is UpdateRowsEvent: + for row in event.rows: + self.write_buf.put( + (pos, reindex_stats(row['after_values'], index_name)), + block=True) + elif type(event) is DeleteRowsEvent: + # uh ok. Assume that the torrent row will get deleted later, + # which will clean up the entire es "torrent" document + pass + else: + raise Exception(f"unknown event {type(event)}") + else: + raise Exception(f"unknown table {s.table}") + +class EsPoster(Thread): + # read_buf is the queue of stuff to bulk post + def __init__(self, read_buf, chunk_size=1000, flush_interval=5): + Thread.__init__(self) + self.read_buf = read_buf + self.chunk_size = chunk_size + self.flush_interval = flush_interval + + def run(self): + es = Elasticsearch(timeout=30) - since_last += 1 - if since_last >= 10000 or (time.time() - last_save) > 10: - log.info(f"saving position {stream.log_file}/{stream.log_pos}, {time.time() - event.timestamp:,.3} seconds behind") - with stats.timer('save_pos'): - with open(SAVE_LOC, 'w') as f: - json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f) last_save = time.time() since_last = 0 + + while True: + actions = [] + while len(actions) < self.chunk_size: + try: + # grab next event from queue with metadata that creepily + # updates, surviving outside the scope of the loop + ((log_file, log_pos, timestamp), action) = \ + self.read_buf.get(block=True, timeout=self.flush_interval) + actions.append(action) + except Empty: + # nothing new for the whole interval + break + + if not actions: + # nothing to post + log.debug("no changes...") + continue + + # XXX "time" to get histogram of no events per bulk + stats.timing('actions_per_bulk', len(actions)) + + try: + with stats.timer('post_bulk'): + bulk(es, actions, chunk_size=self.chunk_size) + except BulkIndexError as bie: + # in certain cases where we're really out of sync, we update a + # stat when the torrent doc is, causing a "document missing" + # error from es, with no way to suppress that server-side. + # Thus ignore that type of error if it's the only problem + for e in bie.errors: + try: + if e['update']['error']['type'] != 'document_missing_exception': + raise bie + except KeyError: + raise bie + + # how far we're behind, wall clock + stats.gauge('process_latency', int((time.time() - timestamp) * 1000)) + + since_last += len(actions) + if since_last >= 10000 or (time.time() - last_save) > 10: + log.info(f"saving position {log_file}/{log_pos}, {time.time() - timestamp:,.3f} seconds behind") + with stats.timer('save_pos'): + with open(SAVE_LOC, 'w') as f: + json.dump({"log_file": log_file, "log_pos": log_pos}, f) + last_save = time.time() + since_last = 0 + +# in-memory queue between binlog and es. The bigger it is, the more events we +# can parse in memory while waiting for es to catch up, at the expense of heap. +buf = Queue(maxsize=INTERNAL_QUEUE_DEPTH) + +reader = BinlogReader(buf) +reader.daemon = True +writer = EsPoster(buf, chunk_size=ES_CHUNK_SIZE, flush_interval=FLUSH_INTERVAL) +writer.daemon = True +reader.start() +writer.start() + +# on the main thread, poll the queue size for monitoring +while True: + stats.gauge('queue_depth', buf.qsize()) + time.sleep(1)