From 0cbed3870e9a65fd6a37d0df8c72d83858b4fb18 Mon Sep 17 00:00:00 2001 From: TheAMM Date: Sun, 28 May 2017 02:12:48 +0300 Subject: [PATCH] Update import_to_es.py to index both torrent flavors, rename sync_es config import_to_es.py will now also SHOW MASTER STATUS Changed progressbar dependency --- config_es_sync.json | 11 ------ es_sync_config.example.json | 11 ++++++ import_to_es.py | 69 ++++++++++++++++++++++++++----------- requirements.txt | 2 +- 4 files changed, 60 insertions(+), 33 deletions(-) delete mode 100644 config_es_sync.json create mode 100644 es_sync_config.example.json diff --git a/config_es_sync.json b/config_es_sync.json deleted file mode 100644 index d2cb889..0000000 --- a/config_es_sync.json +++ /dev/null @@ -1,11 +0,0 @@ -{ -"save_loc": "/tmp/pos.json", -"mysql_host": "127.0.0.1", -"mysql_port": 13306, -"mysql_user": "root", -"mysql_password": "dunnolol", -"database": "nyaav2", -"internal_queue_depth": 10000, -"es_chunk_size": 10000, -"flush_interval": 5 -} diff --git a/es_sync_config.example.json b/es_sync_config.example.json new file mode 100644 index 0000000..b2dc524 --- /dev/null +++ b/es_sync_config.example.json @@ -0,0 +1,11 @@ +{ + "save_loc": "/tmp/pos.json", + "mysql_host": "127.0.0.1", + "mysql_port": 3306, + "mysql_user": "nyaa", + "mysql_password": "some_password", + "database": "nyaav2", + "internal_queue_depth": 10000, + "es_chunk_size": 10000, + "flush_interval": 5 +} diff --git a/import_to_es.py b/import_to_es.py index 6832781..b3bc3b1 100755 --- a/import_to_es.py +++ b/import_to_es.py @@ -5,22 +5,16 @@ which is assumed to already exist. 
This is a one-shot deal, so you'd either need to complement it with a cron job or some binlog-reading thing (TODO) """ -from nyaa import app -from nyaa.models import Torrent +import sys +import json +from nyaa import app, db, models + from elasticsearch import Elasticsearch from elasticsearch.client import IndicesClient from elasticsearch import helpers -import progressbar -import sys -bar = progressbar.ProgressBar( - max_value=Torrent.query.count(), - widgets=[ - progressbar.SimpleProgress(), - ' [', progressbar.Timer(), '] ', - progressbar.Bar(), - ' (', progressbar.ETA(), ') ', - ]) +# This should be progressbar33 +import progressbar es = Elasticsearch(timeout=30) ic = IndicesClient(es) @@ -32,11 +26,11 @@ ic = IndicesClient(es) # we don't want to reindex all the user's torrents just because they # changed their name, and we don't really want to FTS search on the user anyway. # Maybe it's more convenient to derefence though. -def mk_es(t): +def mk_es(t, index_name): return { "_id": t.id, "_type": "torrent", - "_index": app.config['ES_INDEX_NAME'], + "_index": index_name, "_source": { # we're also indexing the id as a number so you can # order by it. seems like this is just equivalent to @@ -73,7 +67,7 @@ def mk_es(t): # page through an sqlalchemy query, like the per_fetch but # doesn't break the eager joins its doing against the stats table. # annoying that this isn't built in somehow. 
-def page_query(query, limit=sys.maxsize, batch_size=10000): +def page_query(query, limit=sys.maxsize, batch_size=10000, progress_bar=None): start = 0 while True: # XXX very inelegant way to do this, i'm confus @@ -89,13 +83,46 @@ def page_query(query, limit=sys.maxsize, batch_size=10000): yield(thing) if not had_things or stop == limit: break - bar.update(start) + if progress_bar: + progress_bar.update(start) start = min(limit, start + batch_size) -# turn off refreshes while bulk loading -ic.put_settings(body={'index': {'refresh_interval': '-1'}}, index=app.config['ES_INDEX_NAME']) +FLAVORS = [ + ('nyaa', models.NyaaTorrent), + ('sukebei', models.SukebeiTorrent) +] -helpers.bulk(es, (mk_es(t) for t in page_query(Torrent.query)), chunk_size=10000) +# Get binlog status from mysql +master_status = db.engine.execute('SHOW MASTER STATUS;').fetchone() -# restore to near-enough real time -ic.put_settings(body={'index': {'refresh_interval': '30s'}}, index=app.config['ES_INDEX_NAME']) +position_json = { + 'log_file': master_status[0], + 'log_pos': master_status[1] +} + +print('Save the following in the file configured in your ES sync config JSON:') +print(json.dumps(position_json)) + +for flavor, torrent_class in FLAVORS: + print('Importing torrents for index', flavor, 'from', torrent_class) + bar = progressbar.ProgressBar( + maxval=torrent_class.query.count(), + widgets=[ progressbar.SimpleProgress(), + ' [', progressbar.Timer(), '] ', + progressbar.Bar(), + ' (', progressbar.ETA(), ') ', + ]) + + # turn off refreshes while bulk loading + ic.put_settings(body={'index': {'refresh_interval': '-1'}}, index=flavor) + + bar.start() + helpers.bulk(es, (mk_es(t, flavor) for t in page_query(torrent_class.query, progress_bar=bar)), chunk_size=10000) + bar.finish() + + # Refresh the index immediately + ic.refresh(index=flavor) + print('Index refresh done.') + + # restore to near-enough real time + ic.put_settings(body={'index': {'refresh_interval': '30s'}}, index=flavor) diff --git 
a/requirements.txt b/requirements.txt index 2a184ff..43e57e3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,7 +28,7 @@ mysqlclient==1.3.10 orderedset==2.0 packaging==16.8 passlib==1.7.1 -progressbar2==3.20.0 +progressbar33==2.40.0 pycodestyle==2.3.1 pycparser==2.17 PyMySQL==0.7.11