diff --git a/sync_es.py b/sync_es.py
index f52ca2b..444c905 100755
--- a/sync_es.py
+++ b/sync_es.py
@@ -263,54 +263,67 @@ class EsPoster(ExitingThread):
 
         last_save = time.time()
         since_last = 0
+        # XXX keep track of last posted position for save points, awkward
+        posted_log_file = None
+        posted_log_pos = None
         while True:
             actions = []
-            while len(actions) < self.chunk_size:
+            now = time.time()
+            # wait up to flush_interval seconds after starting the batch
+            deadline = now + self.flush_interval
+            while len(actions) < self.chunk_size and now < deadline:
+                timeout = deadline - now
                 try:
                     # grab next event from queue with metadata that creepily
                     # updates, surviving outside the scope of the loop
                     ((log_file, log_pos, timestamp), action) = \
-                        self.read_buf.get(block=True, timeout=self.flush_interval)
+                        self.read_buf.get(block=True, timeout=timeout)
                     actions.append(action)
+                    now = time.time()
                 except Empty:
                     # nothing new for the whole interval
                     break
 
-            if not actions:
-                # nothing to post
-                log.debug("no changes...")
-                continue
+            if actions:
+                # XXX "time" to get histogram of no events per bulk
+                stats.timing('actions_per_bulk', len(actions))
 
-            # XXX "time" to get histogram of no events per bulk
-            stats.timing('actions_per_bulk', len(actions))
-
-            try:
-                with stats.timer('post_bulk'):
-                    bulk(es, actions, chunk_size=self.chunk_size)
-            except BulkIndexError as bie:
-                # in certain cases where we're really out of sync, we update a
-                # stat when the torrent doc is, causing a "document missing"
-                # error from es, with no way to suppress that server-side.
-                # Thus ignore that type of error if it's the only problem
-                for e in bie.errors:
-                    try:
-                        if e['update']['error']['type'] != 'document_missing_exception':
+                try:
+                    with stats.timer('post_bulk'):
+                        bulk(es, actions, chunk_size=self.chunk_size)
+                except BulkIndexError as bie:
+                    # in certain cases where we're really out of sync, we update a
+                    # stat when the torrent doc is, causing a "document missing"
+                    # error from es, with no way to suppress that server-side.
+                    # Thus ignore that type of error if it's the only problem
+                    for e in bie.errors:
+                        try:
+                            if e['update']['error']['type'] != 'document_missing_exception':
+                                raise bie
+                        except KeyError:
                             raise bie
-                    except KeyError:
-                        raise bie
 
-            # how far we're behind, wall clock
-            stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
+                # how far we've gotten in the actual log
+                posted_log_file = log_file
+                posted_log_pos = log_pos
+
+                # how far we're behind, wall clock
+                stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
+            else:
+                log.debug("no changes...")
 
             since_last += len(actions)
-            if since_last >= 10000 or (time.time() - last_save) > 10:
+            # TODO instead of this manual timeout loop, could move this to another queue/thread
+            if posted_log_file is not None and (since_last >= 10000 or (time.time() - last_save) > 10):
                 log.info(f"saving position {log_file}/{log_pos}, {time.time() - timestamp:,.3f} seconds behind")
                 with stats.timer('save_pos'):
                     with open(SAVE_LOC, 'w') as f:
-                        json.dump({"log_file": log_file, "log_pos": log_pos}, f)
+                        json.dump({"log_file": posted_log_file, "log_pos": posted_log_pos}, f)
                 last_save = time.time()
                 since_last = 0
+                posted_log_file = None
+                posted_log_pos = None
 
 
 # in-memory queue between binlog and es. The bigger it is, the more events we
 # can parse in memory while waiting for es to catch up, at the expense of heap.
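
Note for reviewers: the core change in the batch loop is that it now waits at most `flush_interval` seconds *total* per batch rather than per `get()`, so a slow trickle of events can no longer delay a flush indefinitely. A minimal standalone sketch of that deadline pattern, using only the stdlib; `drain_batch` and the timings are illustrative, not names from sync_es.py (the patch itself uses `time.time()` rather than `time.monotonic()`):

```python
import time
from queue import Queue, Empty

def drain_batch(q: Queue, chunk_size: int, flush_interval: float) -> list:
    """Collect up to chunk_size items, waiting at most flush_interval
    seconds in total (not per item) before returning what we have."""
    batch = []
    deadline = time.monotonic() + flush_interval
    while len(batch) < chunk_size:
        timeout = deadline - time.monotonic()
        if timeout <= 0:
            break  # whole interval elapsed while items kept trickling in
        try:
            batch.append(q.get(block=True, timeout=timeout))
        except Empty:
            break  # nothing new for the rest of the interval
    return batch

q = Queue()
for i in range(3):
    q.put(i)
# prints [0, 1, 2] after blocking ~0.1s for a fourth item that never arrives
print(drain_batch(q, chunk_size=4, flush_interval=0.1))
```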
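
The other half of the change is that the binlog position is now persisted only after a successful bulk post (via `posted_log_file`/`posted_log_pos`), so a crash between reading an event and posting it can no longer save a position past unindexed events. A hypothetical sketch of that save-point gating pulled out into a helper; `Checkpointer`, `max_events`, `max_age`, and the `/tmp` path are invented for illustration, the patch keeps this logic inline:

```python
import json
import time

SAVE_LOC = '/tmp/binlog_pos.json'  # illustrative path, not the real config

class Checkpointer:
    """Persist the binlog position only after a successful bulk post,
    flushing every max_events events or max_age seconds."""
    def __init__(self, max_events=10000, max_age=10.0):
        self.max_events = max_events
        self.max_age = max_age
        self.since_last = 0
        self.last_save = time.time()
        self.posted = None  # (log_file, log_pos) of the last successful post

    def record_post(self, log_file, log_pos, n_actions):
        # called only after es accepted the batch, so a crash can't
        # persist a position we never actually indexed
        self.posted = (log_file, log_pos)
        self.since_last += n_actions

    def maybe_save(self):
        due = self.since_last >= self.max_events or \
              (time.time() - self.last_save) > self.max_age
        if self.posted is not None and due:
            log_file, log_pos = self.posted
            with open(SAVE_LOC, 'w') as f:
                json.dump({"log_file": log_file, "log_pos": log_pos}, f)
            self.last_save = time.time()
            self.since_last = 0
            self.posted = None
```

Resetting `posted` to `None` after each save mirrors the patch and makes repeated `maybe_save()` calls no-ops until the next successful post.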