sync_es: fix flush_interval behavior during slow times
Instead of flushing every N seconds, the poster flushed N seconds after the last change, so a batch could drag out to N seconds * M (batch size) when updates are scarce. In practice this changes little, since something is almost always happening. Also fix the save point not being written when nothing is happening; again practically a no-op, but done for correctness.
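The behavior change is easiest to see in isolation. Below is a minimal, self-contained sketch (not the actual sync_es code; the function names, queue, and constants are made up for illustration) of the two batching strategies: the old one re-arms a full flush_interval timeout on every received item, so a slow trickle can hold one batch open for up to flush_interval * chunk_size seconds, while the new one computes a single deadline when the batch starts and shrinks each per-get timeout toward it.

import queue
import time

FLUSH_INTERVAL = 5.0   # "N seconds" from the commit message (illustrative value)
CHUNK_SIZE = 1000      # "M batch size" (illustrative value)

def drain_old(buf: queue.Queue) -> list:
    # old behavior: every successful get() re-arms a full FLUSH_INTERVAL wait,
    # so with a slow trickle the batch can stay open ~FLUSH_INTERVAL * CHUNK_SIZE
    actions = []
    while len(actions) < CHUNK_SIZE:
        try:
            actions.append(buf.get(block=True, timeout=FLUSH_INTERVAL))
        except queue.Empty:
            break
    return actions

def drain_new(buf: queue.Queue) -> list:
    # new behavior: one deadline for the whole batch; each get() only waits for
    # the time remaining, so a non-empty batch is flushed at most FLUSH_INTERVAL
    # seconds after it was started
    actions = []
    now = time.time()
    deadline = now + FLUSH_INTERVAL
    while len(actions) < CHUNK_SIZE and now < deadline:
        try:
            actions.append(buf.get(block=True, timeout=deadline - now))
        except queue.Empty:
            break
        now = time.time()
    return actions

With a steady firehose of events both versions behave the same, which is why the commit notes the fix matters mostly for correctness during quiet periods.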
parent 87db2e9bae
commit 044cb143bf

sync_es.py (65 lines changed)
@@ -262,54 +262,67 @@ class EsPoster(ExitingThread):
         last_save = time.time()
         since_last = 0
+        # XXX keep track of last posted position for save points, awkward
+        posted_log_file = None
+        posted_log_pos = None

         while True:
             actions = []
-            while len(actions) < self.chunk_size:
+            now = time.time()
+            # wait up to flush_interval seconds after starting the batch
+            deadline = now + self.flush_interval
+            while len(actions) < self.chunk_size and now < deadline:
+                timeout = deadline - now
                 try:
                     # grab next event from queue with metadata that creepily
                     # updates, surviving outside the scope of the loop
                     ((log_file, log_pos, timestamp), action) = \
-                        self.read_buf.get(block=True, timeout=self.flush_interval)
+                        self.read_buf.get(block=True, timeout=timeout)
                     actions.append(action)
+                    now = time.time()
                 except Empty:
                     # nothing new for the whole interval
                     break

-            if not actions:
-                # nothing to post
-                log.debug("no changes...")
-                continue
-
-            # XXX "time" to get histogram of no events per bulk
-            stats.timing('actions_per_bulk', len(actions))
-
-            try:
-                with stats.timer('post_bulk'):
-                    bulk(es, actions, chunk_size=self.chunk_size)
-            except BulkIndexError as bie:
-                # in certain cases where we're really out of sync, we update a
-                # stat when the torrent doc is, causing a "document missing"
-                # error from es, with no way to suppress that server-side.
-                # Thus ignore that type of error if it's the only problem
-                for e in bie.errors:
-                    try:
-                        if e['update']['error']['type'] != 'document_missing_exception':
-                            raise bie
-                    except KeyError:
-                        raise bie
-
-            # how far we're behind, wall clock
-            stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
+            if actions:
+                # XXX "time" to get histogram of no events per bulk
+                stats.timing('actions_per_bulk', len(actions))
+
+                try:
+                    with stats.timer('post_bulk'):
+                        bulk(es, actions, chunk_size=self.chunk_size)
+                except BulkIndexError as bie:
+                    # in certain cases where we're really out of sync, we update a
+                    # stat when the torrent doc is, causing a "document missing"
+                    # error from es, with no way to suppress that server-side.
+                    # Thus ignore that type of error if it's the only problem
+                    for e in bie.errors:
+                        try:
+                            if e['update']['error']['type'] != 'document_missing_exception':
+                                raise bie
+                        except KeyError:
+                            raise bie
+
+                # how far we've gotten in the actual log
+                posted_log_file = log_file
+                posted_log_pos = log_pos
+
+                # how far we're behind, wall clock
+                stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
+            else:
+                log.debug("no changes...")

             since_last += len(actions)
-            if since_last >= 10000 or (time.time() - last_save) > 10:
+            # TODO instead of this manual timeout loop, could move this to another queue/thread
+            if posted_log_file is not None and (since_last >= 10000 or (time.time() - last_save) > 10):
                 log.info(f"saving position {log_file}/{log_pos}, {time.time() - timestamp:,.3f} seconds behind")
                 with stats.timer('save_pos'):
                     with open(SAVE_LOC, 'w') as f:
-                        json.dump({"log_file": log_file, "log_pos": log_pos}, f)
+                        json.dump({"log_file": posted_log_file, "log_pos": posted_log_pos}, f)
                 last_save = time.time()
                 since_last = 0
+                posted_log_file = None
+                posted_log_pos = None


 # in-memory queue between binlog and es. The bigger it is, the more events we
 # can parse in memory while waiting for es to catch up, at the expense of heap.
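For the save-point half of the fix, the diff persists the binlog position only after a batch has actually been posted (posted_log_file/posted_log_pos), rather than whatever log_file/log_pos the reader last pulled off the queue. A rough sketch of the round trip, assuming SAVE_LOC is the JSON file written above; load_position is a hypothetical helper for illustration, not necessarily how sync_es.py resumes:

import json

SAVE_LOC = "/tmp/pos.json"  # assumption: stands in for the real save location

def save_position(log_file, log_pos):
    # mirrors the json.dump in the diff: only ever called with positions whose
    # events have already been flushed to Elasticsearch
    with open(SAVE_LOC, 'w') as f:
        json.dump({"log_file": log_file, "log_pos": log_pos}, f)

def load_position():
    # hypothetical resume helper: on restart, replication would continue from
    # the last *posted* position, so nothing acked to ES is silently skipped
    try:
        with open(SAVE_LOC) as f:
            pos = json.load(f)
        return pos["log_file"], pos["log_pos"]
    except FileNotFoundError:
        return None

Resetting posted_log_file/posted_log_pos to None after each save also ensures an idle loop iteration cannot rewrite the save file with a stale or never-posted position.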