From 17217d9427c33ad139dc40423119b186bcdf2a72 Mon Sep 17 00:00:00 2001
From: queue
Date: Sun, 14 May 2017 00:48:17 -0600
Subject: [PATCH] WIP es stuff

---
 import_to_es.py | 90 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 90 insertions(+)
 create mode 100644 import_to_es.py

diff --git a/import_to_es.py b/import_to_es.py
new file mode 100644
index 0000000..4be5e2b
--- /dev/null
+++ b/import_to_es.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+"""
+Bulk load torrents from mysql into the elasticsearch `nyaav2` index,
+which is assumed to already exist.
+This is a one-shot deal, so you'd either need to complement it with a
+cron job or some binlog-reading thing to keep the index fresh (TODO).
+"""
+from nyaa.models import Torrent
+from elasticsearch import Elasticsearch
+from elasticsearch import helpers
+import progressbar
+import sys
+
+bar = progressbar.ProgressBar(
+    max_value=Torrent.query.count(),
+    widgets=[
+        progressbar.SimpleProgress(),
+        ' [', progressbar.Timer(), '] ',
+        progressbar.Bar(),
+        ' (', progressbar.ETA(), ') ',
+    ])
+
+es = Elasticsearch()
+
+# Turn a Torrent row into the document that elasticsearch indexes. We flatten
+# in the stats (seeders/leechers) so we can order by them in es naturally.
+# We _don't_ dereference uploader_id to the user's display name, however,
+# instead doing that at query time. I _think_ this is right because we don't
+# want to reindex all of a user's torrents just because they changed their
+# name, and we don't really want to FTS search on the user anyway.
+# Maybe it's more convenient to dereference, though.
+def mk_es(t):
+    return {
+        "_id": t.id,
+        "_type": "torrent",
+        "_index": "nyaav2",
+        "_source": {
+            "display_name": t.display_name,
+            "created_time": t.created_time,
+            "updated_time": t.updated_time,
+            "description": t.description,
+            # not analyzed, but included so we can render magnet links
+            # without querying sql again
+            "info_hash": t.info_hash.hex(),
+            "filesize": t.filesize,
+            "uploader_id": t.uploader_id,
+            "main_category_id": t.main_category_id,
+            "sub_category_id": t.sub_category_id,
+            # XXX all the bitflags are numbers in sql, so coerce to booleans
+            "anonymous": bool(t.anonymous),
+            "trusted": bool(t.trusted),
+            "remake": bool(t.remake),
+            "complete": bool(t.complete),
+            # TODO instead of indexing hidden/deleted torrents and filtering
+            # them out at query time, we could delete them from es entirely.
+            # Probably won't matter for at least a few months.
+            "hidden": bool(t.hidden),
+            "deleted": bool(t.deleted),
+            "has_torrent": t.has_torrent,
+            # XXX last_updated isn't initialized on fresh rows; fall back to created_time
+            "stats_last_updated": t.stats.last_updated or t.created_time,
+            "download_count": t.stats.download_count,
+            "leech_count": t.stats.leech_count,
+            "seed_count": t.stats.seed_count,
+        }
+    }
+
+# Page through an sqlalchemy query, like yield_per, but without breaking
+# the eager joins it's doing against the stats table. Annoying that this
+# isn't built in somehow.
+def page_query(query, limit=sys.maxsize, batch_size=10000):
+    start = 0
+    while True:
+        # XXX this offset/limit bookkeeping is inelegant, but it works
+        stop = min(limit, start + batch_size)
+        if stop == start:
+            break
+        # .all() materializes the batch so the emptiness check below works;
+        # a bare Query object is always truthy.
+        things = query.slice(start, stop).all()
+        if not things:
+            break
+        for thing in things:
+            yield thing
+        bar.update(start + len(things))
+        if stop == limit:
+            break
+        start = stop
+
+helpers.bulk(es, (mk_es(t) for t in page_query(Torrent.query)), chunk_size=10000)
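
A note on the query-time dereference mentioned in the mk_es comment: something
like the sketch below is what that could look like. This is a minimal, untested
sketch; it assumes nyaa.models also exposes a User model, and `username` and
`attach_uploader_names` are hypothetical names for illustration, not existing
code in this patch.

# Hedged sketch of the query-time uploader dereference described in the
# mk_es comment. Assumes nyaa.models has a User model; `username` and
# `attach_uploader_names` are hypothetical names, not part of this patch.
from nyaa.models import User

def attach_uploader_names(hits):
    ids = {h['_source']['uploader_id'] for h in hits
           if h['_source']['uploader_id'] is not None}
    # one sql query to resolve all uploader names for this page of results
    names = {u.id: u.username for u in User.query.filter(User.id.in_(ids))}
    for h in hits:
        h['_source']['uploader_name'] = names.get(h['_source']['uploader_id'])
    return hits

# Example: full-text search on display_name, ordered by the flattened
# seed_count, then batch-resolve uploader names from sql afterwards.
res = es.search(index='nyaav2', doc_type='torrent', body={
    'query': {'match': {'display_name': 'some search terms'}},
    'sort': [{'seed_count': 'desc'}],
})
hits = attach_uploader_names(res['hits']['hits'])

Because names live only in sql, a user rename costs nothing at index time, at
the price of one extra sql round trip per search page.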
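
On the cron-job complement from the docstring: a minimal sketch of what an
incremental run could look like, assuming updated_time is bumped on every
change worth reindexing. `incremental_import` and the in-memory `since` value
are hypothetical (a real cron job would persist the high-water mark somewhere),
and the module-level progress bar is sized for a full-table run, so it would
misreport here.

# Hedged sketch of the cron-job complement mentioned in the docstring:
# re-push only rows touched since the last run. Assumes updated_time is
# bumped on every change worth reindexing; `incremental_import` and the
# `since` bookkeeping are hypothetical.
from datetime import datetime, timedelta

def incremental_import(since):
    q = Torrent.query.filter(Torrent.updated_time >= since)
    # bulk-upsert by _id: re-pushed docs simply overwrite the old versions
    helpers.bulk(es, (mk_es(t) for t in page_query(q)), chunk_size=10000)

# e.g. from a cron entry that runs hourly:
incremental_import(datetime.utcnow() - timedelta(hours=1))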