diff --git a/sync_es.py b/sync_es.py
index 9cfe892..c22fbcb 100644
--- a/sync_es.py
+++ b/sync_es.py
@@ -23,6 +23,7 @@ changes that happen while the import_to_es script is dumping stuff from the
 database into es, at the expense of redoing a (small) amount of indexing.
 """
 from elasticsearch import Elasticsearch
+from elasticsearch.helpers import bulk
 from pymysqlreplication import BinLogStreamReader
 from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent
 from datetime import datetime
@@ -58,8 +59,7 @@ stream = BinLogStreamReader(
         'host': MYSQL_HOST,
         'port': MYSQL_PORT,
         'user': MYSQL_USER,
-        'passwd': MYSQL_PW,
-
+        'passwd': MYSQL_PW
     },
     server_id=10, # arbitrary
     # only care about this database currently
@@ -108,60 +108,76 @@ def reindex_torrent(t, index_name):
         "has_torrent": bool(t['has_torrent']),
     }
     # update, so we don't delete the stats if present
-    es.update(
-        index=index_name,
-        doc_type='torrent',
-        id=t['id'],
-        body={"doc": doc, "doc_as_upsert": True})
+    return {
+        '_op_type': 'update',
+        '_index': index_name,
+        '_type': 'torrent',
+        '_id': str(t['id']),
+        "doc": doc,
+        "doc_as_upsert": True
+    }
 
 def reindex_stats(s, index_name):
-    es.update(
-        index=index_name,
-        doc_type='torrent',
-        id=s['torrent_id'],
-        body={
-            "doc": {
-                "stats_last_updated": s["last_updated"],
-                "download_count": s["download_count"],
-                "leech_count": s['leech_count'],
-                "seed_count": s['seed_count'],
-            }, "doc_as_upsert": True})
+    # update the torrent at torrent_id, assumed to exist;
+    # this will always be the case if you're reading the binlog
+    # in order; the foreign key constraint on torrent_id prevents
+    # the stats row from existing if the torrent isn't around.
+    return {
+        '_op_type': 'update',
+        '_index': index_name,
+        '_type': 'torrent',
+        '_id': str(s['torrent_id']),
+        "doc": {
+            "stats_last_updated": s["last_updated"],
+            "download_count": s["download_count"],
+            "leech_count": s['leech_count'],
+            "seed_count": s['seed_count'],
+        }}
+
+def delet_this(row, index_name):
+    return {
+        "_op_type": 'delete',
+        '_index': index_name,
+        '_type': 'torrent',
+        '_id': str(row['values']['id'])}
 
 n = 0
 last_save = time.time()
 for event in stream:
-    for row in event.rows:
-        if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
-            if event.table == "nyaa_torrents":
-                index_name = "nyaa"
-            else:
-                index_name = "sukebei"
-            if type(event) is WriteRowsEvent:
-                reindex_torrent(row['values'], index_name)
-            elif type(event) is UpdateRowsEvent:
-                reindex_torrent(row['after_values'], index_name)
-            elif type(event) is DeleteRowsEvent:
-                # just delete it
-                es.delete(index=index_name, doc_type='torrent', id=row['values']['id'])
-            else:
-                raise Exception(f"unknown event {type(event)}")
-        elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
-            if event.table == "nyaa_torrents":
-                index_name = "nyaa"
-            else:
-                index_name = "sukebei"
-            if type(event) is WriteRowsEvent:
-                reindex_stats(row['values'], index_name)
-            elif type(event) is UpdateRowsEvent:
-                reindex_stats(row['after_values'], index_name)
-            elif type(event) is DeleteRowsEvent:
-                # uh ok. assume that the torrent row will get deleted later.
-                pass
-            else:
-                raise Exception(f"unknown event {type(event)}")
+    if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
+        if event.table == "nyaa_torrents":
+            index_name = "nyaa"
         else:
-            raise Exception(f"unknown table {s.table}")
+            index_name = "sukebei"
+        if type(event) is WriteRowsEvent:
+            bulk(es, (reindex_torrent(row['values'], index_name) for row in event.rows))
+        elif type(event) is UpdateRowsEvent:
+            # UpdateRowsEvent includes the old values too, but we don't care
+            bulk(es, (reindex_torrent(row['after_values'], index_name) for row in event.rows))
+        elif type(event) is DeleteRowsEvent:
+            # ok, bye
+            bulk(es, (delet_this(row, index_name) for row in event.rows))
+        else:
+            raise Exception(f"unknown event {type(event)}")
+    elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
+        if event.table == "nyaa_statistics":
+            index_name = "nyaa"
+        else:
+            index_name = "sukebei"
+        if type(event) is WriteRowsEvent:
+            bulk(es, (reindex_stats(row['values'], index_name) for row in event.rows))
+        elif type(event) is UpdateRowsEvent:
+            bulk(es, (reindex_stats(row['after_values'], index_name) for row in event.rows))
+        elif type(event) is DeleteRowsEvent:
+            # uh ok. assume that the torrent row will get deleted later,
+            # which will clean up the entire es "torrent" document
+            pass
+        else:
+            raise Exception(f"unknown event {type(event)}")
+    else:
+        raise Exception(f"unknown table {event.table}")
+    n += 1
     if n % 100 == 0 or time.time() - last_save > 30:
         log.info(f"saving position {stream.log_file}/{stream.log_pos}")
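
For reviewers of the bulk conversion: a minimal sketch, separate from the patch, of how elasticsearch.helpers.bulk consumes action dicts shaped like the ones reindex_torrent(), reindex_stats(), and delet_this() now return. The index name, ids, and field values below are made up; it assumes an ES node on localhost and the pre-6.x mapping types ('_type': 'torrent') this script already uses.

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    es = Elasticsearch(hosts=['localhost'])

    actions = [
        # partial-update actions, the shape reindex_torrent()/reindex_stats()
        # return; doc_as_upsert creates the document if it doesn't exist yet
        {'_op_type': 'update', '_index': 'nyaa', '_type': 'torrent', '_id': '1',
         'doc': {'seed_count': 3}, 'doc_as_upsert': True},
        {'_op_type': 'update', '_index': 'nyaa', '_type': 'torrent', '_id': '2',
         'doc': {'seed_count': 0}, 'doc_as_upsert': True},
        # a delete action, the shape delet_this() returns
        {'_op_type': 'delete', '_index': 'nyaa', '_type': 'torrent', '_id': '2'},
    ]

    # everything goes out as a single _bulk request instead of one HTTP
    # round-trip per row; returns (number_succeeded, list_of_errors)
    ok, errors = bulk(es, actions)

Note that reindex_stats() deliberately omits doc_as_upsert: per its comment, the torrent document is assumed to already exist when a stats row arrives.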
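Likewise, a sketch (same caveats, connection details illustrative) of the row shapes the new per-event dispatch relies on from python-mysql-replication, since each event type keys its rows differently:

    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent

    stream = BinLogStreamReader(
        connection_settings={'host': '127.0.0.1', 'port': 3306,
                             'user': 'repl', 'passwd': 'secret'},
        server_id=10,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        blocking=True)

    for event in stream:
        for row in event.rows:
            if isinstance(event, WriteRowsEvent):
                print('insert into', event.table, row['values'])      # the full new row
            elif isinstance(event, UpdateRowsEvent):
                print('update in', event.table, row['after_values'])  # row['before_values'] also available
            elif isinstance(event, DeleteRowsEvent):
                print('delete from', event.table, row['values'])      # the row as last seen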