diff --git a/sync_es.py b/sync_es.py
index 2f2bb09..f52ca2b 100755
--- a/sync_es.py
+++ b/sync_es.py
@@ -24,6 +24,10 @@ database into es, at the expense of redoing a (small) amount of indexing.
 
 This uses multithreading so we don't have to block on socket io (both binlog
 reading and es POSTing). asyncio soon™
+
+This script will exit on any sort of exception, so you'll want to use your
+supervisor's restart functionality, e.g. Restart=on-failure in systemd, or
+the poor man's `while true; do sync_es.py; sleep 1; done` in tmux.
 """
 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk, BulkIndexError
@@ -133,14 +137,31 @@ def delet_this(row, index_name):
         '_type': 'torrent',
         '_id': str(row['values']['id'])}
 
+# we could try to make this script robust to errors from es or mysql, but since
+# the only thing we can do is "clear state and retry", it's easier to leave
+# this to the supervisor. If we were carrying around heavier state in-process,
+# it'd be more worth it to handle errors ourselves.
+#
+# Apparently there's no setDefaultUncaughtExceptionHandler in threading, and
+# sys.excepthook is also broken, so this gives us the same
+# exit-if-anything-happens semantics.
+class ExitingThread(Thread):
+    def run(self):
+        try:
+            self.run_happy()
+        except:
+            log.exception("something happened")
+            # sys.exit only exits the thread, lame
+            import os
+            os._exit(1)
 
-class BinlogReader(Thread):
+class BinlogReader(ExitingThread):
     # write_buf is the Queue we communicate with
     def __init__(self, write_buf):
         Thread.__init__(self)
         self.write_buf = write_buf
 
-    def run(self):
+    def run_happy(self):
         with open(SAVE_LOC) as f:
             pos = json.load(f)
 
@@ -229,7 +250,7 @@
             else:
                 raise Exception(f"unknown table {s.table}")
 
-class EsPoster(Thread):
+class EsPoster(ExitingThread):
     # read_buf is the queue of stuff to bulk post
     def __init__(self, read_buf, chunk_size=1000, flush_interval=5):
         Thread.__init__(self)
@@ -237,7 +258,7 @@
         self.read_buf = read_buf
         self.chunk_size = chunk_size
         self.flush_interval = flush_interval
 
-    def run(self):
+    def run_happy(self):
         es = Elasticsearch(timeout=30)
         last_save = time.time()
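
For the supervisor setup the docstring suggests, a minimal systemd unit could look like the sketch below. The unit file path, Description, ExecStart paths, and dependency units are illustrative assumptions, not something this script ships with:

    # /etc/systemd/system/sync_es.service (hypothetical path)
    [Unit]
    Description=sync mysql binlog into elasticsearch
    After=mysql.service elasticsearch.service

    [Service]
    ExecStart=/usr/bin/python3 /opt/nyaa/sync_es.py
    Restart=on-failure
    RestartSec=1

    [Install]
    WantedBy=multi-user.target

With Restart=on-failure, the os._exit(1) in ExitingThread counts as a service failure, so systemd restarts the whole process after RestartSec, which is exactly the "clear state and retry" behavior the comment describes.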
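
The ExitingThread base class exists because, by default, an uncaught exception in a Python thread kills only that thread: the process stays up with the worker silently dead, and a supervisor watching the process never notices. A self-contained sketch of that default behavior (plain Python, nothing from this script):

    import threading

    def worker():
        # prints a traceback to stderr, but only this thread dies
        raise RuntimeError("boom")

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    print("process still alive")  # reached: the supervisor sees a healthy process

For what it's worth, Python 3.8 later added threading.excepthook, which could express the same exit-on-anything policy without a base class (e.g. threading.excepthook = lambda args: os._exit(1)), but the try/except in run() works on any version.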