Actually reconnect users & connect on bridge start

This commit is contained in:
eta 2020-04-04 17:18:26 +01:00
parent fb8f405adc
commit e2add1d98f
3 changed files with 65 additions and 27 deletions

View file

@ -68,6 +68,14 @@ In other words, prepares STATEMENT once, then returns the prepared statement aft
for ,i-sym from 0 below (length (sqlite:statement-column-names ,stmt)) for ,i-sym from 0 below (length (sqlite:statement-column-names ,stmt))
collect (sqlite:statement-column-value ,stmt ,i-sym))))) collect (sqlite:statement-column-value ,stmt ,i-sym)))))
(defmacro with-bound-columns (parameters statement &body forms)
"Binds each column value of STATEMENT to the symbols in PARAMETERS, and runs FORMS."
(let ((let-forms (loop
for param in parameters
for idx from 0 upto (1- (length parameters))
collect `(,param (sqlite:statement-column-value ,statement ,idx)))))
`(let (,@let-forms) ,@forms)))
(defmacro bind-parameters (statement &rest parameters) (defmacro bind-parameters (statement &rest parameters)
"Binds PARAMETERS to the prepared statement STATEMENT. "Binds PARAMETERS to the prepared statement STATEMENT.

View file

@ -46,6 +46,9 @@
((whatsapps ((whatsapps
:initform (make-hash-table :test 'equal) :initform (make-hash-table :test 'equal)
:accessor component-whatsapps) :accessor component-whatsapps)
(reconnect-timer
:initform nil
:accessor component-reconnect-timer)
(upload-component-name (upload-component-name
:initarg :upload-component-name :initarg :upload-component-name
:accessor component-upload-component-name))) :accessor component-upload-component-name)))
@ -152,23 +155,30 @@
(defun component-listen-thread (comp) (defun component-listen-thread (comp)
"Listening thread for an XMPP component: constantly reads from the socket and emits new stanzas." "Listening thread for an XMPP component: constantly reads from the socket and emits new stanzas."
(format *debug-io* "Starting component listening thread~%") (format *debug-io* "Starting component listening thread~%")
;; ### Story time! ### ;; ### Story time! ###
;; So I spent an hour debugging why this wasn't working. ;; So I spent an hour debugging why this wasn't working.
;; And, long story short, if you just call CXML:PARSE with a stream ;; And, long story short, if you just call CXML:PARSE with a stream
;; it gets converted into an 'xstream' inside CXML, which has a :SPEED ;; it gets converted into an 'xstream' inside CXML, which has a :SPEED
;; property. This :SPEED property controls how many bytes it tries to buffer ;; property. This :SPEED property controls how many bytes it tries to buffer
;; before actually doing the parsing and the goddamn default is 8192 (!!). ;; before actually doing the parsing and the goddamn default is 8192 (!!).
;; This obviously ain't gonna fly for our TCP socket, because the initial stream ;; This obviously ain't gonna fly for our TCP socket, because the initial stream
;; start element is less than 8192 bytes. So we make our own stupid xstream ;; start element is less than 8192 bytes. So we make our own stupid xstream
;; and specify the speed manually, and then it works. ;; and specify the speed manually, and then it works.
;; ;;
;; Wouldn't it be nice if people documented this sort of thing? ;; Wouldn't it be nice if people documented this sort of thing?
(let ((source (make-xmpp-source comp)) (let ((source (make-xmpp-source comp))
(fucking-stream (cxml:make-xstream (component-socket comp) (fucking-stream (cxml:make-xstream (component-socket comp)
:speed 1 ; FFFFFFFFUUUUUUUU :speed 1 ; FFFFFFFFUUUUUUUU
:initial-speed 1))) :initial-speed 1)))
(handler-case
(cxml:parse fucking-stream source (cxml:parse fucking-stream source
:recode t))) :recode t)
(error (e)
(with-simple-restart
(continue "Continue execution.")
(invoke-debugger e))
(format *debug-io* "~&Component listen thread failed: ~A~%" e)
(emit :error comp e)))))
(defmacro with-component-xml-output ((comp) &body body) (defmacro with-component-xml-output ((comp) &body body)
(let ((ret-sym (gensym))) (let ((ret-sym (gensym)))
@ -390,8 +400,8 @@
:text text)))) :text text))))
(defun handle-connection-complete (comp) (defun handle-connection-complete (comp)
(declare (ignore comp)) (format *debug-io* "Connection complete! \o/")
(format *debug-io* "Connection complete! \o/")) (emit :connected comp))
(defun send-stanza-error (comp &key id to from e stanza-type) (defun send-stanza-error (comp &key id to from e stanza-type)
"Send E (a STANZA-ERROR) as an error response to a stanza of type STANZA." "Send E (a STANZA-ERROR) as an error response to a stanza of type STANZA."
@ -633,6 +643,9 @@ Commands:
- status: get your current status - status: get your current status
- help: view this help text") - help: view this help text")
(defparameter *reconnect-every-secs* 5
"Interval between calls to WA-RESETUP-USERS.")
(defun admin-msg (comp jid text) (defun admin-msg (comp jid text)
"Send an admin message from the admin on COMP to JID." "Send an admin message from the admin on COMP to JID."
(send-text-message comp jid text (admin-jid comp))) (send-text-message comp jid text (admin-jid comp)))
@ -640,16 +653,19 @@ Commands:
(defun wa-resetup-users (comp) (defun wa-resetup-users (comp)
"Go through the list of WhatsApp users and reconnect those whose connections have dropped." "Go through the list of WhatsApp users and reconnect those whose connections have dropped."
(with-component-data-lock (comp) (with-component-data-lock (comp)
(let ((users-to-reconnect (let* ((users-to-reconnect
(loop (loop
for jid being the hash-keys in (component-whatsapps comp) for jid being the hash-keys in (component-whatsapps comp)
using (hash-value conn) using (hash-value conn)
append (unless conn append (unless conn
(list jid))))) (list jid))))
(format nil "~&resetup-users: ~A users to reconnect~%" (length users-to-reconnect)) (num-users (length users-to-reconnect)))
(when (> num-users 0)
(format *debug-io* "~&resetup-users: ~A users to reconnect~%" num-users))
(loop (loop
for user in users-to-reconnect for user in users-to-reconnect
do (handle-setup-user comp user))))) do (handle-setup-user comp user))
(trivial-timers:schedule-timer (component-reconnect-timer comp) *reconnect-every-secs*))))
(defun send-qrcode (comp jid text) (defun send-qrcode (comp jid text)
"Send a QR code containing TEXT to JID." "Send a QR code containing TEXT to JID."
@ -963,6 +979,15 @@ WhatsXMPP represents users as u440123456789 and groups as g1234-5678."
(t (t
(whatscl::send-simple-text-message conn wa-jid body)))))))))) (whatscl::send-simple-text-message conn wa-jid body))))))))))
(defun whatsxmpp-load-users (comp)
(with-component-data-lock (comp)
(with-prepared-statement
(stmt "SELECT jid FROM users;")
(loop
while (sqlite:step-statement stmt)
do (with-bound-columns (jid) stmt
(setf (gethash jid (component-whatsapps comp)) nil))))))
(defun whatsxmpp-init () (defun whatsxmpp-init ()
"Initialise the whatsxmpp bridge." "Initialise the whatsxmpp bridge."
(connect-database) (connect-database)
@ -977,4 +1002,9 @@ WhatsXMPP represents users as u440123456789 and groups as g1234-5678."
(on :text-message ret (lambda (&rest args) (on :text-message ret (lambda (&rest args)
(apply #'whatsxmpp-message-handler ret args))) (apply #'whatsxmpp-message-handler ret args)))
(register-whatsxmpp-handlers ret) (register-whatsxmpp-handlers ret)
(whatsxmpp-load-users ret)
(setf (component-reconnect-timer ret) (trivial-timers:make-timer
(lambda () (wa-resetup-users ret))
:name "reconnection timer"))
(on :connected ret (lambda () (wa-resetup-users ret)))
ret)))) ret))))

View file

@ -1,5 +1,5 @@
(defsystem "whatsxmpp" (defsystem "whatsxmpp"
:depends-on ("usocket" "bordeaux-threads" "event-emitter" "blackbird" "cxml" "ironclad" "uuid" "sqlite" "whatscl" "drakma") :depends-on ("usocket" "bordeaux-threads" "event-emitter" "blackbird" "cxml" "ironclad" "uuid" "sqlite" "whatscl" "drakma" "local-time" "trivial-timers")
:serial t :serial t
:components :components
((:file "packages") ((:file "packages")