Update import_to_es.py to index both torrent flavors, rename sync_es config

import_to_es.py now also runs SHOW MASTER STATUS and prints the binlog position
Changed progressbar dependency from progressbar2 to progressbar33
This commit is contained in:
TheAMM 2017-05-28 02:12:48 +03:00
parent 79dae38f37
commit 0cbed3870e
4 changed files with 60 additions and 33 deletions

View File

@ -1,11 +0,0 @@
{
"save_loc": "/tmp/pos.json",
"mysql_host": "127.0.0.1",
"mysql_port": 13306,
"mysql_user": "root",
"mysql_password": "dunnolol",
"database": "nyaav2",
"internal_queue_depth": 10000,
"es_chunk_size": 10000,
"flush_interval": 5
}

View File

@ -0,0 +1,11 @@
{
"save_loc": "/tmp/pos.json",
"mysql_host": "127.0.0.1",
"mysql_port": 3306,
"mysql_user": "nyaa",
"mysql_password": "some_password",
"database": "nyaav2",
"internal_queue_depth": 10000,
"es_chunk_size": 10000,
"flush_interval": 5
}

View File

@ -5,22 +5,16 @@ which is assumed to already exist.
This is a one-shot deal, so you'd either need to complement it This is a one-shot deal, so you'd either need to complement it
with a cron job or some binlog-reading thing (TODO) with a cron job or some binlog-reading thing (TODO)
""" """
from nyaa import app import sys
from nyaa.models import Torrent import json
from nyaa import app, db, models
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient from elasticsearch.client import IndicesClient
from elasticsearch import helpers from elasticsearch import helpers
import progressbar
import sys
bar = progressbar.ProgressBar( # This should be progressbar33
max_value=Torrent.query.count(), import progressbar
widgets=[
progressbar.SimpleProgress(),
' [', progressbar.Timer(), '] ',
progressbar.Bar(),
' (', progressbar.ETA(), ') ',
])
es = Elasticsearch(timeout=30) es = Elasticsearch(timeout=30)
ic = IndicesClient(es) ic = IndicesClient(es)
@ -32,11 +26,11 @@ ic = IndicesClient(es)
# we don't want to reindex all the user's torrents just because they # we don't want to reindex all the user's torrents just because they
# changed their name, and we don't really want to FTS search on the user anyway. # changed their name, and we don't really want to FTS search on the user anyway.
# Maybe it's more convenient to dereference though. # Maybe it's more convenient to dereference though.
def mk_es(t): def mk_es(t, index_name):
return { return {
"_id": t.id, "_id": t.id,
"_type": "torrent", "_type": "torrent",
"_index": app.config['ES_INDEX_NAME'], "_index": index_name,
"_source": { "_source": {
# we're also indexing the id as a number so you can # we're also indexing the id as a number so you can
# order by it. seems like this is just equivalent to # order by it. seems like this is just equivalent to
@ -73,7 +67,7 @@ def mk_es(t):
# page through an sqlalchemy query, like the per_fetch but # page through an sqlalchemy query, like the per_fetch but
# doesn't break the eager joins its doing against the stats table. # doesn't break the eager joins its doing against the stats table.
# annoying that this isn't built in somehow. # annoying that this isn't built in somehow.
def page_query(query, limit=sys.maxsize, batch_size=10000): def page_query(query, limit=sys.maxsize, batch_size=10000, progress_bar=None):
start = 0 start = 0
while True: while True:
# XXX very inelegant way to do this, i'm confus # XXX very inelegant way to do this, i'm confus
@ -89,13 +83,46 @@ def page_query(query, limit=sys.maxsize, batch_size=10000):
yield(thing) yield(thing)
if not had_things or stop == limit: if not had_things or stop == limit:
break break
bar.update(start) if progress_bar:
progress_bar.update(start)
start = min(limit, start + batch_size) start = min(limit, start + batch_size)
# turn off refreshes while bulk loading FLAVORS = [
ic.put_settings(body={'index': {'refresh_interval': '-1'}}, index=app.config['ES_INDEX_NAME']) ('nyaa', models.NyaaTorrent),
('sukebei', models.SukebeiTorrent)
]
helpers.bulk(es, (mk_es(t) for t in page_query(Torrent.query)), chunk_size=10000) # Get binlog status from mysql
master_status = db.engine.execute('SHOW MASTER STATUS;').fetchone()
# restore to near-enough real time position_json = {
ic.put_settings(body={'index': {'refresh_interval': '30s'}}, index=app.config['ES_INDEX_NAME']) 'log_file': master_status[0],
'log_pos': master_status[1]
}
print('Save the following in the file configured in your ES sync config JSON:')
print(json.dumps(position_json))
for flavor, torrent_class in FLAVORS:
print('Importing torrents for index', flavor, 'from', torrent_class)
bar = progressbar.ProgressBar(
maxval=torrent_class.query.count(),
widgets=[ progressbar.SimpleProgress(),
' [', progressbar.Timer(), '] ',
progressbar.Bar(),
' (', progressbar.ETA(), ') ',
])
# turn off refreshes while bulk loading
ic.put_settings(body={'index': {'refresh_interval': '-1'}}, index=flavor)
bar.start()
helpers.bulk(es, (mk_es(t, flavor) for t in page_query(torrent_class.query, progress_bar=bar)), chunk_size=10000)
bar.finish()
# Refresh the index immediately
ic.refresh(index=flavor)
print('Index refresh done.')
# restore to near-enough real time
ic.put_settings(body={'index': {'refresh_interval': '30s'}}, index=flavor)

View File

@ -28,7 +28,7 @@ mysqlclient==1.3.10
orderedset==2.0 orderedset==2.0
packaging==16.8 packaging==16.8
passlib==1.7.1 passlib==1.7.1
progressbar2==3.20.0 progressbar33==2.40.0
pycodestyle==2.3.1 pycodestyle==2.3.1
pycparser==2.17 pycparser==2.17
PyMySQL==0.7.11 PyMySQL==0.7.11