hausbrand/waxmpp.py

108 lines
3.3 KiB
Python

import logging
# xmpp
import slixmpp
from slixmpp.exceptions import IqError, IqTimeout
# files
import os
# config
from dotenv import load_dotenv
# gpio
import RPi.GPIO as GPIO
# sleep
import time
# asyncio: used to run blocking worker threads alongside slixmpp's event loop (slixmpp itself is not thread-safe)
import asyncio
# webserver
from flask import Flask as f
from flask import Response
# Flag shared by the buzzer code paths: 0 until pin 17 has been configured
# as an output, 1 afterwards.
init = 0

# Flask application that serves the web interface routes defined below.
app = f("web_thread")

# Address pins by their Broadcom (BCM) channel numbers.
GPIO.setmode(GPIO.BCM)
# Pin 23 is the input the ring loop watches; the internal pull-up is enabled.
GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_UP)
class EchoBot(slixmpp.ClientXMPP):
    """XMPP client that triggers the door buzzer on request and reports rings.

    On construction it registers its XMPP event handlers and schedules two
    blocking workers on threads: the Flask app (``app.run``) and
    :meth:`ringLoop`.

    The class relies on the module-level names ``app``, ``init`` and
    ``recipient``; ``recipient`` is only assigned in the ``__main__`` section
    of this file.
    """

    def __init__(self, jid, password):
        """Create the client, register handlers and schedule the workers.

        Args:
            jid: JID to log in with.
            password: Password for that JID.
        """
        slixmpp.ClientXMPP.__init__(self, jid, password)
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.add_event_handler("disconnected", self.my_reconnect)
        #self.add_event_handler("connection_failed", self._reconnect)

        # Keep a handle on the loop so worker threads can hand XMPP work
        # back to it instead of touching the stream from a foreign thread.
        self._main_loop = asyncio.get_event_loop()

        # Both workers block forever, so each is pushed onto a thread. They
        # are only scheduled here; they start once the loop is running.
        asyncio.run_coroutine_threadsafe(
            asyncio.to_thread(app.run, host='0.0.0.0', port=80, debug=False),
            self._main_loop,
        )
        asyncio.run_coroutine_threadsafe(
            asyncio.to_thread(self.ringLoop), self._main_loop
        )

    def session_start(self, event):
        """Announce presence and request the roster once the session is up."""
        self.send_presence()
        self.get_roster()

    def message(self, msg):
        """Handle an incoming message stanza.

        A body of exactly ``"open"`` is answered with "Buzzing" and pulses
        the buzzer pin; any other chat/normal message is answered with
        "Not Understood". Other message types are ignored.
        """
        if msg['type'] not in ('chat', 'normal'):
            return
        print(msg['body'])
        if msg['body'] == "open":
            msg.reply("Buzzing").send()
            # The pulse sleeps for two seconds; run it on a thread so the
            # event loop keeps servicing the XMPP connection meanwhile.
            self._main_loop.run_in_executor(None, self._pulse_buzzer)
        else:
            msg.reply("Not Understood").send()

    @staticmethod
    def _pulse_buzzer():
        """Drive pin 17 LOW for two seconds, then HIGH again (blocking)."""
        global init
        if init == 0:
            # Pin 17 is configured lazily, on the first buzz.
            GPIO.setup(17, GPIO.OUT)
            init = 1
        GPIO.output(17, GPIO.LOW)
        try:
            time.sleep(2)
        finally:
            # Always return the pin to HIGH, even if the sleep is interrupted.
            GPIO.output(17, GPIO.HIGH)

    def _notify(self, body):
        """Send *body* as a chat message to ``recipient``; run on the loop."""
        self.send_message(mto=recipient, mbody=body, mtype='chat')

    async def my_reconnect(self, ev):
        """Reconnect after a disconnect and announce it to ``recipient``.

        Waits 5 seconds, then calls ``connect()`` every 20 seconds until
        ``is_connected()`` reports true.
        """
        await asyncio.sleep(5)
        while not self.is_connected():
            self.connect()
            await asyncio.sleep(20)
        self._notify("reconnected!")

    def ringLoop(self):
        """Watch pin 23 forever and report each ring over XMPP.

        Blocking; meant to run on a worker thread. Each iteration waits up
        to 5000 ms for a falling edge. On timeout a heartbeat line is
        printed. On an edge it waits (again up to 5000 ms) for the next edge
        in either direction and reports the elapsed time in milliseconds.
        """
        while True:
            channel = GPIO.wait_for_edge(23, GPIO.FALLING, timeout=5000)
            ring_start_ns = time.time_ns()
            if channel is None:
                # Timed out without an edge.
                print(time.strftime("%d-%m|%H:%M", time.localtime()), "> GPIO OUTPUT: 1")
                continue
            GPIO.wait_for_edge(23, GPIO.BOTH, timeout=5000)
            msg = "Ringing for " + str((time.time_ns() - ring_start_ns)/1000000) + " ms"
            # This runs off the event-loop thread: schedule the send on the
            # loop rather than calling into the XMPP stream directly.
            self._main_loop.call_soon_threadsafe(self._notify, msg)
            print(time.strftime("%d-%m|%H:%M", time.localtime()), "> Ringing!")
@app.route('/buzz', methods = ['GET'])
def rbuzz():
    """Pulse the buzzer pin for two seconds and notify ``recipient`` via XMPP.

    Returns:
        The literal JSON text ``{"status":0}``.

    NOTE(review): this GET endpoint triggers the buzzer without any
    authentication visible in this file -- confirm access is restricted
    elsewhere.
    """
    global init
    if init == 0:
        # Pin 17 is configured lazily, on the first buzz.
        GPIO.setup(17, GPIO.OUT)
        init = 1
    GPIO.output(17, GPIO.LOW)
    try:
        time.sleep(2)  # blocks only this request's handler
    finally:
        # Always return the pin to HIGH, even if the sleep is interrupted.
        GPIO.output(17, GPIO.HIGH)
    # Flask runs this handler off the XMPP event-loop thread, so the send is
    # scheduled on the client's loop instead of being called directly.
    xmpp.loop.call_soon_threadsafe(
        lambda: xmpp.send_message(mto=recipient, mbody="opened via web interface", mtype='chat')
    )
    return "{\"status\":0}"
@app.route('/')
def index():
    """Serve the contents of ``index.html`` from the current working directory."""
    # Use a context manager so the file handle is closed after every request.
    with open("index.html", "r") as page:
        return page.read()
if __name__ == '__main__':
    # Load JID / PASS / NOTIFY from a .env file (if any) into the environment.
    load_dotenv()
    password = os.getenv("PASS")
    jid = os.getenv("JID")
    # `recipient` and `xmpp` stay module-level: the handlers above read them
    # as globals.
    recipient = os.getenv("NOTIFY")
    logging.basicConfig(level=logging.DEBUG, format='%(levelname)-8s %(message)s')

    # Fail early with a clear message instead of an obscure connection error.
    missing = [name for name, value in (("JID", jid), ("PASS", password)) if not value]
    if missing:
        raise SystemExit("Missing required configuration: " + ", ".join(missing))
    if not recipient:
        logging.warning("NOTIFY is not set; notifications have no recipient")

    xmpp = EchoBot(jid, password)
    xmpp.connect()
    xmpp.process(forever=True)