Merge pull request #208 from nyaadevs/die-sync-es

sync_es: die when killed
This commit is contained in:
Yukikaze 2017-05-29 03:48:10 -07:00 committed by GitHub
commit 3059507e09
1 changed files with 64 additions and 30 deletions

View File

@ -24,6 +24,10 @@ database into es, at the expense of redoing a (small) amount of indexing.
This uses multithreading so we don't have to block on socket io (both binlog
reading and es POSTing). asyncio soon
This script will exit on any sort of exception, so you'll want to use your
supervisor's restart functionality, e.g. Restart=failure in systemd, or
the poor man's `while true; do sync_es.py; sleep 1; done` in tmux.
"""
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError
@ -133,14 +137,31 @@ def delet_this(row, index_name):
'_type': 'torrent',
'_id': str(row['values']['id'])}
# we could try to make this script robust to errors from es or mysql, but since
# the only thing we can do is "clear state and retry", it's easier to leave
# this to the supervisor. If we we carrying around heavier state in-process,
# it'd be more worth it to handle errors ourselves.
#
# Apparently there's no setDefaultUncaughtExceptionHandler in threading, and
# sys.excepthook is also broken, so this gives us the same
# exit-if-anything-happens semantics.
class ExitingThread(Thread):
def run(self):
try:
self.run_happy()
except:
log.exception("something happened")
# sys.exit only exits the thread, lame
import os
os._exit(1)
class BinlogReader(Thread):
class BinlogReader(ExitingThread):
# write_buf is the Queue we communicate with
def __init__(self, write_buf):
Thread.__init__(self)
self.write_buf = write_buf
def run(self):
def run_happy(self):
with open(SAVE_LOC) as f:
pos = json.load(f)
@ -229,7 +250,7 @@ class BinlogReader(Thread):
else:
raise Exception(f"unknown table {s.table}")
class EsPoster(Thread):
class EsPoster(ExitingThread):
# read_buf is the queue of stuff to bulk post
def __init__(self, read_buf, chunk_size=1000, flush_interval=5):
Thread.__init__(self)
@ -237,59 +258,72 @@ class EsPoster(Thread):
self.chunk_size = chunk_size
self.flush_interval = flush_interval
def run(self):
def run_happy(self):
es = Elasticsearch(timeout=30)
last_save = time.time()
since_last = 0
# XXX keep track of last posted position for save points, awkward
posted_log_file = None
posted_log_pos = None
while True:
actions = []
while len(actions) < self.chunk_size:
now = time.time()
# wait up to flush_interval seconds after starting the batch
deadline = now + self.flush_interval
while len(actions) < self.chunk_size and now < deadline:
timeout = deadline - now
try:
# grab next event from queue with metadata that creepily
# updates, surviving outside the scope of the loop
((log_file, log_pos, timestamp), action) = \
self.read_buf.get(block=True, timeout=self.flush_interval)
self.read_buf.get(block=True, timeout=timeout)
actions.append(action)
now = time.time()
except Empty:
# nothing new for the whole interval
break
if not actions:
# nothing to post
log.debug("no changes...")
continue
if actions:
# XXX "time" to get histogram of no events per bulk
stats.timing('actions_per_bulk', len(actions))
# XXX "time" to get histogram of no events per bulk
stats.timing('actions_per_bulk', len(actions))
try:
with stats.timer('post_bulk'):
bulk(es, actions, chunk_size=self.chunk_size)
except BulkIndexError as bie:
# in certain cases where we're really out of sync, we update a
# stat when the torrent doc is, causing a "document missing"
# error from es, with no way to suppress that server-side.
# Thus ignore that type of error if it's the only problem
for e in bie.errors:
try:
if e['update']['error']['type'] != 'document_missing_exception':
try:
with stats.timer('post_bulk'):
bulk(es, actions, chunk_size=self.chunk_size)
except BulkIndexError as bie:
# in certain cases where we're really out of sync, we update a
# stat when the torrent doc is, causing a "document missing"
# error from es, with no way to suppress that server-side.
# Thus ignore that type of error if it's the only problem
for e in bie.errors:
try:
if e['update']['error']['type'] != 'document_missing_exception':
raise bie
except KeyError:
raise bie
except KeyError:
raise bie
# how far we're behind, wall clock
stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
# how far we've gotten in the actual log
posted_log_file = log_file
posted_log_pos = log_pos
# how far we're behind, wall clock
stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
else:
log.debug("no changes...")
since_last += len(actions)
if since_last >= 10000 or (time.time() - last_save) > 10:
# TODO instead of this manual timeout loop, could move this to another queue/thread
if posted_log_file is not None and (since_last >= 10000 or (time.time() - last_save) > 10):
log.info(f"saving position {log_file}/{log_pos}, {time.time() - timestamp:,.3f} seconds behind")
with stats.timer('save_pos'):
with open(SAVE_LOC, 'w') as f:
json.dump({"log_file": log_file, "log_pos": log_pos}, f)
json.dump({"log_file": posted_log_file, "log_pos": posted_log_pos}, f)
last_save = time.time()
since_last = 0
posted_log_file = None
posted_log_pos = None
# in-memory queue between binlog and es. The bigger it is, the more events we
# can parse in memory while waiting for es to catch up, at the expense of heap.