from typing import Optional from time import time from html import escape from mautrix.types import TextMessageEventContent, MessageType, Format, RelatesTo, RelationType from maubot import Plugin, MessageEvent from maubot.handlers import command import sqlite3 class PizzaBot(Plugin): def __init__(self): create_db() @staticmethod def plural(num: float, unit: str, decimals: Optional[int] = None) -> str: num = round(num, decimals) if num == 1: return f"{num} {unit}" else: return f"{num} {unit}s" @classmethod def create_db () -> None: s = sqlite3.connect("pizza.db") c = s.cursor() c.execute(''' CREATE TABLE if not exists pizza_table( id INTEGER PRIMARY KEY AUTOINCREMENT, order text NOT NULL, owner text NOT NULL, count integer DEFAULT 1, ts integer NOT NULL );''') s.commit() @classmethod def ms(): return round(time.time()*1000) @classmethod def insert(owner, text) -> None: s = sqlite3.connect("pizza.db") c = s.cursor() c.execute("INSERT INTO pizza_table (order,owner,ts) VALUES (?,?,?)", (owner,text, ms())) s.commit() @classmethod def lookup(text) -> bool: s = sqlite3.connect("pizza.db") c = s.cursor() c.execute("SELECT order FROM pizza_table where owner = ? ORDER BY ts DESC LIMIT 1;", (owner)) r = c.fetchone() return r is not None @classmethod def lookup_order() -> str: s = sqlite3.connect("pizza.db") c = s.cursor() c.execute("SELECT order FROM pizza_table where ts > ? ORDER BY ts DESC;", (ms() - 60*60*4)) r = c.fetchall() msg = "Saved orders within the last 4 hours:" for entry in r: msg += str(entry[0]) + "\n" return msg @command.new("pizza", help="Add a Pizza") @command.argument("message", pass_raw=True, required=True) async def ping_handler(self, evt: MessageEvent, message: str = "") -> None: owner = evt.sender.split(":", 1)[1] if not lookup(owner): insert(owner, message) @command.new("order", help="Show complete order") @command.argument("message", pass_raw=True, required=True) async def ping_handler(self, evt: MessageEvent, message: str = "") -> None: owner = evt.sender.split(":", 1)[1] msg = lookup_order() await evt.respond(msg) @classmethod def prettify_diff(cls, diff: int) -> str: if abs(diff) < 10 * 1_000: return f"{diff} ms" elif abs(diff) < 60 * 1_000: return cls.plural(diff / 1_000, 'second', decimals=1) minutes, seconds = divmod(diff / 1_000, 60) if abs(minutes) < 60: return f"{cls.plural(minutes, 'minute')} and {cls.plural(seconds, 'second')}" hours, minutes = divmod(minutes, 60) if abs(hours) < 24: return (f"{cls.plural(hours, 'hour')}, {cls.plural(minutes, 'minute')}" f" and {cls.plural(seconds, 'second')}") days, hours = divmod(hours, 24) return (f"{cls.plural(days, 'day')}, {cls.plural(hours, 'hour')}, " f"{cls.plural(minutes, 'minute')} and {cls.plural(seconds, 'second')}") @command.new("ping", help="Ping") @command.argument("message", pass_raw=True, required=False) async def ping_handler(self, evt: MessageEvent, message: str = "") -> None: diff = int(time() * 1000) - evt.timestamp pretty_diff = self.prettify_diff(diff) text_message = f'"{message[:20]}" took' if message else "took" html_message = f'"{escape(message[:20])}" took' if message else "took" content = TextMessageEventContent( msgtype=MessageType.NOTICE, format=Format.HTML, body=f"{evt.sender}: Pong! (ping {text_message} {pretty_diff} to arrive)", formatted_body=f"{evt.sender}: Pong! " f"(ping {html_message} " f"{pretty_diff} to arrive)", relates_to=RelatesTo( rel_type=RelationType("xyz.maubot.pong"), event_id=evt.event_id, )) pong_from = evt.sender.split(":", 1)[1] content.relates_to["from"] = pong_from content.relates_to["ms"] = diff content["pong"] = { "ms": diff, "from": pong_from, "ping": evt.event_id, } await evt.respond(content) @command.new("echo", help="Repeat a message") @command.argument("message", pass_raw=True) async def echo_handler(self, evt: MessageEvent, message: str) -> None: await evt.respond(message)