1
0
Fork 0
mirror of https://gitlab.com/SIGBUS/nyaa.git synced 2024-06-26 07:17:36 +00:00
nyaa/nyaa/views/users.py
Anna-Maria Meriniemi c5d705210d Read-only maintenance mode setting for config.py (#356)
Disables all POSTs, optionally allowing users to log in (without updating last login date)
Blocked POSTs will redirect to the GET endpoint if possible, otherwise to referrer or in last case, home page.
API requests will get a plaintext message with 405 status code.
2017-09-04 18:16:52 -04:00

246 lines
8.8 KiB
Python

import math
from ipaddress import ip_address
import flask
from flask_paginate import Pagination
from itsdangerous import BadSignature, URLSafeSerializer
from nyaa import forms, models
from nyaa.extensions import db
from nyaa.search import (DEFAULT_MAX_SEARCH_RESULT, DEFAULT_PER_PAGE, SERACH_PAGINATE_DISPLAY_MSG,
_generate_query_string, search_db, search_elastic)
from nyaa.utils import chain_get
app = flask.current_app
bp = flask.Blueprint('users', __name__)
@bp.route('/user/<user_name>', methods=['GET', 'POST'])
def view_user(user_name):
user = models.User.by_username(user_name)
if not user:
flask.abort(404)
admin_form = None
ban_form = None
bans = None
ipbanned = None
if flask.g.user and flask.g.user.is_moderator and flask.g.user.level > user.level:
admin_form = forms.UserForm()
default, admin_form.user_class.choices = _create_user_class_choices(user)
if flask.request.method == 'GET':
admin_form.user_class.data = default
ban_form = forms.BanForm()
if flask.request.method == 'POST':
doban = (ban_form.ban_user.data or ban_form.unban.data or ban_form.ban_userip.data)
bans = models.Ban.banned(user.id, user.last_login_ip).all()
ipbanned = list(filter(lambda b: b.user_ip == user.last_login_ip, bans))
url = flask.url_for('users.view_user', user_name=user.username)
if flask.request.method == 'POST' and admin_form and not doban and admin_form.validate():
selection = admin_form.user_class.data
log = None
if selection == 'regular':
user.level = models.UserLevelType.REGULAR
log = "[{}]({}) changed to regular user".format(user_name, url)
elif selection == 'trusted':
user.level = models.UserLevelType.TRUSTED
log = "[{}]({}) changed to trusted user".format(user_name, url)
elif selection == 'moderator':
user.level = models.UserLevelType.MODERATOR
log = "[{}]({}) changed to moderator user".format(user_name, url)
adminlog = models.AdminLog(log=log, admin_id=flask.g.user.id)
db.session.add(user)
db.session.add(adminlog)
db.session.commit()
return flask.redirect(url)
if flask.request.method == 'POST' and ban_form and doban and ban_form.validate():
if (ban_form.ban_user.data and user.is_banned) or \
(ban_form.ban_userip.data and ipbanned) or \
(ban_form.unban.data and not user.is_banned and not bans):
flask.flash(flask.Markup('What the fuck are you doing?'), 'danger')
return flask.redirect(url)
user_str = "[{0}]({1})".format(user.username, url)
if ban_form.unban.data:
action = "unbanned"
user.status = models.UserStatusType.ACTIVE
db.session.add(user)
for ban in bans:
if ban.user_ip:
user_str += " IP({0})".format(ip_address(ban.user_ip))
db.session.delete(ban)
else:
action = "banned"
user.status = models.UserStatusType.BANNED
db.session.add(user)
ban = models.Ban(admin_id=flask.g.user.id, user_id=user.id, reason=ban_form.reason.data)
db.session.add(ban)
if ban_form.ban_userip.data:
ban.user_ip = ip_address(user.last_login_ip)
user_str += " IP({0})".format(ban.user_ip)
ban.user_ip = ban.user_ip.packed
log = "User {0} has been {1}.".format(user_str, action)
adminlog = models.AdminLog(log=log, admin_id=flask.g.user.id)
db.session.add(adminlog)
db.session.commit()
flask.flash(flask.Markup('User has been successfully {0}.'.format(action)), 'success')
return flask.redirect(url)
req_args = flask.request.args
search_term = chain_get(req_args, 'q', 'term')
sort_key = req_args.get('s')
sort_order = req_args.get('o')
category = chain_get(req_args, 'c', 'cats')
quality_filter = chain_get(req_args, 'f', 'filter')
page_number = chain_get(req_args, 'p', 'page', 'offset')
try:
page_number = max(1, int(page_number))
except (ValueError, TypeError):
page_number = 1
results_per_page = app.config.get('RESULTS_PER_PAGE', DEFAULT_PER_PAGE)
query_args = {
'term': search_term or '',
'user': user.id,
'sort': sort_key or 'id',
'order': sort_order or 'desc',
'category': category or '0_0',
'quality_filter': quality_filter or '0',
'page': page_number,
'rss': False,
'per_page': results_per_page
}
if flask.g.user:
query_args['logged_in_user'] = flask.g.user
if flask.g.user.is_moderator: # God mode
query_args['admin'] = True
# Use elastic search for term searching
rss_query_string = _generate_query_string(search_term, category, quality_filter, user_name)
use_elastic = app.config.get('USE_ELASTIC_SEARCH')
if use_elastic and search_term:
query_args['term'] = search_term
max_search_results = app.config.get('ES_MAX_SEARCH_RESULT', DEFAULT_MAX_SEARCH_RESULT)
# Only allow up to (max_search_results / page) pages
max_page = min(query_args['page'], int(math.ceil(max_search_results / results_per_page)))
query_args['page'] = max_page
query_args['max_search_results'] = max_search_results
query_results = search_elastic(**query_args)
max_results = min(max_search_results, query_results['hits']['total'])
# change p= argument to whatever you change page_parameter to or pagination breaks
pagination = Pagination(p=query_args['page'], per_page=results_per_page,
total=max_results, bs_version=3, page_parameter='p',
display_msg=SERACH_PAGINATE_DISPLAY_MSG)
return flask.render_template('user.html',
use_elastic=True,
pagination=pagination,
torrent_query=query_results,
search=query_args,
user=user,
user_page=True,
rss_filter=rss_query_string,
admin_form=admin_form,
ban_form=ban_form,
bans=bans,
ipbanned=ipbanned)
# Similar logic as home page
else:
if use_elastic:
query_args['term'] = ''
else:
query_args['term'] = search_term or ''
query = search_db(**query_args)
return flask.render_template('user.html',
use_elastic=False,
torrent_query=query,
search=query_args,
user=user,
user_page=True,
rss_filter=rss_query_string,
admin_form=admin_form,
ban_form=ban_form,
bans=bans,
ipbanned=ipbanned)
@bp.route('/user/activate/<payload>')
def activate_user(payload):
if app.config['MAINTENANCE_MODE']:
flask.flash(flask.Markup('<strong>Activations are currently disabled.</strong>'), 'danger')
return flask.redirect(flask.url_for('main.home'))
s = get_serializer()
try:
user_id = s.loads(payload)
except BadSignature:
flask.abort(404)
user = models.User.by_id(user_id)
if not user:
flask.abort(404)
user.status = models.UserStatusType.ACTIVE
db.session.add(user)
db.session.commit()
return flask.redirect(flask.url_for('account.login'))
def _create_user_class_choices(user):
choices = [('regular', 'Regular')]
default = 'regular'
if flask.g.user:
if flask.g.user.is_moderator:
choices.append(('trusted', 'Trusted'))
if flask.g.user.is_superadmin:
choices.append(('moderator', 'Moderator'))
if user:
if user.is_moderator:
default = 'moderator'
elif user.is_trusted:
default = 'trusted'
elif user.is_banned:
default = 'banned'
return default, choices
def get_serializer(secret_key=None):
if secret_key is None:
secret_key = app.secret_key
return URLSafeSerializer(secret_key)
def get_activation_link(user):
s = get_serializer()
payload = s.dumps(user.id)
return flask.url_for('users.activate_user', payload=payload, _external=True)