diff --git a/sync_es.py b/sync_es.py
index 4cbd9f2..d0e9be5 100644
--- a/sync_es.py
+++ b/sync_es.py
@@ -32,12 +32,16 @@
 import sys
 import json
 import time
 import logging
+from statsd import StatsClient

-logging.basicConfig()
+logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s - %(message)s')
 log = logging.getLogger('sync_es')
 log.setLevel(logging.INFO)

+# goes to netdata or other statsd listener
+stats = StatsClient('localhost', 8125, prefix="sync_es")
+
 #logging.getLogger('elasticsearch').setLevel(logging.DEBUG)

 # in prod want in /var/lib somewhere probably
@@ -120,8 +124,8 @@ def reindex_torrent(t, index_name):
 def reindex_stats(s, index_name):
     # update the torrent at torrent_id, assumed to exist;
     # this will always be the case if you're reading the binlog
-    # in order; the foreign key constraint on torrrent_id prevents
-    # the stats row rom existing if the torrent isn't around.
+    # in order; the foreign key constraint on torrent_id prevents
+    # the stats row from existing if the torrent isn't around.
     return {
         '_op_type': 'update',
         '_index': index_name,
@@ -141,23 +145,37 @@ def delet_this(row, index_name):
         '_type': 'torrent',
         '_id': str(row['values']['id'])}

-n = 0
 last_save = time.time()
+since_last = 0
+
+# get POST latency
+@stats.timer('post_bulk')
+def timed_bulk(events, **kwargs):
+    bulk(es, events, **kwargs)
+
+log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}")

 for event in stream:
+    with stats.pipeline() as s:
+        s.incr('total_events')
+        s.incr(f"event.{event.table}.{type(event).__name__}")
+        s.incr('total_rows', len(event.rows))
+        s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows))
+        # XXX not a "timer", but we get a histogram out of it
+        s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows))
     if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
         if event.table == "nyaa_torrents":
             index_name = "nyaa"
         else:
             index_name = "sukebei"
         if type(event) is WriteRowsEvent:
-            bulk(es, (reindex_torrent(row['values'], index_name) for row in event.rows))
+            timed_bulk(reindex_torrent(row['values'], index_name) for row in event.rows)
         elif type(event) is UpdateRowsEvent:
             # UpdateRowsEvent includes the old values too, but we don't care
-            bulk(es, (reindex_torrent(row['after_values'], index_name) for row in event.rows))
+            timed_bulk(reindex_torrent(row['after_values'], index_name) for row in event.rows)
         elif type(event) is DeleteRowsEvent:
             # ok, bye
-            bulk(es, (delet_this(row, index_name) for row in event.rows))
+            timed_bulk(delet_this(row, index_name) for row in event.rows)
         else:
             raise Exception(f"unknown event {type(event)}")
     elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
@@ -166,11 +184,19 @@ for event in stream:
         else:
             index_name = "sukebei"
         if type(event) is WriteRowsEvent:
-            bulk(es, (reindex_stats(row['values'], index_name) for row in event.rows))
+            timed_bulk(
+                (reindex_stats(row['values'], index_name) for row in event.rows),
+                # statistics updates are pretty low priority, plus in certain
+                # cases where we're really out of sync, the torrent could be missing,
+                # causing a "document missing" error from es, with no way to suppress
+                # that server-side. Thus ignore all errors, oh well.
+                raise_on_error=False, stats_only=True)
         elif type(event) is UpdateRowsEvent:
-            bulk(es, (reindex_stats(row['after_values'], index_name) for row in event.rows))
+            timed_bulk(
+                (reindex_stats(row['after_values'], index_name) for row in event.rows),
+                raise_on_error=False, stats_only=True)
         elif type(event) is DeleteRowsEvent:
-            # uh ok. assume that the torrent row will get deleted later,
+            # uh ok. Assume that the torrent row will get deleted later,
             # which will clean up the entire es "torrent" document
             pass
         else:
@@ -178,8 +204,14 @@ for event in stream:
     else:
         raise Exception(f"unknown table {s.table}")

-    n += 1
-    if n % 100 == 0 or time.time() - last_save > 30:
-        log.info(f"saving position {stream.log_file}/{stream.log_pos}")
-        with open(SAVE_LOC, 'w') as f:
-            json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f)
+    # how far we're behind, wall clock
+    stats.gauge('process_latency', int((time.time() - event.timestamp) * 1000))
+
+    since_last += 1
+    if since_last >= 10000 or (time.time() - last_save) > 10:
+        log.info(f"saving position {stream.log_file}/{stream.log_pos}, {time.time() - event.timestamp:,.3} seconds behind")
+        with stats.timer('save_pos'):
+            with open(SAVE_LOC, 'w') as f:
+                json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f)
+        last_save = time.time()
+        since_last = 0
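
For reference, a minimal standalone sketch of the statsd patterns this diff leans on (the `timer` decorator, the `pipeline` context manager, and `gauge`), using the same `statsd.StatsClient` API. It assumes a statsd listener such as netdata on localhost:8125; the metric names and the `do_work` function are illustrative, not the ones emitted by sync_es.py.

```python
import time

from statsd import StatsClient

# hypothetical prefix; sync_es.py uses prefix="sync_es"
stats = StatsClient('localhost', 8125, prefix='sketch')

# timer as a decorator: reports the wrapped call's duration as a timing metric
@stats.timer('do_work')
def do_work(n):
    time.sleep(0.01 * n)
    return n

# pipeline as a context manager: buffers stats and sends them together on exit,
# so per-event instrumentation costs roughly one UDP send instead of several
with stats.pipeline() as pipe:
    pipe.incr('events')                 # counter
    pipe.timing('rows_per_event', 42)   # not really a timing, but yields a histogram
    pipe.gauge('process_latency', 150)  # point-in-time value (ms here)

do_work(3)
```

Since statsd rides on UDP, the instrumentation stays cheap and cannot block or crash the binlog loop even if the listener is down.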
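And a sketch of what `raise_on_error=False, stats_only=True` buys us on `elasticsearch.helpers.bulk` for the statistics updates: failures such as "document missing" no longer raise `BulkIndexError`, and the helper returns `(successes, failed_count)` instead of a list of per-item errors. The client, index name, and action document below are illustrative, not taken from sync_es.py.

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch()  # assumes a local cluster, as sync_es.py does

# a partial-document update for a doc that may not exist,
# roughly the shape reindex_stats() produces (field name is hypothetical)
actions = [{'_op_type': 'update', '_index': 'nyaa', '_type': 'torrent',
            '_id': '1', 'doc': {'seed_count': 0}}]

# with the defaults this would raise BulkIndexError on a "document missing" failure;
# with these flags the error is swallowed and we only get counts back
successes, failed = bulk(es, actions, raise_on_error=False, stats_only=True)
print(f"{successes} ok, {failed} failed")
```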