Message Archive Management (MAM) support for groupchats

- Groupchats now support XEP-0313 Message Archive Management (MAM)!
- This uses the history stored in the sqlite database, as implemented in the
  previous commits.
  - The QUERY-ARCHIVE megafunction builds up a SQL query to get stuff out of the
    database, in accordance with provided MAM + RSM parameters.
- Notably, various hacks are in here that need to be fixed.
  - IQ 'set's are now processed, which means we needed to add a stub impl of
    Schrödinger's Chat so people don't drop out of MUCs all of a sudden.
    (Well, it just responds to every ping indiscriminately...)
  - Oh also the new presence subscription stuff from earlier is borked.
This commit is contained in:
eta 2020-09-27 23:01:40 +01:00
parent f979abbd35
commit c36d61687a
10 changed files with 229 additions and 9 deletions

View file

@ -247,6 +247,10 @@
:disco-items) :disco-items)
((and (equal xmlns +vcard-temp-ns+) (equal tag-name "vCard")) ((and (equal xmlns +vcard-temp-ns+) (equal tag-name "vCard"))
:vcard-temp-get) :vcard-temp-get)
((and (equal xmlns +mam-ns+) (equal tag-name "query"))
:mam-query)
((and (equal xmlns +ping-ns+) (equal tag-name "ping"))
:ping)
(t (t
:generic-iq)))) :generic-iq))))
(call-component-iq-handler comp handler-type (call-component-iq-handler comp handler-type
@ -261,7 +265,7 @@
(let ((type (dom:get-attribute stanza "type")) (let ((type (dom:get-attribute stanza "type"))
(id (dom:get-attribute stanza "id")) (id (dom:get-attribute stanza "id"))
(from (dom:get-attribute stanza "from"))) (from (dom:get-attribute stanza "from")))
(if (equal type "get") (if (or (equal type "get") (equal type "set"))
(handle-iq-get comp id from stanza) (handle-iq-get comp id from stanza)
(symbol-macrolet (symbol-macrolet
((promise (gethash id (component-promises comp)))) ((promise (gethash id (component-promises comp))))

88
db.lisp
View file

@ -35,6 +35,15 @@
(with-bound-columns (id) get-stmt (with-bound-columns (id) get-stmt
id)))) id))))
(defun get-user-chat-localpart (chat-id)
  "Return the wa_jid column of the user_chats row whose id is CHAT-ID, or NIL when no such row exists."
  (with-prepared-statements
      ((lookup "SELECT wa_jid FROM user_chats WHERE id = ?"))
    (bind-parameters lookup chat-id)
    ;; A false step result means the SELECT matched no row.
    (and (sqlite:step-statement lookup)
         (with-bound-columns (wa-jid) lookup
           wa-jid))))
(defun get-user-chat-subject (uid localpart) (defun get-user-chat-subject (uid localpart)
"Get the user chat subject of LOCALPART for UID, or NIL if none exists." "Get the user chat subject of LOCALPART for UID, or NIL if none exists."
(with-prepared-statements (with-prepared-statements
@ -145,11 +154,13 @@
(defun insert-xmpp-message (xm) (defun insert-xmpp-message (xm)
"Inserts XM, a groupchat XMPP-MESSAGE, into the database." "Inserts XM, a groupchat XMPP-MESSAGE, into the database."
(assert (uiop:string-prefix-p "g" (conversation xm)) () "Tried to insert XMPP message for non-groupchat conversation ~A" (conversation xm)) (assert (uiop:string-prefix-p "g" (conversation xm)) () "Tried to insert XMPP message for non-groupchat conversation ~A" (conversation xm))
(let ((chat-id (or (let* ((chat-id (or
(get-user-chat-id (uid xm) (conversation xm)) (get-user-chat-id (uid xm) (conversation xm))
(error "Couldn't find chat id for conversation ~A / uid ~A" (error "Couldn't find chat id for conversation ~A / uid ~A"
(conversation xm) (uid xm)))) (conversation xm) (uid xm))))
(ts-unix (local-time:timestamp-to-unix (timestamp xm))))
(local-time:*default-timezone* local-time:+utc-zone+)
(ts-unix (local-time:timestamp-to-unix (timestamp xm))))
(with-prepared-statements (with-prepared-statements
((insert-stmt "INSERT INTO user_chat_history (user_id, chat_id, user_from, ts_unix, xmpp_id, orig_id, body, oob_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) ((insert-stmt "INSERT INTO user_chat_history (user_id, chat_id, user_from, ts_unix, xmpp_id, orig_id, body, oob_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"))
(bind-parameters insert-stmt (1 (uid xm)) (2 chat-id) (3 (from xm)) (4 ts-unix) (5 (xmpp-id xm)) (6 (orig-id xm)) (7 (body xm)) (8 (oob-url xm))) (bind-parameters insert-stmt (1 (uid xm)) (2 chat-id) (3 (from xm)) (4 ts-unix) (5 (xmpp-id xm)) (6 (orig-id xm)) (7 (body xm)) (8 (oob-url xm)))
@ -163,3 +174,70 @@
(when (sqlite:step-statement get-stmt) (when (sqlite:step-statement get-stmt)
(with-bound-columns (xid) get-stmt (with-bound-columns (xid) get-stmt
xid)))) xid))))
(defun get-chat-history-ts (uid chat-id xmpp-id)
  "Return the ts_unix value of the user_chat_history row matching UID, CHAT-ID and XMPP-ID, or NIL when there is no such row."
  (with-prepared-statements
      ((lookup "SELECT ts_unix FROM user_chat_history WHERE user_id = ? AND chat_id = ? AND xmpp_id = ?"))
    (bind-parameters lookup uid chat-id xmpp-id)
    ;; A false step result means the SELECT matched no row.
    (and (sqlite:step-statement lookup)
         (with-bound-columns (unix-ts) lookup
           unix-ts))))
(defun query-archive (uid chat-id &key start end (limit 100) reference-stanza-id forward-page)
  "Query the chat history archive for the chat identified by CHAT-ID and UID.

START and END are optional UNIX timestamps used as exclusive lower and upper bounds.
LIMIT is the maximum number of items to return; it is clamped to the range 0-100, and NIL means 100.
If an RSM REFERENCE-STANZA-ID is provided, the query is further narrowed to entries after (FORWARD-PAGE true) or before (FORWARD-PAGE false) the history entry with that stanza ID; bounds given via START / END are kept when they are tighter.

Returns two values: a list of XMPP-MESSAGE objects in ascending timestamp order, and a boolean that is true when no further entries matched beyond the returned page.
Signals an ERROR if REFERENCE-STANZA-ID cannot be found in the archive."
  (let ((statement (make-string-output-stream))
        (localpart (get-user-chat-localpart chat-id))
        (local-time:*default-timezone* local-time:+utc-zone+)
        ;; Parameters are PUSHed in reverse and NREVERSE'd before binding, so
        ;; the final order is: uid, chat-id, [start], [end].
        (args (list chat-id uid))
        (items-returned 0))
    (format statement "SELECT user_from, ts_unix, xmpp_id, orig_id, body, oob_url FROM user_chat_history WHERE user_id = ? AND chat_id = ?")
    (when reference-stanza-id
      (let ((reference-ts (or
                           (get-chat-history-ts uid chat-id reference-stanza-id)
                           (error "Couldn't locate reference stanza ID ~A" reference-stanza-id))))
        ;; Combine the RSM reference with any explicit bound instead of
        ;; overwriting it, so the reference can only narrow the query.
        ;; NOTE(review): the comparisons below are strict on ts_unix, so entries
        ;; sharing the reference entry's timestamp look like they are skipped
        ;; when paging -- confirm whether that matters in practice.
        (if forward-page
            (setf start (if start (max start reference-ts) reference-ts))
            (setf end (if end (min end reference-ts) reference-ts)))))
    (when start
      (format statement " AND ts_unix > ?")
      (push start args))
    (when end
      (format statement " AND ts_unix < ?")
      (push end args))
    ;; Clamp the page size. A negative value must not reach SQLite, where a
    ;; negative LIMIT means "no limit".
    (setf limit (if limit
                    (max 0 (min limit 100))
                    100))
    ;; We copy a trick from biboumi: in order to figure out whether there are
    ;; more results if not for the limit existing, simply increment the limit
    ;; by 1 and see if you get the extra element.
    (format statement " ORDER BY ts_unix ~A LIMIT ~A" (if forward-page "ASC" "DESC") (1+ limit))
    (setf args (nreverse args))
    (bt:with-recursive-lock-held (*db-lock*)
      (let ((sqlite-stmt (sqlite:prepare-statement *db* (get-output-stream-string statement))))
        ;; The statement is built dynamically, so it is prepared by hand here
        ;; and must also be finalized by hand, even on a non-local exit.
        (unwind-protect
             (progn
               (loop
                 for param in args
                 for n from 1
                 do (sqlite:bind-parameter sqlite-stmt n param))
               (values
                ;; Backward pages are fetched newest-first; flip them so the
                ;; caller always receives ascending order.
                (funcall
                 (if forward-page #'identity #'nreverse)
                 (loop
                   while (sqlite:step-statement sqlite-stmt)
                   do (incf items-returned)
                   ;; The (LIMIT + 1)th row only signals "there is more".
                   while (<= items-returned limit)
                   collect (with-bound-columns (from ts-unix xmpp-id orig-id body oob-url) sqlite-stmt
                             (make-instance 'xmpp-message
                                            :uid uid
                                            :conversation localpart
                                            :from from
                                            :timestamp (local-time:unix-to-timestamp ts-unix)
                                            :xmpp-id xmpp-id
                                            :orig-id orig-id
                                            :body body
                                            :oob-url oob-url))))
                (<= items-returned limit)))
          (sqlite:finalize-statement sqlite-stmt))))))

View file

@ -984,6 +984,7 @@ buildLisp.program {
"xep-0030.lisp" "xep-0030.lisp"
"xep-0363.lisp" "xep-0363.lisp"
"xep-0115.lisp" "xep-0115.lisp"
"xep-0313.lisp"
"sqlite.lisp" "sqlite.lisp"
"db.lisp" "db.lisp"
"media.lisp" "media.lisp"

View file

@ -112,6 +112,39 @@ FIXME: the above behaviour is a bit meh."
do (format oss "> ~A~%" item)) do (format oss "> ~A~%" item))
(get-output-stream-string oss))) (get-output-stream-string oss)))
(defun deliver-mam-history-message (comp msg to-jid &optional query-id)
  "Send MSG, an XMPP-MESSAGE, to TO-JID wrapped in a MAM <result/> element, tagging it with QUERY-ID when one is supplied."
  (let* ((room-jid (concatenate 'string (conversation msg) "@" (component-name comp)))
         ;; The forwarded stanza appears to come from the sender's nick in the room.
         (occupant-jid (concatenate 'string room-jid "/" (from msg))))
    (with-message (comp to-jid
                   :from room-jid
                   :type nil)
      (cxml:with-element "result"
        (cxml:attribute "xmlns" +mam-ns+)
        (when query-id
          (cxml:attribute "queryid" query-id))
        (cxml:attribute "id" (xmpp-id msg))
        (cxml:with-element "forwarded"
          (cxml:attribute "xmlns" +forwarded-ns+)
          ;; Timestamp of the archived message.
          (cxml:with-element "delay"
            (cxml:attribute "xmlns" +delivery-delay-ns+)
            (cxml:attribute "stamp" (local-time:format-timestring nil (timestamp msg))))
          ;; The archived groupchat message itself.
          (cxml:with-element "message"
            (cxml:attribute "from" occupant-jid)
            (cxml:attribute "xmlns" +client-ns+)
            (cxml:attribute "type" "groupchat")
            (cxml:with-element "body"
              (cxml:text (body msg)))
            (alexandria:when-let ((url (oob-url msg)))
              (cxml:with-element "x"
                (cxml:attribute "xmlns" +oob-ns+)
                (cxml:with-element "url"
                  (cxml:text url))))
            (alexandria:when-let ((origin (orig-id msg)))
              (cxml:with-element "origin-id"
                (cxml:attribute "xmlns" +unique-stanzas-ns+)
                (cxml:attribute "id" origin)))))))))
(defun deliver-xmpp-message (comp msg) (defun deliver-xmpp-message (comp msg)
"Deliver MSG, an XMPP-MESSAGE, to the intended destinations on COMP." "Deliver MSG, an XMPP-MESSAGE, to the intended destinations on COMP."

View file

@ -22,3 +22,9 @@
(defparameter +chat-states-ns+ "http://jabber.org/protocol/chatstates") (defparameter +chat-states-ns+ "http://jabber.org/protocol/chatstates")
(defparameter +hints-ns+ "urn:xmpp:hints") (defparameter +hints-ns+ "urn:xmpp:hints")
(defparameter +entity-caps-ns+ "http://jabber.org/protocol/caps") (defparameter +entity-caps-ns+ "http://jabber.org/protocol/caps")
;; XEP-0313: Message Archive Management (version 2 namespace).
(defparameter +mam-ns+ "urn:xmpp:mam:2")
;; XEP-0059: Result Set Management.
(defparameter +rsm-ns+ "http://jabber.org/protocol/rsm")
;; XEP-0004: Data Forms.
(defparameter +data-forms-ns+ "jabber:x:data")
;; XEP-0297: Stanza Forwarding.
(defparameter +forwarded-ns+ "urn:xmpp:forward:0")
;; Namespace of client stanzas (RFC 6120).
(defparameter +client-ns+ "jabber:client")
;; XEP-0199: XMPP Ping.
(defparameter +ping-ns+ "urn:xmpp:ping")

View file

@ -72,3 +72,5 @@ CREATE TABLE user_chat_history (
body VARCHAR NOT NULL, body VARCHAR NOT NULL,
oob_url VARCHAR oob_url VARCHAR
); );
-- At most one archived row per (user, chat, XMPP stanza ID); inserting a
-- second row with the same triple violates this index.
CREATE UNIQUE INDEX user_chat_history_unique ON user_chat_history (user_id, chat_id, xmpp_id);

View file

@ -59,6 +59,7 @@
`((disco-identity ,chat-subject "text" "conference") `((disco-identity ,chat-subject "text" "conference")
(disco-feature ,+disco-info-ns+) (disco-feature ,+disco-info-ns+)
(disco-feature ,+muc-ns+) (disco-feature ,+muc-ns+)
(disco-feature ,+mam-ns+)
(disco-feature ,+muc-stable-id-ns+) (disco-feature ,+muc-stable-id-ns+)
(disco-feature ,+unique-stanzas-ns+) (disco-feature ,+unique-stanzas-ns+)
(disco-feature "muc_hidden") (disco-feature "muc_hidden")
@ -272,6 +273,61 @@ WhatsXMPP represents users as u440123456789 and groups as g1234-5678."
(admin-presence comp jid "Programming error" "xa") (admin-presence comp jid "Programming error" "xa")
(remhash jid (component-whatsapps comp)))) (remhash jid (component-whatsapps comp))))
(defun whatsxmpp-ping-handler (comp &key to from &allow-other-keys)
  "Stub handler for :ping IQs. Ignores COMP, TO and FROM and always returns NIL."
  (declare (ignore comp to from))
  ;; This is a stub!
  ;; NOTE(review): presumably the IQ dispatcher turns the NIL return into an
  ;; empty result, so every ping is answered regardless of its target -- confirm.
  nil)
(defun whatsxmpp-mam-query-handler (comp &key to from stanza &allow-other-keys)
  "Handles Message Archive Management (MAM) queries.

FROM is the querying JID, TO is the JID of the chat whose archive is queried, and STANZA is the IQ stanza whose first child element is the MAM <query/>.
Every matching archived message is delivered to FROM as a MAM result message; the return value is the list of forms that emits the closing <fin/> element with its RSM <set/>.

Signals STANZA-ERROR with:
- \"registration-required\" when FROM is not a bridge user;
- \"item-not-found\" when TO is not a known chat, or when the RSM <after/> / <before/> ID is not in the archive;
- \"bad-request\" when the start / end timestamps or the RSM <max/> value cannot be parsed."
  (with-component-data-lock (comp)
    (let* ((stripped (strip-resource from))
           (local-time:*default-timezone* local-time:+utc-zone+)
           (uid (or
                 (get-user-id stripped)
                 (error 'stanza-error
                        :defined-condition "registration-required"
                        :text "You must be a bridge user to run MAM queries."
                        :type "auth")))
           (chat-id (or
                     (get-user-chat-id uid (nth-value 1 (parse-jid to)))
                     (error 'stanza-error
                            :defined-condition "item-not-found"
                            :text "Couldn't find a WhatsApp chat with that JID."
                            :type "modify")))
           (query-params (alist-from-mam-query (elt (child-elements stanza) 0))))
      ;; The archive being queried is TO; the requester is FROM.
      (format *debug-io* "~&MAM query for ~A from ~A:~% params ~A~%" to from query-params)
      (labels ((param (keyword)
                 (whatscl::cassoc keyword query-params))
               (bad-request (text)
                 (error 'stanza-error
                        :defined-condition "bad-request"
                        :text text
                        :type "modify"))
               ;; Client-supplied values are untrusted: report unparseable
               ;; input to the client instead of leaking a raw parse error.
               (unix-from-mam-param (keyword)
                 (alexandria:when-let ((time (param keyword)))
                   (handler-case
                       (local-time:timestamp-to-unix (local-time:parse-timestring time))
                     (error ()
                       (bad-request (format nil "Couldn't parse the ~(~A~) timestamp of the MAM query." keyword))))))
               (limit-from-rsm ()
                 (alexandria:when-let ((limit (param :max)))
                   (handler-case
                       (parse-integer limit)
                     (error ()
                       (bad-request "Couldn't parse the RSM <max/> value of the MAM query."))))))
        (let* ((after (param :after))
               (reference-stanza-id (or after (param :before))))
          ;; An unknown paging reference must be reported as item-not-found
          ;; rather than surfacing as a generic internal error.
          (when (and reference-stanza-id
                     (not (get-chat-history-ts uid chat-id reference-stanza-id)))
            (error 'stanza-error
                   :defined-condition "item-not-found"
                   :text "The RSM reference ID was not found in the archive."
                   :type "cancel"))
          (multiple-value-bind (messages completep)
              (query-archive uid chat-id
                             :start (unix-from-mam-param :start)
                             :end (unix-from-mam-param :end)
                             :limit (limit-from-rsm)
                             :reference-stanza-id reference-stanza-id
                             :forward-page after)
            (format *debug-io* "~&MAM query for ~A returned ~A messages (complete: ~A)" from (length messages) completep)
            (loop
              for msg in messages
              do (deliver-mam-history-message comp msg from (param :query-id)))
            ;; The returned forms emit the <fin/> element; <first/> and <last/>
            ;; are only included when the page is non-empty.
            `((cxml:with-element "fin"
                (cxml:attribute "xmlns" ,+mam-ns+)
                (cxml:attribute "complete" ,(if completep "true" "false"))
                (cxml:with-element "set"
                  (cxml:attribute "xmlns" ,+rsm-ns+)
                  ,@(when messages
                      `((cxml:with-element "first"
                          (cxml:text ,(xmpp-id (first messages))))
                        (cxml:with-element "last"
                          (cxml:text ,(xmpp-id (car (last messages)))))))))))))))))
(defun do-chat-history-request (comp conn jid uid requested-jid) (defun do-chat-history-request (comp conn jid uid requested-jid)
"Retrieves full chat history for the REQUESTED-JID, and inserts it into the database." "Retrieves full chat history for the REQUESTED-JID, and inserts it into the database."
(whatscl::get-full-chat-history (whatscl::get-full-chat-history
@ -523,7 +579,8 @@ Returns three values: avatar data (as two values), and a generalized boolean spe
(wx-localpart (wa-jid-to-whatsxmpp-localpart ct-jid))) (wx-localpart (wa-jid-to-whatsxmpp-localpart ct-jid)))
(when (uiop:string-prefix-p "u" wx-localpart) (when (uiop:string-prefix-p "u" wx-localpart)
;; The user has an open chat with this other user, so they probably want a presence subscription. ;; The user has an open chat with this other user, so they probably want a presence subscription.
(handle-wa-contact-presence-subscriptions comp jid wx-localpart) (when (get-contact-name uid wx-localpart) ;; FIXME
(handle-wa-contact-presence-subscriptions comp jid wx-localpart))
(return-from add-wa-chat)) (return-from add-wa-chat))
(unless (uiop:string-prefix-p "g" wx-localpart) (unless (uiop:string-prefix-p "g" wx-localpart)
(warn "Interesting localpart pased to ADD-WA-CHAT: ~A" wx-localpart) (warn "Interesting localpart pased to ADD-WA-CHAT: ~A" wx-localpart)
@ -601,6 +658,7 @@ Returns three values: avatar data (as two values), and a generalized boolean spe
(let* ((new-from (concatenate 'string orig-to "/" muc-resource)) (let* ((new-from (concatenate 'string orig-to "/" muc-resource))
(group-localpart (nth-value 1 (parse-jid orig-to))) (group-localpart (nth-value 1 (parse-jid orig-to)))
(recipients (get-user-chat-joined (get-user-id jid) group-localpart))) (recipients (get-user-chat-joined (get-user-id jid) group-localpart)))
;; FIXME: You can break the database's UNIQUE constraint here.
(insert-xmpp-message (make-instance 'xmpp-message (insert-xmpp-message (make-instance 'xmpp-message
:conversation group-localpart :conversation group-localpart
:uid (get-user-id jid) :uid (get-user-id jid)
@ -905,7 +963,7 @@ Returns three values: avatar data (as two values), and a generalized boolean spe
(let ((conn (gethash stripped (component-whatsapps comp)))) (let ((conn (gethash stripped (component-whatsapps comp))))
(if conn (if conn
(let ((chats (get-user-groupchats uid))) (let ((chats (get-user-groupchats uid)))
(reply (format nil "Fetching full chat history for ~A groupchats. This will probably take a long time.~%Note that even after completion is reported, some background media uploading may be in progress." (reply (format nil "Fetching full chat history for ~A groupchats. This will probably take a long time.~%Note that even after completion is reported, some background media uploading may be in progress.~%If the WhatsApp connection is interrupted midway through the fetch, you will need to retry the fetch."
(length chats))) (length chats)))
(bt:make-thread (bt:make-thread
(lambda () (lambda ()
@ -1297,7 +1355,9 @@ Returns three values: avatar data (as two values), and a generalized boolean spe
(defun register-whatsxmpp-handlers (comp) (defun register-whatsxmpp-handlers (comp)
(register-component-iq-handler comp :disco-info #'disco-info-handler) (register-component-iq-handler comp :disco-info #'disco-info-handler)
(register-component-iq-handler comp :vcard-temp-get #'whatsxmpp-vcard-temp-handler) (register-component-iq-handler comp :vcard-temp-get #'whatsxmpp-vcard-temp-handler)
(register-component-iq-handler comp :disco-items #'disco-items-handler)) (register-component-iq-handler comp :disco-items #'disco-items-handler)
(register-component-iq-handler comp :mam-query #'whatsxmpp-mam-query-handler)
(register-component-iq-handler comp :ping #'whatsxmpp-ping-handler))
(defun whatsxmpp-init () (defun whatsxmpp-init ()
"Initialise the whatsxmpp bridge." "Initialise the whatsxmpp bridge."

View file

@ -17,6 +17,10 @@
"Returns the child elements (excluding text nodes) of the CXML DOM node NODE." "Returns the child elements (excluding text nodes) of the CXML DOM node NODE."
(remove-if-not #'dom:element-p (dom:child-nodes node))) (remove-if-not #'dom:element-p (dom:child-nodes node)))
(defun nil-empty (seq)
  "Returns SEQ (a sequence) unchanged when it has at least one element, and NIL when it has none."
  (if (zerop (length seq))
      nil
      seq))
(defmacro with-promise-from-thread (() &body forms) (defmacro with-promise-from-thread (() &body forms)
"Return a promise that executes FORMS in a new thread, resolving the promise with the return value of (PROGN ,@FORMS) or rejecting it if an ERROR condition is thrown (with said condition)." "Return a promise that executes FORMS in a new thread, resolving the promise with the return value of (PROGN ,@FORMS) or rejecting it if an ERROR condition is thrown (with said condition)."
(let ((resolve (gensym)) (let ((resolve (gensym))

View file

@ -13,6 +13,7 @@
(:file "xep-0030") (:file "xep-0030")
(:file "xep-0363") (:file "xep-0363")
(:file "xep-0115") (:file "xep-0115")
(:file "xep-0313")
(:file "sqlite") (:file "sqlite")
(:file "db") (:file "db")
(:file "media") (:file "media")

31
xep-0313.lisp Normal file
View file

@ -0,0 +1,31 @@
;;;; XEP-0313: Message Archive Management
(in-package :whatsxmpp)
(defun whitelisted-mam-keywordize (thing)
  "Turns THING into a keyword, but only when it is one of the known field names; any other THING is returned unchanged.
Restricting interning to a fixed list keeps arbitrary input from creating new keywords."
  (let ((known-names '("start" "end" "with" "first" "last" "count" "max" "FORM_TYPE" "after" "before")))
    (cond
      ;; The comparison is case-sensitive, so only exact spellings qualify.
      ((find thing known-names :test #'string=)
       (intern (string-upcase thing) :keyword))
      (t thing))))
(defun alist-from-mam-query (query-elt)
  "Parses the QUERY-ELT, a MAM <query> element, and returns an alist.

The alist holds, in order: one entry per child of the data form (keyed by its \"var\" attribute), one entry per child of the RSM <set/> (keyed by its element name), and a :QUERY-ID entry when the query carries a non-empty \"queryid\" attribute.
Keys are run through WHITELISTED-MAM-KEYWORDIZE; empty or missing values are NIL.
Both the data form and the RSM set are optional."
  (labels ((consify-df (field-elt)
             (let ((value-elt (get-node-named (child-elements field-elt) "value")))
               (cons (whitelisted-mam-keywordize
                      (dom:get-attribute field-elt "var"))
                     ;; A field without a <value/> child yields NIL instead
                     ;; of handing a missing node to GET-NODE-TEXT.
                     (when value-elt
                       (nil-empty (get-node-text value-elt))))))
           (consify-rsm (rsm-elt)
             (cons (whitelisted-mam-keywordize
                    (dom:node-name rsm-elt))
                   (nil-empty (get-node-text rsm-elt)))))
    (let* ((x-elt (get-node-with-xmlns (child-elements query-elt) +data-forms-ns+))
           (rsm-elt (get-node-with-xmlns (child-elements query-elt) +rsm-ns+))
           ;; NOTE(review): the DOM appears to return an empty string for a
           ;; missing attribute; NIL-EMPTY keeps that from being echoed back
           ;; as queryid="" -- confirm.
           (query-id (nil-empty (dom:get-attribute query-elt "queryid")))
           ;; A query without a data form is valid (it asks for the whole
           ;; archive), so the form needs the same guard as the RSM set.
           (form-fields (when x-elt
                          (map 'list #'consify-df (child-elements x-elt))))
           (rsm-fields (when rsm-elt
                         (map 'list #'consify-rsm (child-elements rsm-elt)))))
      (append form-fields rsm-fields (when query-id
                                       `((:query-id . ,query-id)))))))