Merge pull request #151 from nyaadevs/syncfaster

sync_es: multithread so it can actually keep up
This commit is contained in:
Yukikaze 2017-05-21 00:03:07 -07:00 committed by GitHub
commit 3c0f5ee60a
2 changed files with 213 additions and 80 deletions

11
config_es_sync.json Normal file
View File

@ -0,0 +1,11 @@
{
"save_loc": "/tmp/pos.json",
"mysql_host": "127.0.0.1",
"mysql_port": 13306,
"mysql_user": "root",
"mysql_password": "dunnolol",
"database": "nyaav2",
"internal_queue_depth": 10000,
"es_chunk_size": 10000,
"flush_interval": 5
}

View File

@ -21,9 +21,12 @@ when import_to_es and sync_es will be lost. Instead, you can run SHOW MASTER
STATUS _before_ you run import_to_es. That way you'll definitely pick up any
changes that happen while the import_to_es script is dumping stuff from the
database into es, at the expense of redoing a (small) amount of indexing.
This uses multithreading so we don't have to block on socket io (both binlog
reading and es POSTing). asyncio soon
"""
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import bulk, BulkIndexError
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent
from datetime import datetime
@ -32,51 +35,39 @@ import sys
import json
import time
import logging
from statsd import StatsClient
from threading import Thread
from queue import Queue, Empty
logging.basicConfig()
logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s - %(message)s')
log = logging.getLogger('sync_es')
log.setLevel(logging.INFO)
# config in json, 2lazy to argparse
if len(sys.argv) != 2:
print("need config.json location", file=sys.stderr)
sys.exit(-1)
with open(sys.argv[1]) as f:
config = json.load(f)
# goes to netdata or other statsd listener
stats = StatsClient('localhost', 8125, prefix="sync_es")
#logging.getLogger('elasticsearch').setLevel(logging.DEBUG)
# in prod want in /var/lib somewhere probably
SAVE_LOC = "/var/lib/sync_es_position.json"
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'test'
MYSQL_PW = 'test123'
NT_DB = 'nyaav2'
with open(SAVE_LOC) as f:
pos = json.load(f)
es = Elasticsearch(timeout=30)
stream = BinLogStreamReader(
# TODO parse out from config.py or something
connection_settings = {
'host': MYSQL_HOST,
'port': MYSQL_PORT,
'user': MYSQL_USER,
'passwd': MYSQL_PW
},
server_id=10, # arbitrary
# only care about this database currently
only_schemas=[NT_DB],
# these tables in the database
only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"],
# from our save file
resume_stream=True,
log_file=pos['log_file'],
log_pos=pos['log_pos'],
# skip the other stuff like table mapping
only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent],
# if we're at the head of the log, block until something happens
# note it'd be nice to block async-style instead, but the mainline
# binlogreader is synchronous. there is an (unmaintained?) fork
# using aiomysql if anybody wants to revive that.
blocking=True)
SAVE_LOC = config.get('save_loc', "/tmp/pos.json")
MYSQL_HOST = config.get('mysql_host', '127.0.0.1')
MYSQL_PORT = config.get('mysql_port', 3306)
MYSQL_USER = config.get('mysql_user', 'root')
MYSQL_PW = config.get('mysql_password', 'dunnolol')
NT_DB = config.get('database', 'nyaav2')
INTERNAL_QUEUE_DEPTH = config.get('internal_queue_depth', 10000)
ES_CHUNK_SIZE = config.get('es_chunk_size', 10000)
# seconds since no events happening to flush to es. remember this also
# interacts with es' refresh_interval setting.
FLUSH_INTERVAL = config.get('flush_interval', 5)
def reindex_torrent(t, index_name):
# XXX annoyingly different from import_to_es, and
@ -120,8 +111,8 @@ def reindex_torrent(t, index_name):
def reindex_stats(s, index_name):
# update the torrent at torrent_id, assumed to exist;
# this will always be the case if you're reading the binlog
# in order; the foreign key constraint on torrrent_id prevents
# the stats row rom existing if the torrent isn't around.
# in order; the foreign key constraint on torrent_id prevents
# the stats row from existing if the torrent isn't around.
return {
'_op_type': 'update',
'_index': index_name,
@ -141,45 +132,176 @@ def delet_this(row, index_name):
'_type': 'torrent',
'_id': str(row['values']['id'])}
n = 0
last_save = time.time()
for event in stream:
if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
if event.table == "nyaa_torrents":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
bulk(es, (reindex_torrent(row['values'], index_name) for row in event.rows))
elif type(event) is UpdateRowsEvent:
# UpdateRowsEvent includes the old values too, but we don't care
bulk(es, (reindex_torrent(row['after_values'], index_name) for row in event.rows))
elif type(event) is DeleteRowsEvent:
# ok, bye
bulk(es, (delet_this(row, index_name) for row in event.rows))
else:
raise Exception(f"unknown event {type(event)}")
elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
if event.table == "nyaa_statistics":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
bulk(es, (reindex_stats(row['values'], index_name) for row in event.rows))
elif type(event) is UpdateRowsEvent:
bulk(es, (reindex_stats(row['after_values'], index_name) for row in event.rows))
elif type(event) is DeleteRowsEvent:
# uh ok. assume that the torrent row will get deleted later,
# which will clean up the entire es "torrent" document
pass
else:
raise Exception(f"unknown event {type(event)}")
else:
raise Exception(f"unknown table {s.table}")
class BinlogReader(Thread):
# write_buf is the Queue we communicate with
def __init__(self, write_buf):
Thread.__init__(self)
self.write_buf = write_buf
n += 1
if n % 100 == 0 or time.time() - last_save > 30:
log.info(f"saving position {stream.log_file}/{stream.log_pos}")
with open(SAVE_LOC, 'w') as f:
json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f)
def run(self):
with open(SAVE_LOC) as f:
pos = json.load(f)
stream = BinLogStreamReader(
# TODO parse out from config.py or something
connection_settings = {
'host': MYSQL_HOST,
'port': MYSQL_PORT,
'user': MYSQL_USER,
'passwd': MYSQL_PW
},
server_id=10, # arbitrary
# only care about this database currently
only_schemas=[NT_DB],
# these tables in the database
only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"],
# from our save file
resume_stream=True,
log_file=pos['log_file'],
log_pos=pos['log_pos'],
# skip the other stuff like table mapping
only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent],
# if we're at the head of the log, block until something happens
# note it'd be nice to block async-style instead, but the mainline
# binlogreader is synchronous. there is an (unmaintained?) fork
# using aiomysql if anybody wants to revive that.
blocking=True)
log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}")
for event in stream:
# save the pos of the stream and timestamp with each message, so we
# can commit in the other thread. and keep track of process latency
pos = (stream.log_file, stream.log_pos, event.timestamp)
with stats.pipeline() as s:
s.incr('total_events')
s.incr(f"event.{event.table}.{type(event).__name__}")
s.incr('total_rows', len(event.rows))
s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows))
# XXX not a "timer", but we get a histogram out of it
s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows))
if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
if event.table == "nyaa_torrents":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
for row in event.rows:
self.write_buf.put(
(pos, reindex_torrent(row['values'], index_name)),
block=True)
elif type(event) is UpdateRowsEvent:
# UpdateRowsEvent includes the old values too, but we don't care
for row in event.rows:
self.write_buf.put(
(pos, reindex_torrent(row['after_values'], index_name)),
block=True)
elif type(event) is DeleteRowsEvent:
# ok, bye
for row in event.rows:
self.write_buf.put((pos, delet_this(row, index_name)), block=True)
else:
raise Exception(f"unknown event {type(event)}")
elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
if event.table == "nyaa_statistics":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
for row in event.rows:
self.write_buf.put(
(pos, reindex_stats(row['values'], index_name)),
block=True)
elif type(event) is UpdateRowsEvent:
for row in event.rows:
self.write_buf.put(
(pos, reindex_stats(row['after_values'], index_name)),
block=True)
elif type(event) is DeleteRowsEvent:
# uh ok. Assume that the torrent row will get deleted later,
# which will clean up the entire es "torrent" document
pass
else:
raise Exception(f"unknown event {type(event)}")
else:
raise Exception(f"unknown table {s.table}")
class EsPoster(Thread):
# read_buf is the queue of stuff to bulk post
def __init__(self, read_buf, chunk_size=1000, flush_interval=5):
Thread.__init__(self)
self.read_buf = read_buf
self.chunk_size = chunk_size
self.flush_interval = flush_interval
def run(self):
es = Elasticsearch(timeout=30)
last_save = time.time()
since_last = 0
while True:
actions = []
while len(actions) < self.chunk_size:
try:
# grab next event from queue with metadata that creepily
# updates, surviving outside the scope of the loop
((log_file, log_pos, timestamp), action) = \
self.read_buf.get(block=True, timeout=self.flush_interval)
actions.append(action)
except Empty:
# nothing new for the whole interval
break
if not actions:
# nothing to post
log.debug("no changes...")
continue
# XXX "time" to get histogram of no events per bulk
stats.timing('actions_per_bulk', len(actions))
try:
with stats.timer('post_bulk'):
bulk(es, actions, chunk_size=self.chunk_size)
except BulkIndexError as bie:
# in certain cases where we're really out of sync, we update a
# stat when the torrent doc is, causing a "document missing"
# error from es, with no way to suppress that server-side.
# Thus ignore that type of error if it's the only problem
for e in bie.errors:
try:
if e['update']['error']['type'] != 'document_missing_exception':
raise bie
except KeyError:
raise bie
# how far we're behind, wall clock
stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
since_last += len(actions)
if since_last >= 10000 or (time.time() - last_save) > 10:
log.info(f"saving position {log_file}/{log_pos}, {time.time() - timestamp:,.3f} seconds behind")
with stats.timer('save_pos'):
with open(SAVE_LOC, 'w') as f:
json.dump({"log_file": log_file, "log_pos": log_pos}, f)
last_save = time.time()
since_last = 0
# in-memory queue between binlog and es. The bigger it is, the more events we
# can parse in memory while waiting for es to catch up, at the expense of heap.
buf = Queue(maxsize=INTERNAL_QUEUE_DEPTH)
reader = BinlogReader(buf)
reader.daemon = True
writer = EsPoster(buf, chunk_size=ES_CHUNK_SIZE, flush_interval=FLUSH_INTERVAL)
writer.daemon = True
reader.start()
writer.start()
# on the main thread, poll the queue size for monitoring
while True:
stats.gauge('queue_depth', buf.qsize())
time.sleep(1)