1
0
Fork 0
mirror of https://gitlab.com/SIGBUS/nyaa.git synced 2024-12-22 09:19:59 +00:00
nyaa/nyaa/bencode.py
queue b1d187613a bencode: fix nontermination on empty or malformed input
read(1) returns b'' on EOF, which wasn't checked for in a couple cases,
which could cause empty/truncated input to enter one of the `while True`
loops without hope of exit. programming is hard.
2017-07-24 20:53:48 -06:00

168 lines
5.2 KiB
Python

from io import BytesIO
def _pairwise(iterable):
""" Returns items from an iterable two at a time, ala
[0, 1, 2, 3, ...] -> [(0, 1), (2, 3), ...] """
iterable = iter(iterable)
return zip(iterable, iterable)
__all__ = ['encode', 'decode', 'BencodeException', 'MalformedBencodeException']
# https://wiki.theory.org/BitTorrentSpecification#Bencoding
class BencodeException(Exception):
pass
class MalformedBencodeException(BencodeException):
pass
# bencode types
_DIGITS = b'0123456789'
_B_INT = b'i'
_B_LIST = b'l'
_B_DICT = b'd'
_B_END = b'e'
# Decoding of bencoded data
def _bencode_decode(file_object, decode_keys_as_utf8=True):
""" Decodes a bencoded value, raising a MalformedBencodeException on errors.
decode_keys_as_utf8 controls decoding dict keys as utf8 (which they
almost always are) """
if isinstance(file_object, str):
file_object = file_object.encode('utf8')
if isinstance(file_object, bytes):
file_object = BytesIO(file_object)
def create_ex(msg):
return MalformedBencodeException(
'{0} at position {1} (0x{1:02X} hex)'.format(msg, file_object.tell()))
def _read_list():
""" Decodes values from stream until a None is returned ('e') """
items = []
while True:
value = _bencode_decode(file_object, decode_keys_as_utf8=decode_keys_as_utf8)
if value is None:
break
items.append(value)
return items
kind = file_object.read(1)
if not kind:
raise create_ex('EOF, expecting kind')
if kind == _B_INT: # Integer
int_bytes = b''
while True:
c = file_object.read(1)
if not c:
raise create_ex('EOF, expecting more integer')
elif c == _B_END:
try:
return int(int_bytes.decode('utf8'))
except Exception as e:
raise create_ex('Unable to parse int')
# not a digit OR '-' in the middle of the int
if (c not in _DIGITS + b'-') or (c == b'-' and int_bytes):
raise create_ex('Unexpected input while reading an integer: ' + repr(c))
else:
int_bytes += c
elif kind == _B_LIST: # List
return _read_list()
elif kind == _B_DICT: # Dictionary
keys_and_values = _read_list()
if len(keys_and_values) % 2 != 0:
raise MalformedBencodeException('Uneven amount of key/value pairs')
# "Technically" the bencode dictionary keys are bytestrings,
# but real-world they're always(?) UTF-8.
decoded_dict = dict((decode_keys_as_utf8 and k.decode('utf8') or k, v)
for k, v in _pairwise(keys_and_values))
return decoded_dict
# List/dict end, but make sure input is not just 'e'
elif kind == _B_END and file_object.tell() > 0:
return None
elif kind in _DIGITS: # Bytestring
str_len_bytes = kind # keep first digit
# Read string length until a ':'
while True:
c = file_object.read(1)
if not c:
raise create_ex('EOF, expecting more string len')
if c in _DIGITS:
str_len_bytes += c
elif c == b':':
break
else:
raise create_ex('Unexpected input while reading string length: ' + repr(c))
try:
str_len = int(str_len_bytes.decode())
except Exception as e:
raise create_ex('Unable to parse bytestring length')
bytestring = file_object.read(str_len)
if len(bytestring) != str_len:
raise create_ex('Read only {} bytes, {} wanted'.format(len(bytestring), str_len))
return bytestring
else:
raise create_ex('Unexpected data type ({})'.format(repr(kind)))
# Bencoding
def _bencode_int(value):
""" Encode an integer, eg 64 -> i64e """
return _B_INT + str(value).encode('utf8') + _B_END
def _bencode_bytes(value):
""" Encode a bytestring (strings as UTF-8), eg 'hello' -> 5:hello """
if isinstance(value, str):
value = value.encode('utf8')
return str(len(value)).encode('utf8') + b':' + value
def _bencode_list(value):
""" Encode a list, eg [64, "hello"] -> li64e5:helloe """
return _B_LIST + b''.join(_bencode(item) for item in value) + _B_END
def _bencode_dict(value):
""" Encode a dict, which is keys and values interleaved as a list,
eg {"hello":123}-> d5:helloi123ee """
dict_keys = sorted(value.keys()) # Sort keys as per spec
return _B_DICT + b''.join(
_bencode_bytes(key) + _bencode(value[key]) for key in dict_keys) + _B_END
def _bencode(value):
""" Bencode any supported value (int, bytes, str, list, dict) """
if isinstance(value, int):
return _bencode_int(value)
elif isinstance(value, (str, bytes)):
return _bencode_bytes(value)
elif isinstance(value, list):
return _bencode_list(value)
elif isinstance(value, dict):
return _bencode_dict(value)
raise BencodeException('Unsupported type ' + str(type(value)))
# The functions call themselves
encode = _bencode
decode = _bencode_decode