diff --git a/README.md b/README.md index 5ec0077..632ce73 100644 --- a/README.md +++ b/README.md @@ -44,5 +44,43 @@ - Start the dev server with `python run.py` - Deactivate `source deactivate` +# Enabling ElasticSearch + +## Basics +- Install jdk `sudo apt-get install openjdk-8-jdk` +- Install elasticsearch https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html +- `sudo systemctl enable elasticsearch.service` +- `sudo systemctl start elasticsearch.service` +- Run `curl -XGET 'localhost:9200'` and make sure ES is running +- Optional: install Kibana as a search frontend for ES + +## Enable MySQL Binlogging +- Add the `[mariadb]` bin-log section to my.cnf and reload mysql server +- Connect to mysql +- `SHOW VARIABLES LIKE 'binlog_format';` + - Make sure it shows ROW +- Connect to root user +- `GRANT REPLICATION SLAVE ON *.* TO 'test'@'localhost';` where test is the user you will be running `sync_es.py` with + +## Setting up ES +- Run `./create_es.sh` and this creates two indices: `nyaa` and `sukebei` +- The output should show `acknowledged: true` twice +- The safest bet is to disable the webapp here to ensure there's no database writes +- Run `python import_to_es.py` with `SITE_FLAVOR` set to `nyaa` +- Run `python import_to_es.py` with `SITE_FLAVOR` set to `sukebei` +- These will take some time to run as it's indexing + +## Setting up sync_es.py +- Sync_es.py keeps the ElasticSearch index updated by reading the BinLog +- Configure the MySQL options with the user where you granted the REPLICATION permissions +- Connect to MySQL, run `SHOW MASTER STATUS;`. 
+- Copy the output to `/var/lib/sync_es_position.json` with the contents `{"log_file": "FILE", "log_pos": POSITION}` and replace FILE with File (something like master1-bin.000002) in the SQL output and POSITION (something like 892528513) with Position +- Set up `sync_es.py` as a service and run it, preferably as the system/root +- Make sure `sync_es.py` runs within venv with the right dependencies + +## Good to go! +- After that, enable the `USE_ELASTIC_SEARCH` flag and restart the webapp and you're good to go + + ## Code Quality: - Remember to follow PEP8 style guidelines and run `./lint.sh` before committing. diff --git a/config.example.py b/config.example.py index f34c554..73702b9 100644 --- a/config.example.py +++ b/config.example.py @@ -33,8 +33,6 @@ MAIL_FROM_ADDRESS = '***' SMTP_USERNAME = '***' SMTP_PASSWORD = '***' -RESULTS_PER_PAGE = 75 - # What the site identifies itself as. SITE_NAME = 'Nyaa' @@ -49,3 +47,14 @@ ENFORCE_MAIN_ANNOUNCE_URL = False MAIN_ANNOUNCE_URL = '' BACKUP_TORRENT_FOLDER = 'torrents' + +# +# Search Options +# +# Max ES search results, do not set over 10000 +RESULTS_PER_PAGE = 75 + +USE_ELASTIC_SEARCH = False +ENABLE_ELASTIC_SEARCH_HIGHLIGHT = False +ES_MAX_SEARCH_RESULT = 1000 +ES_INDEX_NAME = SITE_FLAVOR # we create indices named nyaa or sukebei \ No newline at end of file diff --git a/create_es.sh b/create_es.sh new file mode 100755 index 0000000..5b0c564 --- /dev/null +++ b/create_es.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +# create indices named "nyaa" and "sukebei", these are hardcoded +curl -v -XPUT 'localhost:9200/nyaa?pretty' -H"Content-Type: application/yaml" --data-binary @es_mapping.yml +curl -v -XPUT 'localhost:9200/sukebei?pretty' -H"Content-Type: application/yaml" --data-binary @es_mapping.yml diff --git a/es_mapping.yml b/es_mapping.yml new file mode 100644 index 0000000..9085ec2 --- /dev/null +++ b/es_mapping.yml @@ -0,0 +1,91 @@ +--- +# CREATE DATABASE/TABLE equivalent for elasticsearch, in yaml +# for inline 
comments. +settings: + analysis: + analyzer: + my_search_analyzer: + type: custom + tokenizer: standard + char_filter: + - my_char_filter + filter: + - standard + - lowercase + my_index_analyzer: + type: custom + tokenizer: standard + char_filter: + - my_char_filter + filter: + - lowercase + - my_ngram + filter: + my_ngram: + type: edgeNGram + min_gram: 1 + max_gram: 15 + char_filter: + my_char_filter: + type: mapping + mappings: ["-=>_", "!=>_"] + index: + # we're running a single es node, so no sharding necessary, + # plus replicas don't really help either. + number_of_shards: 1 + number_of_replicas : 0 + mapper: + # disable elasticsearch's "helpful" autoschema + dynamic: false + # since we disabled the _all field, default query the + # name of the torrent. + query: + default_field: display_name +mappings: + torrent: + # don't want everything concatenated + _all: + enabled: false + properties: + id: + type: long + display_name: + # TODO could do a fancier tokenizer here to parse out the + # the scene convention of stuff in brackets, plus stuff like k-on + type: text + analyzer: my_index_analyzer + fielddata: true + created_time: + type: date + # Only in the ES index for generating magnet links + info_hash: + enabled: false + filesize: + type: long + anonymous: + type: boolean + trusted: + type: boolean + remake: + type: boolean + complete: + type: boolean + hidden: + type: boolean + deleted: + type: boolean + has_torrent: + type: boolean + download_count: + type: long + leech_count: + type: long + seed_count: + type: long + # these ids are really only for filtering, thus keyword + uploader_id: + type: keyword + main_category_id: + type: keyword + sub_category_id: + type: keyword \ No newline at end of file diff --git a/import_to_es.py b/import_to_es.py new file mode 100644 index 0000000..886211f --- /dev/null +++ b/import_to_es.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +""" +Bulk load torents from mysql into elasticsearch `nyaav2` index, +which is assumed to 
already exist. +This is a one-shot deal, so you'd either need to complement it +with a cron job or some binlog-reading thing (TODO) +""" +from nyaa import app +from nyaa.models import Torrent +from elasticsearch import Elasticsearch +from elasticsearch import helpers +import progressbar +import sys + +bar = progressbar.ProgressBar( + max_value=Torrent.query.count(), + widgets=[ + progressbar.SimpleProgress(), + ' [', progressbar.Timer(), '] ', + progressbar.Bar(), + ' (', progressbar.ETA(), ') ', + ]) + +es = Elasticsearch() + +# turn into thing that elasticsearch indexes. We flatten in +# the stats (seeders/leechers) so we can order by them in es naturally. +# we _don't_ dereference uploader_id to the user's display name however, +# instead doing that at query time. I _think_ this is right because +# we don't want to reindex all the user's torrents just because they +# changed their name, and we don't really want to FTS search on the user anyway. +# Maybe it's more convenient to derefence though. +def mk_es(t): + return { + "_id": t.id, + "_type": "torrent", + "_index": app.config['ES_INDEX_NAME'], + "_source": { + # we're also indexing the id as a number so you can + # order by it. seems like this is just equivalent to + # order by created_time, but oh well + "id": t.id, + "display_name": t.display_name, + "created_time": t.created_time, + # not analyzed but included so we can render magnet links + # without querying sql again. + "info_hash": t.info_hash.hex(), + "filesize": t.filesize, + "uploader_id": t.uploader_id, + "main_category_id": t.main_category_id, + "sub_category_id": t.sub_category_id, + # XXX all the bitflags are numbers + "anonymous": bool(t.anonymous), + "trusted": bool(t.trusted), + "remake": bool(t.remake), + "complete": bool(t.complete), + # TODO instead of indexing and filtering later + # could delete from es entirely. Probably won't matter + # for at least a few months. 
+ "hidden": bool(t.hidden), + "deleted": bool(t.deleted), + "has_torrent": t.has_torrent, + # Stats + "download_count": t.stats.download_count, + "leech_count": t.stats.leech_count, + "seed_count": t.stats.seed_count, + } + } + +# page through an sqlalchemy query, like the per_fetch but +# doesn't break the eager joins its doing against the stats table. +# annoying that this isn't built in somehow. +def page_query(query, limit=sys.maxsize, batch_size=10000): + start = 0 + while True: + # XXX very inelegant way to do this, i'm confus + stop = min(limit, start + batch_size) + if stop == start: + break + things = query.slice(start, stop) + if not things: + break + had_things = False + for thing in things: + had_things = True + yield(thing) + if not had_things or stop == limit: + break + bar.update(start) + start = min(limit, start + batch_size) + +helpers.bulk(es, (mk_es(t) for t in page_query(Torrent.query)), chunk_size=10000) diff --git a/my.cnf b/my.cnf index 657a8f6..d586484 100644 --- a/my.cnf +++ b/my.cnf @@ -4,3 +4,9 @@ ft_min_word_len=2 innodb_ft_cache_size = 80000000 innodb_ft_total_cache_size = 1600000000 max_allowed_packet = 100M + +[mariadb] +log-bin +server_id=1 +log-basename=master1 +binlog-format = row diff --git a/nyaa/routes.py b/nyaa/routes.py index 51a7dd3..48c8428 100644 --- a/nyaa/routes.py +++ b/nyaa/routes.py @@ -6,18 +6,16 @@ from nyaa import bencode, utils from nyaa import torrents from nyaa import backend from nyaa import api_handler +from nyaa.search import search_elastic, search_db import config import json -import re from datetime import datetime, timedelta import ipaddress import os.path import base64 from urllib.parse import quote -import sqlalchemy_fulltext.modes as FullTextMode -from sqlalchemy_fulltext import FullTextSearch -import shlex +import math from werkzeug import url_encode from itsdangerous import URLSafeSerializer, BadSignature @@ -27,7 +25,14 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import 
MIMEText from email.utils import formatdate +from flask_paginate import Pagination + + DEBUG_API = False +DEFAULT_MAX_SEARCH_RESULT = 1000 +DEFAULT_PER_PAGE = 75 +SERACH_PAGINATE_DISPLAY_MSG = '''Displaying results {start}-{end} out of {total} results.
+ Please refine your search results if you can't find what you were looking for.''' def redirect_url(): @@ -48,144 +53,13 @@ def modify_query(**new_values): return '{}?{}'.format(flask.request.path, url_encode(args)) + @app.template_global() def filter_truthy(input_list): ''' Jinja2 can't into list comprehension so this is for the search_results.html template ''' return [item for item in input_list if item] -def search(term='', user=None, sort='id', order='desc', category='0_0', quality_filter='0', page=1, rss=False, admin=False): - sort_keys = { - 'id': models.Torrent.id, - 'size': models.Torrent.filesize, - 'name': models.Torrent.display_name, - 'seeders': models.Statistic.seed_count, - 'leechers': models.Statistic.leech_count, - 'downloads': models.Statistic.download_count - } - - sort_ = sort.lower() - if sort_ not in sort_keys: - flask.abort(400) - sort = sort_keys[sort] - - order_keys = { - 'desc': 'desc', - 'asc': 'asc' - } - - order_ = order.lower() - if order_ not in order_keys: - flask.abort(400) - - filter_keys = { - '0': None, - '1': (models.TorrentFlags.REMAKE, False), - '2': (models.TorrentFlags.TRUSTED, True), - '3': (models.TorrentFlags.COMPLETE, True) - } - - sentinel = object() - filter_tuple = filter_keys.get(quality_filter.lower(), sentinel) - if filter_tuple is sentinel: - flask.abort(400) - - if user: - user = models.User.by_id(user) - if not user: - flask.abort(404) - user = user.id - - main_category = None - sub_category = None - main_cat_id = 0 - sub_cat_id = 0 - if category: - cat_match = re.match(r'^(\d+)_(\d+)$', category) - if not cat_match: - flask.abort(400) - - main_cat_id = int(cat_match.group(1)) - sub_cat_id = int(cat_match.group(2)) - - if main_cat_id > 0: - if sub_cat_id > 0: - sub_category = models.SubCategory.by_category_ids(main_cat_id, sub_cat_id) - else: - main_category = models.MainCategory.by_id(main_cat_id) - - if not category: - flask.abort(400) - - # Force sort by id desc if rss - if rss: - sort = sort_keys['id'] - 
order = 'desc' - - same_user = False - if flask.g.user: - same_user = flask.g.user.id == user - - if term: - query = db.session.query(models.TorrentNameSearch) - else: - query = models.Torrent.query - - # User view (/user/username) - if user: - query = query.filter(models.Torrent.uploader_id == user) - - if not admin: - # Hide all DELETED torrents if regular user - query = query.filter(models.Torrent.flags.op('&')(int(models.TorrentFlags.DELETED)).is_(False)) - # If logged in user is not the same as the user being viewed, show only torrents that aren't hidden or anonymous - # If logged in user is the same as the user being viewed, show all torrents including hidden and anonymous ones - # On RSS pages in user view, show only torrents that aren't hidden or anonymous no matter what - if not same_user or rss: - query = query.filter(models.Torrent.flags.op('&')(int(models.TorrentFlags.HIDDEN | - models.TorrentFlags.ANONYMOUS)).is_(False)) - # General view (homepage, general search view) - else: - if not admin: - # Hide all DELETED torrents if regular user - query = query.filter(models.Torrent.flags.op('&')(int(models.TorrentFlags.DELETED)).is_(False)) - # If logged in, show all torrents that aren't hidden unless they belong to you - # On RSS pages, show all public torrents and nothing more. 
- if flask.g.user and not rss: - query = query.filter((models.Torrent.flags.op('&')(int(models.TorrentFlags.HIDDEN)).is_(False)) | - (models.Torrent.uploader_id == flask.g.user.id)) - # Otherwise, show all torrents that aren't hidden - else: - query = query.filter(models.Torrent.flags.op('&')(int(models.TorrentFlags.HIDDEN)).is_(False)) - - if main_category: - query = query.filter(models.Torrent.main_category_id == main_cat_id) - elif sub_category: - query = query.filter((models.Torrent.main_category_id == main_cat_id) & - (models.Torrent.sub_category_id == sub_cat_id)) - - if filter_tuple: - query = query.filter(models.Torrent.flags.op('&')(int(filter_tuple[0])).is_(filter_tuple[1])) - - if term: - for item in shlex.split(term, posix=False): - if len(item) >= 2: - query = query.filter(FullTextSearch( - item, models.TorrentNameSearch, FullTextMode.NATURAL)) - - # Sort and order - if sort.class_ != models.Torrent: - query = query.join(sort.class_) - - query = query.order_by(getattr(sort, order)()) - - if rss: - query = query.limit(app.config['RESULTS_PER_PAGE']) - else: - query = query.paginate_faste(page, per_page=app.config['RESULTS_PER_PAGE'], step=5) - - return query - @app.errorhandler(404) def not_found(error): @@ -203,7 +77,6 @@ def before_request(): flask.g.user = user if not 'timeout' in flask.session or flask.session['timeout'] < datetime.now(): - print("hio") flask.session['timeout'] = datetime.now() + timedelta(days=7) flask.session.permanent = True flask.session.modified = True @@ -225,6 +98,18 @@ def _generate_query_string(term, category, filter, user): return params +@app.template_filter('utc_time') +def get_utc_timestamp(datetime_str): + ''' Returns a UTC POSIX timestamp, as seconds ''' + UTC_EPOCH = datetime.utcfromtimestamp(0) + return int((datetime.strptime(datetime_str, '%Y-%m-%dT%H:%M:%S') - UTC_EPOCH).total_seconds()) + + +@app.template_filter('display_time') +def get_display_time(datetime_str): + return datetime.strptime(datetime_str, 
'%Y-%m-%dT%H:%M:%S').strftime('%Y-%m-%d %H:%M') + + @app.route('/rss', defaults={'rss': True}) @app.route('/', defaults={'rss': False}) def home(rss): @@ -241,6 +126,10 @@ def home(rss): if page: page = int(page) + per_page = app.config.get('RESULTS_PER_PAGE') + if not per_page: + per_page = DEFAULT_PER_PAGE + user_id = None if user_name: user = models.User.by_username(user_name) @@ -249,30 +138,72 @@ def home(rss): user_id = user.id query_args = { - 'term': term or '', 'user': user_id, 'sort': sort or 'id', 'order': order or 'desc', 'category': category or '0_0', 'quality_filter': quality_filter or '0', 'page': page or 1, - 'rss': rss + 'rss': rss, + 'per_page': per_page } - # God mode - if flask.g.user and flask.g.user.is_admin: - query_args['admin'] = True + if flask.g.user: + query_args['logged_in_user'] = flask.g.user + if flask.g.user.is_admin: # God mode + query_args['admin'] = True - query = search(**query_args) + # If searching, we get results from elastic search + use_elastic = app.config.get('USE_ELASTIC_SEARCH') + if use_elastic and term: + query_args['term'] = term - if rss: - return render_rss('/', query) + max_search_results = app.config.get('ES_MAX_SEARCH_RESULT') + if not max_search_results: + max_search_results = DEFAULT_MAX_SEARCH_RESULT + + max_page = min(query_args['page'], int(math.ceil(max_search_results / float(per_page)))) # Only allow up to (max_search_results / page) pages + + query_args['page'] = max_page + query_args['max_search_results'] = max_search_results + + query_results = search_elastic(**query_args) + + if rss: + return render_rss('/', query_results, use_elastic=True) + else: + rss_query_string = _generate_query_string(term, category, quality_filter, user_name) + max_results = min(max_search_results, query_results['hits']['total']) + # change p= argument to whatever you change page_parameter to or pagination breaks + pagination = Pagination(p=query_args['page'], per_page=per_page, + total=max_results, bs_version=3, 
page_parameter='p', + display_msg=SERACH_PAGINATE_DISPLAY_MSG) + return flask.render_template('home.html', + use_elastic=True, + pagination=pagination, + torrent_query=query_results, + search=query_args, + rss_filter=rss_query_string) else: - rss_query_string = _generate_query_string(term, category, quality_filter, user_name) - return flask.render_template('home.html', - torrent_query=query, - search=query_args, - rss_filter=rss_query_string) + # If ES is enabled, default to db search for browsing + if use_elastic: + query_args['term'] = '' + else: # Otherwise, use db search for everything + query_args['term'] = term or '' + + query = search_db(**query_args) + if rss: + return render_rss('/', query, use_elastic=False) + else: + rss_query_string = _generate_query_string(term, category, quality_filter, user_name) + # Use elastic is always false here because we only hit this section + # if we're browsing without a search term (which means we default to DB) + # or if ES is disabled + return flask.render_template('home.html', + use_elastic=False, + torrent_query=query, + search=query_args, + rss_filter=rss_query_string) @app.route('/user/') @@ -291,6 +222,10 @@ def view_user(user_name): if page: page = int(page) + per_page = app.config.get('RESULTS_PER_PAGE') + if not per_page: + per_page = DEFAULT_PER_PAGE + query_args = { 'term': term or '', 'user': user.id, @@ -299,40 +234,82 @@ def view_user(user_name): 'category': category or '0_0', 'quality_filter': quality_filter or '0', 'page': page or 1, - 'rss': False + 'rss': False, + 'per_page': per_page } - # God mode - if flask.g.user and flask.g.user.is_admin: - query_args['admin'] = True - - query = search(**query_args) + if flask.g.user: + query_args['logged_in_user'] = flask.g.user + if flask.g.user.is_admin: # God mode + query_args['admin'] = True + # Use elastic search for term searching rss_query_string = _generate_query_string(term, category, quality_filter, user_name) - return flask.render_template('user.html', - 
torrent_query=query, - search=query_args, - user=user, - user_page=True, - rss_filter=rss_query_string) + use_elastic = app.config.get('USE_ELASTIC_SEARCH') + if use_elastic and term: + query_args['term'] = term + + max_search_results = app.config.get('ES_MAX_SEARCH_RESULT') + if not max_search_results: + max_search_results = DEFAULT_MAX_SEARCH_RESULT + + max_page = min(query_args['page'], int(math.ceil(max_search_results / float(per_page)))) # Only allow up to (max_search_results / page) pages + + query_args['page'] = max_page + query_args['max_search_results'] = max_search_results + + query_results = search_elastic(**query_args) + + max_results = min(max_search_results, query_results['hits']['total']) + # change p= argument to whatever you change page_parameter to or pagination breaks + pagination = Pagination(p=query_args['page'], per_page=per_page, + total=max_results, bs_version=3, page_parameter='p', + display_msg=SERACH_PAGINATE_DISPLAY_MSG) + return flask.render_template('user.html', + use_elastic=True, + pagination=pagination, + torrent_query=query_results, + search=query_args, + user=user, + user_page=True, + rss_filter=rss_query_string) + # Similar logic as home page + else: + if use_elastic: + query_args['term'] = '' + else: + query_args['term'] = term or '' + query = search_db(**query_args) + return flask.render_template('user.html', + use_elastic=False, + torrent_query=query, + search=query_args, + user=user, + user_page=True, + rss_filter=rss_query_string) @app.template_filter('rfc822') def _jinja2_filter_rfc822(date, fmt=None): return formatdate(float(date.strftime('%s'))) +@app.template_filter('rfc822_es') +def _jinja2_filter_rfc822(datestr, fmt=None): + return formatdate(float(datetime.strptime(datestr, '%Y-%m-%dT%H:%M:%S').strftime('%s'))) -def render_rss(label, query): + +def render_rss(label, query, use_elastic): rss_xml = flask.render_template('rss.xml', + use_elastic=use_elastic, term=label, site_url=flask.request.url_root, - query=query) + 
torrent_query=query) response = flask.make_response(rss_xml) response.headers['Content-Type'] = 'application/xml' return response -#@app.route('/about', methods=['GET']) +# @app.route('/about', methods=['GET']) # def about(): # return flask.render_template('about.html') @@ -645,4 +622,4 @@ def site_help(): @app.route('/api/upload', methods = ['POST']) def api_upload(): api_response = api_handler.api_upload(flask.request) - return api_response + return api_response \ No newline at end of file diff --git a/nyaa/search.py b/nyaa/search.py new file mode 100644 index 0000000..e6353c5 --- /dev/null +++ b/nyaa/search.py @@ -0,0 +1,317 @@ +import flask +import re +import math +import json +import shlex + +from nyaa import app, db +from nyaa import models + +import sqlalchemy_fulltext.modes as FullTextMode +from sqlalchemy_fulltext import FullTextSearch +from elasticsearch import Elasticsearch +from elasticsearch_dsl import Search, Q + + +def search_elastic(term='', user=None, sort='id', order='desc', + category='0_0', quality_filter='0', page=1, + rss=False, admin=False, logged_in_user=None, + per_page=75, max_search_results=1000): + # This function can easily be memcached now + + es_client = Elasticsearch() + + es_sort_keys = { + 'id': 'id', + 'size': 'filesize', + # 'name': 'display_name', # This is slow and buggy + 'seeders': 'seed_count', + 'leechers': 'leech_count', + 'downloads': 'download_count' + } + + sort_ = sort.lower() + if sort_ not in es_sort_keys: + flask.abort(400) + + es_sort = es_sort_keys[sort] + + order_keys = { + 'desc': 'desc', + 'asc': 'asc' + } + + order_ = order.lower() + if order_ not in order_keys: + flask.abort(400) + + # Only allow ID, desc if RSS + if rss: + sort = es_sort_keys['id'] + order = 'desc' + + # funky, es sort is default asc, prefixed by '-' if desc + if 'desc' == order: + es_sort = '-' + es_sort + + # Quality filter + quality_keys = [ + '0', # Show all + '1', # No remakes + '2', # Only trusted + '3' # Only completed + ] + + if 
quality_filter.lower() not in quality_keys: + flask.abort(400) + + quality_filter = int(quality_filter) + + # Category filter + main_category = None + sub_category = None + main_cat_id = 0 + sub_cat_id = 0 + if category: + cat_match = re.match(r'^(\d+)_(\d+)$', category) + if not cat_match: + flask.abort(400) + + main_cat_id = int(cat_match.group(1)) + sub_cat_id = int(cat_match.group(2)) + + if main_cat_id > 0: + if sub_cat_id > 0: + sub_category = models.SubCategory.by_category_ids(main_cat_id, sub_cat_id) + if not sub_category: + flask.abort(400) + else: + main_category = models.MainCategory.by_id(main_cat_id) + if not main_category: + flask.abort(400) + + # This might be useless since we validate users + # before coming into this method, but just to be safe... + if user: + user = models.User.by_id(user) + if not user: + flask.abort(404) + user = user.id + + same_user = False + if logged_in_user: + same_user = user == logged_in_user.id + + s = Search(using=es_client, index=app.config.get('ES_INDEX_NAME')) # todo, sukebei prefix + + # Apply search term + if term: + s = s.query('simple_query_string', + analyzer='my_search_analyzer', + default_operator="AND", + query=term) + + # User view (/user/username) + if user: + s = s.filter('term', uploader_id=user) + + if not admin: + # Hide all DELETED torrents if regular user + s = s.filter('term', deleted=False) + # If logged in user is not the same as the user being viewed, + # show only torrents that aren't hidden or anonymous. + # + # If logged in user is the same as the user being viewed, + # show all torrents including hidden and anonymous ones. 
+ # + # On RSS pages in user view, show only torrents that + # aren't hidden or anonymous no matter what + if not same_user or rss: + s = s.filter('term', hidden=False) + s = s.filter('term', anonymous=False) + # General view (homepage, general search view) + else: + if not admin: + # Hide all DELETED torrents if regular user + s = s.filter('term', deleted=False) + # If logged in, show all torrents that aren't hidden unless they belong to you + # On RSS pages, show all public torrents and nothing more. + if logged_in_user and not rss: + hiddenFilter = Q('term', hidden=False) + userFilter = Q('term', uploader_id=logged_in_user.id) + combinedFilter = hiddenFilter | userFilter + s = s.filter('bool', filter=[combinedFilter]) + else: + s = s.filter('term', hidden=False) + + if main_category: + s = s.filter('term', main_category_id=main_cat_id) + elif sub_category: + s = s.filter('term', main_category_id=main_cat_id) + s = s.filter('term', sub_category_id=sub_cat_id) + + if quality_filter == 0: + pass + elif quality_filter == 1: + s = s.filter('term', remake=False) + elif quality_filter == 2: + s = s.filter('term', trusted=True) + elif quality_filter == 3: + s = s.filter('term', complete=True) + + # Apply sort + s = s.sort(es_sort) + + # Only show first RESULTS_PER_PAGE items for RSS + if rss: + s = s[0:per_page] + else: + max_page = min(page, int(math.ceil(max_search_results / float(per_page)))) + from_idx = (max_page-1)*per_page + to_idx = min(max_search_results, max_page*per_page) + s = s[from_idx:to_idx] + + highlight = app.config.get('ENABLE_ELASTIC_SEARCH_HIGHLIGHT') + if highlight: + s = s.highlight_options(tags_schema='styled') + s = s.highlight("display_name") + + # Return query, uncomment print line to debug query + # from pprint import pprint + # print(json.dumps(s.to_dict())) + return s.execute() + + +def search_db(term='', user=None, sort='id', order='desc', category='0_0', + quality_filter='0', page=1, rss=False, admin=False, + logged_in_user=None, 
per_page=75): + sort_keys = { + 'id': models.Torrent.id, + 'size': models.Torrent.filesize, + # 'name': models.Torrent.display_name, # Disable this because we disabled this in search_elastic, for the sake of consistency + 'seeders': models.Statistic.seed_count, + 'leechers': models.Statistic.leech_count, + 'downloads': models.Statistic.download_count + } + + sort_ = sort.lower() + if sort_ not in sort_keys: + flask.abort(400) + sort = sort_keys[sort] + + order_keys = { + 'desc': 'desc', + 'asc': 'asc' + } + + order_ = order.lower() + if order_ not in order_keys: + flask.abort(400) + + filter_keys = { + '0': None, + '1': (models.TorrentFlags.REMAKE, False), + '2': (models.TorrentFlags.TRUSTED, True), + '3': (models.TorrentFlags.COMPLETE, True) + } + + sentinel = object() + filter_tuple = filter_keys.get(quality_filter.lower(), sentinel) + if filter_tuple is sentinel: + flask.abort(400) + + if user: + user = models.User.by_id(user) + if not user: + flask.abort(404) + user = user.id + + main_category = None + sub_category = None + main_cat_id = 0 + sub_cat_id = 0 + if category: + cat_match = re.match(r'^(\d+)_(\d+)$', category) + if not cat_match: + flask.abort(400) + + main_cat_id = int(cat_match.group(1)) + sub_cat_id = int(cat_match.group(2)) + + if main_cat_id > 0: + if sub_cat_id > 0: + sub_category = models.SubCategory.by_category_ids(main_cat_id, sub_cat_id) + else: + main_category = models.MainCategory.by_id(main_cat_id) + + if not category: + flask.abort(400) + + # Force sort by id desc if rss + if rss: + sort = sort_keys['id'] + order = 'desc' + + same_user = False + if logged_in_user: + same_user = logged_in_user.id == user + + if term: + query = db.session.query(models.TorrentNameSearch) + else: + query = models.Torrent.query + + # User view (/user/username) + if user: + query = query.filter(models.Torrent.uploader_id == user) + + if not admin: + # Hide all DELETED torrents if regular user + query = 
query.filter(models.Torrent.flags.op('&')(int(models.TorrentFlags.DELETED)).is_(False)) + # If logged in user is not the same as the user being viewed, show only torrents that aren't hidden or anonymous + # If logged in user is the same as the user being viewed, show all torrents including hidden and anonymous ones + # On RSS pages in user view, show only torrents that aren't hidden or anonymous no matter what + if not same_user or rss: + query = query.filter(models.Torrent.flags.op('&')(int(models.TorrentFlags.HIDDEN | + models.TorrentFlags.ANONYMOUS)).is_(False)) + # General view (homepage, general search view) + else: + if not admin: + # Hide all DELETED torrents if regular user + query = query.filter(models.Torrent.flags.op('&')(int(models.TorrentFlags.DELETED)).is_(False)) + # If logged in, show all torrents that aren't hidden unless they belong to you + # On RSS pages, show all public torrents and nothing more. + if logged_in_user and not rss: + query = query.filter((models.Torrent.flags.op('&')(int(models.TorrentFlags.HIDDEN)).is_(False)) | + (models.Torrent.uploader_id == logged_in_user.id)) + # Otherwise, show all torrents that aren't hidden + else: + query = query.filter(models.Torrent.flags.op('&')(int(models.TorrentFlags.HIDDEN)).is_(False)) + + if main_category: + query = query.filter(models.Torrent.main_category_id == main_cat_id) + elif sub_category: + query = query.filter((models.Torrent.main_category_id == main_cat_id) & + (models.Torrent.sub_category_id == sub_cat_id)) + + if filter_tuple: + query = query.filter(models.Torrent.flags.op('&')(int(filter_tuple[0])).is_(filter_tuple[1])) + + if term: + for item in shlex.split(term, posix=False): + if len(item) >= 2: + query = query.filter(FullTextSearch( + item, models.TorrentNameSearch, FullTextMode.NATURAL)) + + # Sort and order + if sort.class_ != models.Torrent: + query = query.join(sort.class_) + + query = query.order_by(getattr(sort, order)()) + + if rss: + query = query.limit(per_page) + else: 
+ query = query.paginate_faste(page, per_page=per_page, step=5) + + return query diff --git a/nyaa/static/css/main.css b/nyaa/static/css/main.css index 83ca0ea..1743595 100644 --- a/nyaa/static/css/main.css +++ b/nyaa/static/css/main.css @@ -97,4 +97,14 @@ table.torrent-list thead th.sorting_desc:after { margin-left: 20px; margin-bottom: 10px; } -} \ No newline at end of file +} + +/* elasticsearch term highlight */ +.hlt1 { + font-style: normal; + display: inline-block; + padding: 0 3px; + border-radius: 3px; + border: 1px solid rgba(100, 56, 0, 0.8); + background: rgba(200,127,0,0.3); +} diff --git a/nyaa/templates/rss.xml b/nyaa/templates/rss.xml index 266e524..e1787d2 100644 --- a/nyaa/templates/rss.xml +++ b/nyaa/templates/rss.xml @@ -4,20 +4,32 @@ RSS Feed for {{ term }} {{ url_for('home', _external=True) }} - {% for torrent in query %} + {% for torrent in torrent_query %} {% if torrent.has_torrent %} {{ torrent.display_name }} + {% if use_elastic %} + {{ url_for('download_torrent', torrent_id=torrent.meta.id, _external=True) }} + {{ url_for('view_torrent', torrent_id=torrent.meta.id, _external=True) }} + {{ torrent.created_time|rfc822_es }} + {% else %} {{ url_for('download_torrent', torrent_id=torrent.id, _external=True) }} {{ url_for('view_torrent', torrent_id=torrent.id, _external=True) }} {{ torrent.created_time|rfc822 }} + {% endif %} {% else %} {{ torrent.display_name }} + {% if use_elastic %} + {{ create_magnet_from_info(torrent.display_name, torrent.info_hash) }} + {{ url_for('view_torrent', torrent_id=torrent.meta.id, _external=True) }} + {{ torrent.created_time|rfc822_es }} + {% else %} {{ torrent.magnet_uri }} {{ url_for('view_torrent', torrent_id=torrent.id, _external=True) }} {{ torrent.created_time|rfc822 }} + {% endif %} {% endif %} {% endfor %} diff --git a/nyaa/templates/search_results.html b/nyaa/templates/search_results.html index bdacd4d..b73994c 100644 --- a/nyaa/templates/search_results.html +++ b/nyaa/templates/search_results.html @@ 
-8,7 +8,7 @@ {{ caller() }} {% endmacro %} -{% if torrent_query.items %} +{% if (use_elastic and torrent_query.hits.total > 0) or (torrent_query.items) %}
@@ -16,7 +16,7 @@ {% call render_column_header("hdr-category", "width:80px;", center_text=True) %}
Category
{% endcall %} - {% call render_column_header("hdr-name", "width:auto;", sort_key="name") %} + {% call render_column_header("hdr-name", "width:auto;") %}
Name
{% endcall %} {% call render_column_header("hdr-link", "width:70px;", center_text=True) %} @@ -45,27 +45,51 @@ - {% for torrent in torrent_query.items %} + {% set torrents = torrent_query if use_elastic else torrent_query.items %} + {% for torrent in torrents %} - {% set cat_id = (torrent.main_category.id|string) + '_' + (torrent.sub_category.id|string) %} + {% set cat_id = (torrent.main_category_id|string) + '_' + (torrent.sub_category_id|string) if use_elastic else (torrent.main_category.id|string) + '_' + (torrent.sub_category.id|string) %} {% set icon_dir = config.SITE_FLAVOR %} + {% if use_elastic %} + + {% else %} + {% endif %} + {% if use_elastic %} + + {% else %} + {% endif %} + {% if config.ENABLE_SHOW_STATS %} + {% if use_elastic %} + + + + {% else %} {% endif %} + {% endif %} {% endfor %} @@ -76,6 +100,11 @@ {% endif %}
+ {% if use_elastic %} + {{ pagination.info }} + {{ pagination.links }} + {% else %} {% from "bootstrap/pagination.html" import render_pagination %} {{ render_pagination(torrent_query) }} + {% endif %}
diff --git a/nyaa/torrents.py b/nyaa/torrents.py index 192ab0f..1b5dfae 100644 --- a/nyaa/torrents.py +++ b/nyaa/torrents.py @@ -3,6 +3,7 @@ import base64 import time from urllib.parse import urlencode from orderedset import OrderedSet +from nyaa import app from nyaa import bencode from nyaa import app @@ -53,10 +54,23 @@ def get_trackers(torrent): return list(trackers) +def get_trackers_magnet(): + trackers = OrderedSet() + + # Our main one first + main_announce_url = app.config.get('MAIN_ANNOUNCE_URL') + if main_announce_url: + trackers.add(main_announce_url) + + # and finally our tracker list + trackers.update(default_trackers()) + + return list(trackers) + def create_magnet(torrent, max_trackers=5, trackers=None): if trackers is None: - trackers = get_trackers(torrent) + trackers = get_trackers_magnet() magnet_parts = [ ('dn', torrent.display_name) @@ -68,6 +82,24 @@ def create_magnet(torrent, max_trackers=5, trackers=None): return 'magnet:?xt=urn:btih:' + b32_info_hash + '&' + urlencode(magnet_parts) +# For processing ES links +@app.context_processor +def create_magnet_from_info(): + def _create_magnet_from_info(display_name, info_hash, max_trackers=5, trackers=None): + if trackers is None: + trackers = get_trackers_magnet() + + magnet_parts = [ + ('dn', display_name) + ] + for tracker in trackers[:max_trackers]: + magnet_parts.append(('tr', tracker)) + + b32_info_hash = base64.b32encode(bytes.fromhex(info_hash)).decode('utf-8') + return 'magnet:?xt=urn:btih:' + b32_info_hash + '&' + urlencode(magnet_parts) + return dict(create_magnet_from_info=_create_magnet_from_info) + + def create_default_metadata_base(torrent, trackers=None): if trackers is None: trackers = get_trackers(torrent) diff --git a/requirements.txt b/requirements.txt index 224866b..843b935 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,11 +24,17 @@ pycodestyle==2.3.1 pycparser==2.17 pyparsing==2.2.0 six==1.10.0 -SQLAlchemy>=1.1.9 +SQLAlchemy==1.1.9 SQLAlchemy-FullText-Search==0.2.3 
-SQLAlchemy-Utils>=0.32.14 +SQLAlchemy-Utils==0.32.14 uWSGI==2.0.15 visitor==0.1.3 webassets==0.12.1 Werkzeug==0.12.1 WTForms==2.1 +## elasticsearch dependencies +elasticsearch==5.3.0 +elasticsearch-dsl==5.2.0 +progressbar2==3.20.0 +mysql-replication==0.13 +flask-paginate==0.4.5 \ No newline at end of file diff --git a/sync_es.py b/sync_es.py new file mode 100644 index 0000000..45c98a5 --- /dev/null +++ b/sync_es.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python +""" +stream changes in mysql (on the torrents and statistics table) into +elasticsearch as they happen on the binlog. This keeps elasticsearch in sync +with whatever you do to the database, including stuff like admin queries. Also, +because mysql keeps the binlog around for N days before deleting old stuff, you +can survive a hiccup of elasticsearch or this script dying and pick up where +you left off. + +For that "picking up" part, this script depends on one piece of external state: +its last known binlog filename and position. This is saved off as a JSON file +to a configurable location on the filesystem periodically. If the file is not +present then you can initialize it with the values from `SHOW MASTER STATUS` +from the mysql repl, which will start the sync from current state. + +In the case of catastrophic elasticsearch meltdown where you need to +reconstruct the index, you'll want to be a bit careful with coordinating +sync_es and import_to_es scripts. If you run import_to_es first, then run +sync_es against SHOW MASTER STATUS, anything that changed the database between +the import_to_es run and the start of sync_es will be lost. Instead, you can run SHOW MASTER +STATUS _before_ you run import_to_es. That way you'll definitely pick up any +changes that happen while the import_to_es script is dumping stuff from the +database into es, at the expense of redoing a (small) amount of indexing. 
+""" +from elasticsearch import Elasticsearch +from pymysqlreplication import BinLogStreamReader +from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent +from datetime import datetime +from nyaa.models import TorrentFlags +import sys +import json +import time +import logging + +logging.basicConfig() + +log = logging.getLogger('sync_es') +log.setLevel(logging.INFO) + +#logging.getLogger('elasticsearch').setLevel(logging.DEBUG) + +# in prod want in /var/lib somewhere probably +SAVE_LOC = "/var/lib/sync_es_position.json" +MYSQL_HOST = '127.0.0.1' +MYSQL_PORT = 3306 +MYSQL_USER = 'test' +MYSQL_PW = 'test123' +NT_DB = 'nyaav2' + +with open(SAVE_LOC) as f: + pos = json.load(f) + +es = Elasticsearch() + +stream = BinLogStreamReader( + # TODO parse out from config.py or something + connection_settings = { + 'host': MYSQL_HOST, + 'port': MYSQL_PORT, + 'user': MYSQL_USER, + 'passwd': MYSQL_PW + }, + server_id=10, # arbitrary + # only care about this database currently + only_schemas=[NT_DB], + # these tables in the database + only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"], + # from our save file + resume_stream=True, + log_file=pos['log_file'], + log_pos=pos['log_pos'], + # skip the other stuff like table mapping + only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent], + # if we're at the head of the log, block until something happens + # note it'd be nice to block async-style instead, but the mainline + # binlogreader is synchronous. there is an (unmaintained?) fork + # using aiomysql if anybody wants to revive that. + blocking=True) + +def reindex_torrent(t, index_name): + # XXX annoyingly different from import_to_es, and + # you need to keep them in sync manually. 
+ f = t['flags'] + doc = { + "id": t['id'], + "display_name": t['display_name'], + "created_time": t['created_time'], + "updated_time": t['updated_time'], + "description": t['description'], + # not analyzed but included so we can render magnet links + # without querying sql again. + "info_hash": t['info_hash'].hex(), + "filesize": t['filesize'], + "uploader_id": t['uploader_id'], + "main_category_id": t['main_category_id'], + "sub_category_id": t['sub_category_id'], + # XXX all the bitflags are numbers + "anonymous": bool(f & TorrentFlags.ANONYMOUS), + "trusted": bool(f & TorrentFlags.TRUSTED), + "remake": bool(f & TorrentFlags.REMAKE), + "complete": bool(f & TorrentFlags.COMPLETE), + # TODO instead of indexing and filtering later + # could delete from es entirely. Probably won't matter + # for at least a few months. + "hidden": bool(f & TorrentFlags.HIDDEN), + "deleted": bool(f & TorrentFlags.DELETED), + "has_torrent": bool(t['has_torrent']), + } + # update, so we don't delete the stats if present + es.update( + index=index_name, + doc_type='torrent', + id=t['id'], + body={"doc": doc, "doc_as_upsert": True}) + +def reindex_stats(s, index_name): + es.update( + index=index_name, + doc_type='torrent', + id=s['torrent_id'], + body={ + "doc": { + "stats_last_updated": s["last_updated"], + "download_count": s["download_count"], + "leech_count": s['leech_count'], + "seed_count": s['seed_count'], + }}) + +n = 0 +last_save = time.time() + +for event in stream: + for row in event.rows: + if event.table == "nyaa_torrents" or event.table == "sukebei_torrents": + if event.table == "nyaa_torrents": + index_name = "nyaa" + else: + index_name = "sukebei" + if type(event) is WriteRowsEvent: + reindex_torrent(row['values'], index_name) + elif type(event) is UpdateRowsEvent: + reindex_torrent(row['after_values'], index_name) + elif type(event) is DeleteRowsEvent: + # just delete it + es.delete(index=index_name, doc_type='torrent', id=row['values']['id']) + else: + raise 
Exception(f"unknown event {type(event)}") + elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics": + if event.table == "nyaa_statistics": + index_name = "nyaa" + else: + index_name = "sukebei" + if type(event) is WriteRowsEvent: + reindex_stats(row['values'], index_name) + elif type(event) is UpdateRowsEvent: + reindex_stats(row['after_values'], index_name) + elif type(event) is DeleteRowsEvent: + # uh ok. assume that the torrent row will get deleted later. + pass + else: + raise Exception(f"unknown event {type(event)}") + else: + raise Exception(f"unknown table {event.table}") + n += 1 + if n % 100 == 0 or time.time() - last_save > 30: + log.info(f"saving position {stream.log_file}/{stream.log_pos}") + with open(SAVE_LOC, 'w') as f: + json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f)
+ {% if use_elastic %} + + {% else %} + {% endif %} {%if "highlight" in torrent.meta %}{{ torrent.meta.highlight.display_name[0] | safe }}{% else %}{{torrent.display_name}}{%endif%}{{ torrent.display_name | escape }} {% if torrent.has_torrent %}{% endif %} + {% if use_elastic %} + + {% else %} + {% endif %} {{ torrent.filesize | filesizeformat(True) }}{{ torrent.created_time | display_time }}{{ torrent.created_time.strftime('%Y-%m-%d %H:%M') }}{{ torrent.seed_count }}{{ torrent.leech_count }}{{ torrent.download_count }}{{ torrent.stats.seed_count }} {{ torrent.stats.leech_count }} {{ torrent.stats.download_count }}