@@ -23,6 +23,7 @@ changes that happen while the import_to_es script is dumping stuff from the
 database into es, at the expense of redoing a (small) amount of indexing.
 """
 from elasticsearch import Elasticsearch
+from elasticsearch.helpers import bulk
 from pymysqlreplication import BinLogStreamReader
 from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent
 from datetime import datetime
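The newly imported bulk helper is what the rest of the diff builds on: the reindex functions stop calling es.update()/es.delete() directly and instead return plain action dicts, which the main loop hands to bulk() as a generator. A minimal sketch of that pattern, with a made-up index name, id, and field values for illustration:

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch(timeout=30)

# Each action is a plain dict; '_op_type' picks the operation, and
# 'doc_as_upsert' lets an update create the document if it doesn't exist yet.
actions = [
    {'_op_type': 'update', '_index': 'nyaa', '_type': 'torrent', '_id': '1',
     'doc': {'seed_count': 42}, 'doc_as_upsert': True},
    {'_op_type': 'delete', '_index': 'nyaa', '_type': 'torrent', '_id': '2'},
]

# bulk() batches the actions into as few requests as possible and
# returns a (success count, errors) tuple.
ok, errors = bulk(es, actions)
```

With doc_as_upsert, the same update action works whether or not the document already exists, which is why the torrent and stats updates below can arrive in either order.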
@@ -50,7 +51,7 @@ NT_DB = 'nyaav2'
 with open(SAVE_LOC) as f:
     pos = json.load(f)
 
-es = Elasticsearch()
+es = Elasticsearch(timeout=30)
 
 stream = BinLogStreamReader(
         # TODO parse out from config.py or something
@@ -58,8 +59,7 @@ stream = BinLogStreamReader(
             'host': MYSQL_HOST,
             'port': MYSQL_PORT,
             'user': MYSQL_USER,
-            'passwd': MYSQL_PW,
-
+            'passwd': MYSQL_PW
         },
         server_id=10, # arbitrary
         # only care about this database currently
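The hunk above only touches the connection settings; the remaining BinLogStreamReader arguments are outside the diff. For orientation, here is a rough sketch of how such a reader is typically configured with python-mysql-replication. The argument values and the shape of pos below are illustrative assumptions, not taken from the file:

```python
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent)

# Hypothetical saved position; the real script loads this from SAVE_LOC.
pos = {'log_file': 'mysql-bin.000001', 'log_pos': 4}

stream = BinLogStreamReader(
    connection_settings={'host': '127.0.0.1', 'port': 3306,
                         'user': 'repl', 'passwd': 'secret'},
    server_id=10,             # must not collide with other replication clients
    only_schemas=['nyaav2'],  # skip binlog events for other databases
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    resume_stream=True,       # continue from the saved file/position
    log_file=pos['log_file'],
    log_pos=pos['log_pos'],
    blocking=True)            # wait for new events rather than returning
```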
@@ -108,60 +108,76 @@ def reindex_torrent(t, index_name):
         "has_torrent": bool(t['has_torrent']),
     }
     # update, so we don't delete the stats if present
-    es.update(
-        index=index_name,
-        doc_type='torrent',
-        id=t['id'],
-        body={"doc": doc, "doc_as_upsert": True})
+    return {
+        '_op_type': 'update',
+        '_index': index_name,
+        '_type': 'torrent',
+        '_id': str(t['id']),
+        "doc": doc,
+        "doc_as_upsert": True
+    }
 
 def reindex_stats(s, index_name):
-    es.update(
-        index=index_name,
-        doc_type='torrent',
-        id=s['torrent_id'],
-        body={
-            "doc": {
-                "stats_last_updated": s["last_updated"],
-                "download_count": s["download_count"],
-                "leech_count": s['leech_count'],
-                "seed_count": s['seed_count'],
-            }, "doc_as_upsert": True})
+    # update the torrent at torrent_id, assumed to exist;
+    # this will always be the case if you're reading the binlog
+    # in order; the foreign key constraint on torrent_id prevents
+    # the stats row from existing if the torrent isn't around.
+    return {
+        '_op_type': 'update',
+        '_index': index_name,
+        '_type': 'torrent',
+        '_id': str(s['torrent_id']),
+        "doc": {
+            "stats_last_updated": s["last_updated"],
+            "download_count": s["download_count"],
+            "leech_count": s['leech_count'],
+            "seed_count": s['seed_count'],
+        }}
+
+def delet_this(row, index_name):
+    return {
+        "_op_type": 'delete',
+        '_index': index_name,
+        '_type': 'torrent',
+        '_id': str(row['values']['id'])}
 
 n = 0
 last_save = time.time()
 
 for event in stream:
-    for row in event.rows:
-        if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
-            if event.table == "nyaa_torrents":
-                index_name = "nyaa"
-            else:
-                index_name = "sukebei"
-            if type(event) is WriteRowsEvent:
-                reindex_torrent(row['values'], index_name)
-            elif type(event) is UpdateRowsEvent:
-                reindex_torrent(row['after_values'], index_name)
-            elif type(event) is DeleteRowsEvent:
-                # just delete it
-                es.delete(index=index_name, doc_type='torrent', id=row['values']['id'])
-            else:
-                raise Exception(f"unknown event {type(event)}")
-        elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
-            if event.table == "nyaa_torrents":
-                index_name = "nyaa"
-            else:
-                index_name = "sukebei"
-            if type(event) is WriteRowsEvent:
-                reindex_stats(row['values'], index_name)
-            elif type(event) is UpdateRowsEvent:
-                reindex_stats(row['after_values'], index_name)
-            elif type(event) is DeleteRowsEvent:
-                # uh ok. assume that the torrent row will get deleted later.
-                pass
-            else:
-                raise Exception(f"unknown event {type(event)}")
-        else:
-            raise Exception(f"unknown table {s.table}")
+    if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
+        if event.table == "nyaa_torrents":
+            index_name = "nyaa"
+        else:
+            index_name = "sukebei"
+        if type(event) is WriteRowsEvent:
+            bulk(es, (reindex_torrent(row['values'], index_name) for row in event.rows))
+        elif type(event) is UpdateRowsEvent:
+            # UpdateRowsEvent includes the old values too, but we don't care
+            bulk(es, (reindex_torrent(row['after_values'], index_name) for row in event.rows))
+        elif type(event) is DeleteRowsEvent:
+            # ok, bye
+            bulk(es, (delet_this(row, index_name) for row in event.rows))
+        else:
+            raise Exception(f"unknown event {type(event)}")
+    elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
+        if event.table == "nyaa_statistics":
+            index_name = "nyaa"
+        else:
+            index_name = "sukebei"
+        if type(event) is WriteRowsEvent:
+            bulk(es, (reindex_stats(row['values'], index_name) for row in event.rows))
+        elif type(event) is UpdateRowsEvent:
+            bulk(es, (reindex_stats(row['after_values'], index_name) for row in event.rows))
+        elif type(event) is DeleteRowsEvent:
+            # uh ok. assume that the torrent row will get deleted later,
+            # which will clean up the entire es "torrent" document
+            pass
+        else:
+            raise Exception(f"unknown event {type(event)}")
+    else:
+        raise Exception(f"unknown table {s.table}")
+
     n += 1
     if n % 100 == 0 or time.time() - last_save > 30:
         log.info(f"saving position {stream.log_file}/{stream.log_pos}")