sync_es.py: bulk actions per binlog event

mainly helps with the stat updates, which come in as
a single INSERT VALUES (...) ON CONFLICT UPDATE event
that helpfully translates to a bulk index event.

It seems like elasticsearch should still be buffering that up
internally, so maybe the refresh_interval: 30s change will help
more than this.
This commit is contained in:
queue 2017-05-16 22:47:34 -06:00
parent e530e28bbd
commit e38fe2575a
1 changed files with 64 additions and 48 deletions

View File

@ -23,6 +23,7 @@ changes that happen while the import_to_es script is dumping stuff from the
database into es, at the expense of redoing a (small) amount of indexing. database into es, at the expense of redoing a (small) amount of indexing.
""" """
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pymysqlreplication import BinLogStreamReader from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent
from datetime import datetime from datetime import datetime
@ -58,8 +59,7 @@ stream = BinLogStreamReader(
'host': MYSQL_HOST, 'host': MYSQL_HOST,
'port': MYSQL_PORT, 'port': MYSQL_PORT,
'user': MYSQL_USER, 'user': MYSQL_USER,
'passwd': MYSQL_PW, 'passwd': MYSQL_PW
}, },
server_id=10, # arbitrary server_id=10, # arbitrary
# only care about this database currently # only care about this database currently
@ -108,60 +108,76 @@ def reindex_torrent(t, index_name):
"has_torrent": bool(t['has_torrent']), "has_torrent": bool(t['has_torrent']),
} }
# update, so we don't delete the stats if present # update, so we don't delete the stats if present
es.update( return {
index=index_name, '_op_type': 'update',
doc_type='torrent', '_index': index_name,
id=t['id'], '_type': 'torrent',
body={"doc": doc, "doc_as_upsert": True}) '_id': str(t['id']),
"doc": doc,
"doc_as_upsert": True
}
def reindex_stats(s, index_name): def reindex_stats(s, index_name):
es.update( # update the torrent at torrent_id, assumed to exist;
index=index_name, # this will always be the case if you're reading the binlog
doc_type='torrent', # in order; the foreign key constraint on torrent_id prevents
id=s['torrent_id'], # the stats row from existing if the torrent isn't around.
body={ return {
"doc": { '_op_type': 'update',
"stats_last_updated": s["last_updated"], '_index': index_name,
"download_count": s["download_count"], '_type': 'torrent',
"leech_count": s['leech_count'], '_id': str(s['torrent_id']),
"seed_count": s['seed_count'], "doc": {
}, "doc_as_upsert": True}) "stats_last_updated": s["last_updated"],
"download_count": s["download_count"],
"leech_count": s['leech_count'],
"seed_count": s['seed_count'],
}}
def delet_this(row, index_name):
return {
"_op_type": 'delete',
'_index': index_name,
'_type': 'torrent',
'_id': str(row['values']['id'])}
n = 0 n = 0
last_save = time.time() last_save = time.time()
for event in stream: for event in stream:
for row in event.rows: if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
if event.table == "nyaa_torrents" or event.table == "sukebei_torrents": if event.table == "nyaa_torrents":
if event.table == "nyaa_torrents": index_name = "nyaa"
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
reindex_torrent(row['values'], index_name)
elif type(event) is UpdateRowsEvent:
reindex_torrent(row['after_values'], index_name)
elif type(event) is DeleteRowsEvent:
# just delete it
es.delete(index=index_name, doc_type='torrent', id=row['values']['id'])
else:
raise Exception(f"unknown event {type(event)}")
elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
if event.table == "nyaa_torrents":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
reindex_stats(row['values'], index_name)
elif type(event) is UpdateRowsEvent:
reindex_stats(row['after_values'], index_name)
elif type(event) is DeleteRowsEvent:
# uh ok. assume that the torrent row will get deleted later.
pass
else:
raise Exception(f"unknown event {type(event)}")
else: else:
raise Exception(f"unknown table {s.table}") index_name = "sukebei"
if type(event) is WriteRowsEvent:
bulk(es, (reindex_torrent(row['values'], index_name) for row in event.rows))
elif type(event) is UpdateRowsEvent:
# UpdateRowsEvent includes the old values too, but we don't care
bulk(es, (reindex_torrent(row['after_values'], index_name) for row in event.rows))
elif type(event) is DeleteRowsEvent:
# ok, bye
bulk(es, (delet_this(row, index_name) for row in event.rows))
else:
raise Exception(f"unknown event {type(event)}")
elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
if event.table == "nyaa_statistics":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
bulk(es, (reindex_stats(row['values'], index_name) for row in event.rows))
elif type(event) is UpdateRowsEvent:
bulk(es, (reindex_stats(row['after_values'], index_name) for row in event.rows))
elif type(event) is DeleteRowsEvent:
# uh ok. assume that the torrent row will get deleted later,
# which will clean up the entire es "torrent" document
pass
else:
raise Exception(f"unknown event {type(event)}")
else:
raise Exception(f"unknown table {s.table}")
n += 1 n += 1
if n % 100 == 0 or time.time() - last_save > 30: if n % 100 == 0 or time.time() - last_save > 30:
log.info(f"saving position {stream.log_file}/{stream.log_pos}") log.info(f"saving position {stream.log_file}/{stream.log_pos}")