From 6a4ad827c10cb3327b4d7b83c2cdc8bf916b2f03 Mon Sep 17 00:00:00 2001
From: queue
Date: Sat, 20 May 2017 23:19:35 -0600
Subject: [PATCH] sync_es: instrument with statsd, improve logging

Also fixed the position-save timing in the loop and spaced it out to
10k events instead of 100. Notably, the number of rows per event caps
out at around 5 by default because of the default
--binlog-row-event-max-size=8192 in mysql; that's how many (torrent)
rows fit into a single event. We could increase that, but instead I
think it's finally time to multithread this thing; both the binlog read
and the ES POST shouldn't hold the GIL, so it'll actually work (see the
sketch after the diff).
---
 sync_es.py | 62 +++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 47 insertions(+), 15 deletions(-)

diff --git a/sync_es.py b/sync_es.py
index 4cbd9f2..d0e9be5 100644
--- a/sync_es.py
+++ b/sync_es.py
@@ -32,12 +32,16 @@ import sys
 import json
 import time
 import logging
+from statsd import StatsClient
 
-logging.basicConfig()
+logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s - %(message)s')
 
 log = logging.getLogger('sync_es')
 log.setLevel(logging.INFO)
 
+# goes to netdata or other statsd listener
+stats = StatsClient('localhost', 8125, prefix="sync_es")
+
 #logging.getLogger('elasticsearch').setLevel(logging.DEBUG)
 
 # in prod want in /var/lib somewhere probably
@@ -120,8 +124,8 @@ def reindex_torrent(t, index_name):
 def reindex_stats(s, index_name):
     # update the torrent at torrent_id, assumed to exist;
     # this will always be the case if you're reading the binlog
-    # in order; the foreign key constraint on torrrent_id prevents
-    # the stats row rom existing if the torrent isn't around.
+    # in order; the foreign key constraint on torrent_id prevents
+    # the stats row from existing if the torrent isn't around.
     return {
         '_op_type': 'update',
         '_index': index_name,
@@ -141,23 +145,37 @@ def delet_this(row, index_name):
         '_type': 'torrent',
         '_id': str(row['values']['id'])}
 
-n = 0
 last_save = time.time()
+since_last = 0
+
+# get POST latency
+@stats.timer('post_bulk')
+def timed_bulk(events, **kwargs):
+    bulk(es, events, **kwargs)
+
+log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}")
 
 for event in stream:
+    with stats.pipeline() as s:
+        s.incr('total_events')
+        s.incr(f"event.{event.table}.{type(event).__name__}")
+        s.incr('total_rows', len(event.rows))
+        s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows))
+        # XXX not a "timer", but we get a histogram out of it
+        s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows))
     if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
         if event.table == "nyaa_torrents":
             index_name = "nyaa"
         else:
             index_name = "sukebei"
         if type(event) is WriteRowsEvent:
-            bulk(es, (reindex_torrent(row['values'], index_name) for row in event.rows))
+            timed_bulk(reindex_torrent(row['values'], index_name) for row in event.rows)
         elif type(event) is UpdateRowsEvent:
             # UpdateRowsEvent includes the old values too, but we don't care
-            bulk(es, (reindex_torrent(row['after_values'], index_name) for row in event.rows))
+            timed_bulk(reindex_torrent(row['after_values'], index_name) for row in event.rows)
         elif type(event) is DeleteRowsEvent:
             # ok, bye
-            bulk(es, (delet_this(row, index_name) for row in event.rows))
+            timed_bulk(delet_this(row, index_name) for row in event.rows)
         else:
             raise Exception(f"unknown event {type(event)}")
     elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
@@ -166,11 +184,19 @@ for event in stream:
         else:
             index_name = "sukebei"
         if type(event) is WriteRowsEvent:
-            bulk(es, (reindex_stats(row['values'], index_name) for row in event.rows))
+            timed_bulk(
+                (reindex_stats(row['values'], index_name) for row in event.rows),
+                # statistics updates are pretty low priority, plus in certain
+                # cases where we're really out of sync, the torrent could be missing,
+                # causing a "document missing" error from es, with no way to suppress
+                # that server-side. Thus ignore all errors, oh well.
+                raise_on_error=False, stats_only=True)
         elif type(event) is UpdateRowsEvent:
-            bulk(es, (reindex_stats(row['after_values'], index_name) for row in event.rows))
+            timed_bulk(
+                (reindex_stats(row['after_values'], index_name) for row in event.rows),
+                raise_on_error=False, stats_only=True)
         elif type(event) is DeleteRowsEvent:
-            # uh ok. assume that the torrent row will get deleted later,
+            # uh ok. Assume that the torrent row will get deleted later,
             # which will clean up the entire es "torrent" document
             pass
         else:
@@ -178,8 +204,14 @@ for event in stream:
     else:
         raise Exception(f"unknown table {s.table}")
 
-    n += 1
-    if n % 100 == 0 or time.time() - last_save > 30:
-        log.info(f"saving position {stream.log_file}/{stream.log_pos}")
-        with open(SAVE_LOC, 'w') as f:
-            json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f)
+    # how far we're behind, wall clock
+    stats.gauge('process_latency', int((time.time() - event.timestamp) * 1000))
+
+    since_last += 1
+    if since_last >= 10000 or (time.time() - last_save) > 10:
+        log.info(f"saving position {stream.log_file}/{stream.log_pos}, {time.time() - event.timestamp:,.3} seconds behind")
+        with stats.timer('save_pos'):
+            with open(SAVE_LOC, 'w') as f:
+                json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f)
+        last_save = time.time()
+        since_last = 0
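
Sketch of the multithreaded split mentioned in the commit message (illustration only, not part of the patch): one thread tails the binlog and turns row events into bulk actions, a second thread drains a bounded queue and POSTs them to Elasticsearch; both block on the network without holding the GIL. The connection settings, server_id, queue size, index/doc-type names and the make_actions() helper are placeholders; the real script would build actions with reindex_torrent()/reindex_stats()/delet_this() and would only save the binlog position once ES has acknowledged a batch, which is omitted here.

# Sketch only: placeholder names are marked in comments.
import queue
import threading

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent)

es = Elasticsearch()                 # localhost:9200 by default
batches = queue.Queue(maxsize=100)   # bounded so the reader can't outrun ES


def make_actions(event):
    # Placeholder: the real script builds these with reindex_torrent(),
    # reindex_stats() or delet_this() depending on event.table and event type.
    return [{'_op_type': 'index', '_index': 'nyaa', '_type': 'torrent',
             '_id': str(row['values']['id']), '_source': row['values']}
            for row in event.rows]


def read_binlog():
    # Producer: tail the binlog and queue one ES bulk batch per row event.
    # Connection settings and server_id here are made up.
    stream = BinLogStreamReader(
        connection_settings={'host': 'localhost', 'user': 'root', 'passwd': ''},
        server_id=10,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        blocking=True)
    for event in stream:
        batches.put(make_actions(event))


def post_batches():
    # Consumer: POST each batch. Errors are ignored wholesale here; the real
    # torrent path would want them raised, as in the patch above.
    while True:
        actions = batches.get()
        bulk(es, actions, raise_on_error=False, stats_only=True)
        batches.task_done()


reader = threading.Thread(target=read_binlog, daemon=True)
poster = threading.Thread(target=post_batches, daemon=True)
reader.start()
poster.start()
reader.join()  # runs until the binlog stream is closed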