From 33852a55bf6daf638cba0c084d9883a5ad22790e Mon Sep 17 00:00:00 2001
From: queue
Date: Sun, 28 May 2017 19:46:38 -0600
Subject: [PATCH 1/2] sync_es: die when killed

---
 sync_es.py | 29 +++++++++++++++++++++++++----
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/sync_es.py b/sync_es.py
index 2f2bb09..f52ca2b 100755
--- a/sync_es.py
+++ b/sync_es.py
@@ -24,6 +24,10 @@ database into es, at the expense of redoing a (small) amount of indexing.
 
 This uses multithreading so we don't have to block on socket io (both binlog
 reading and es POSTing). asyncio soon™
+
+This script will exit on any sort of exception, so you'll want to use your
+supervisor's restart functionality, e.g. Restart=failure in systemd, or
+the poor man's `while true; do sync_es.py; sleep 1; done` in tmux.
 """
 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk, BulkIndexError
@@ -133,14 +137,31 @@ def delet_this(row, index_name):
             '_type': 'torrent',
             '_id': str(row['values']['id'])}
 
+# we could try to make this script robust to errors from es or mysql, but since
+# the only thing we can do is "clear state and retry", it's easier to leave
+# this to the supervisor. If we were carrying around heavier state in-process,
+# it'd be more worth it to handle errors ourselves.
+#
+# Apparently there's no setDefaultUncaughtExceptionHandler in threading, and
+# sys.excepthook is also broken, so this gives us the same
+# exit-if-anything-happens semantics.
+class ExitingThread(Thread):
+    def run(self):
+        try:
+            self.run_happy()
+        except:
+            log.exception("something happened")
+            # sys.exit only exits the thread, lame
+            import os
+            os._exit(1)
 
-class BinlogReader(Thread):
+class BinlogReader(ExitingThread):
     # write_buf is the Queue we communicate with
     def __init__(self, write_buf):
         Thread.__init__(self)
         self.write_buf = write_buf
 
-    def run(self):
+    def run_happy(self):
         with open(SAVE_LOC) as f:
             pos = json.load(f)
 
@@ -229,7 +250,7 @@ class BinlogReader(Thread):
                 else:
                     raise Exception(f"unknown table {s.table}")
 
-class EsPoster(Thread):
+class EsPoster(ExitingThread):
     # read_buf is the queue of stuff to bulk post
     def __init__(self, read_buf, chunk_size=1000, flush_interval=5):
         Thread.__init__(self)
@@ -237,7 +258,7 @@ class EsPoster(Thread):
         self.chunk_size = chunk_size
         self.flush_interval = flush_interval
 
-    def run(self):
+    def run_happy(self):
         es = Elasticsearch(timeout=30)
 
         last_save = time.time()

From eceb8824dc73a7300138073c8fa4792b6f688b76 Mon Sep 17 00:00:00 2001
From: queue
Date: Sun, 28 May 2017 20:14:14 -0600
Subject: [PATCH 2/2] sync_es: fix flush_interval behavior during slow times

Instead of flushing every N seconds, it flushed N seconds after the last
change, which could drag out to N seconds * M batch size if there are few
updates. Practically this doesn't change anything, since stuff is always
happening.

Also fix not writing a save point when nothing is happening. Again,
practically a no-op, but more correct.
---
 sync_es.py | 65 ++++++++++++++++++++++++++++++++----------------------
 1 file changed, 39 insertions(+), 26 deletions(-)

diff --git a/sync_es.py b/sync_es.py
index f52ca2b..444c905 100755
--- a/sync_es.py
+++ b/sync_es.py
@@ -263,54 +263,67 @@ class EsPoster(ExitingThread):
 
         last_save = time.time()
         since_last = 0
+        # XXX keep track of last posted position for save points, awkward
+        posted_log_file = None
+        posted_log_pos = None
 
         while True:
             actions = []
-            while len(actions) < self.chunk_size:
+            now = time.time()
+            # wait up to flush_interval seconds after starting the batch
+            deadline = now + self.flush_interval
+            while len(actions) < self.chunk_size and now < deadline:
+                timeout = deadline - now
                 try:
                     # grab next event from queue with metadata that creepily
                     # updates, surviving outside the scope of the loop
                     ((log_file, log_pos, timestamp), action) = \
-                        self.read_buf.get(block=True, timeout=self.flush_interval)
+                        self.read_buf.get(block=True, timeout=timeout)
                     actions.append(action)
+                    now = time.time()
                 except Empty:
                     # nothing new for the whole interval
                     break
 
-            if not actions:
-                # nothing to post
-                log.debug("no changes...")
-                continue
+            if actions:
+                # XXX "time" to get histogram of no events per bulk
+                stats.timing('actions_per_bulk', len(actions))
 
-            # XXX "time" to get histogram of no events per bulk
-            stats.timing('actions_per_bulk', len(actions))
-
-            try:
-                with stats.timer('post_bulk'):
-                    bulk(es, actions, chunk_size=self.chunk_size)
-            except BulkIndexError as bie:
-                # in certain cases where we're really out of sync, we update a
-                # stat when the torrent doc is, causing a "document missing"
-                # error from es, with no way to suppress that server-side.
-                # Thus ignore that type of error if it's the only problem
-                for e in bie.errors:
-                    try:
-                        if e['update']['error']['type'] != 'document_missing_exception':
+                try:
+                    with stats.timer('post_bulk'):
+                        bulk(es, actions, chunk_size=self.chunk_size)
+                except BulkIndexError as bie:
+                    # in certain cases where we're really out of sync, we update a
+                    # stat when the torrent doc is, causing a "document missing"
+                    # error from es, with no way to suppress that server-side.
+                    # Thus ignore that type of error if it's the only problem
+                    for e in bie.errors:
+                        try:
+                            if e['update']['error']['type'] != 'document_missing_exception':
+                                raise bie
+                        except KeyError:
                             raise bie
-                    except KeyError:
-                        raise bie
 
-            # how far we're behind, wall clock
-            stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
+                # how far we've gotten in the actual log
+                posted_log_file = log_file
+                posted_log_pos = log_pos
+
+                # how far we're behind, wall clock
+                stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
+            else:
+                log.debug("no changes...")
 
             since_last += len(actions)
-            if since_last >= 10000 or (time.time() - last_save) > 10:
+            # TODO instead of this manual timeout loop, could move this to another queue/thread
+            if posted_log_file is not None and (since_last >= 10000 or (time.time() - last_save) > 10):
                 log.info(f"saving position {log_file}/{log_pos}, {time.time() - timestamp:,.3f} seconds behind")
                 with stats.timer('save_pos'):
                     with open(SAVE_LOC, 'w') as f:
-                        json.dump({"log_file": log_file, "log_pos": log_pos}, f)
+                        json.dump({"log_file": posted_log_file, "log_pos": posted_log_pos}, f)
                 last_save = time.time()
                 since_last = 0
+                posted_log_file = None
+                posted_log_pos = None
 
 # in-memory queue between binlog and es. The bigger it is, the more events we
 # can parse in memory while waiting for es to catch up, at the expense of heap.
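
The flush behavior patch 2 is after may be easier to see without the diff markers. What
follows is a minimal, self-contained sketch, not code from sync_es.py: the names
drain_batches, events, and post_bulk are invented for illustration. It shows the intended
semantics of the new loop: a batch is flushed at most flush_interval seconds after it is
started (the queue timeout shrinks toward a fixed deadline instead of resetting to
flush_interval on every received item), and the loop falls through even when idle, so
periodic work such as writing a save point still gets a chance to run.

    import time
    from queue import Queue, Empty

    def drain_batches(events: Queue, chunk_size=1000, flush_interval=5):
        # Illustrative stand-in for the batching loop in EsPoster.run_happy.
        while True:
            actions = []
            now = time.time()
            deadline = now + flush_interval  # fixed deadline for this whole batch
            while len(actions) < chunk_size and now < deadline:
                try:
                    # never wait past the batch deadline, even if items keep trickling in
                    actions.append(events.get(block=True, timeout=deadline - now))
                    now = time.time()
                except Empty:
                    break  # nothing new for the rest of the interval
            if actions:
                post_bulk(actions)  # placeholder for the real bulk-indexing call
            # whether or not anything arrived, the loop comes back around promptly,
            # so periodic bookkeeping (like saving a binlog position) can still run here

    def post_bulk(actions):
        print(f"posting {len(actions)} actions")

Feeding a Queue from a producer thread and calling drain_batches(q) reproduces the flush
cadence the commit message describes.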