126 lines
3.9 KiB
Python
126 lines
3.9 KiB
Python
import logging
|
|
|
|
# xmpp
|
|
import slixmpp
|
|
from slixmpp.exceptions import IqError, IqTimeout
|
|
# files
|
|
import os
|
|
# config
|
|
from dotenv import load_dotenv
|
|
# gpio
|
|
import RPi.GPIO as GPIO
|
|
# sleep
|
|
import time
|
|
# asyncio: lets blocking worker threads run alongside slixmpp, which is not built for threads
|
|
import asyncio
|
|
# webserver
|
|
from flask import Flask as f
|
|
from flask import Response
|
|
|
|
# Flask application that serves the HTTP routes defined further down.
app = f("web_thread")

# 0 until GPIO 17 has been configured as an output; set to 1 by the first
# buzz request (XMPP or HTTP) so the pin is only set up once.
init = 0

# Pins are addressed by their Broadcom (BCM) channel numbers.
GPIO.setmode(GPIO.BCM)

# GPIO 23 is the input watched for falling edges; the internal pull-up is on.
GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_UP)
|
|
|
|
class EchoBot(slixmpp.ClientXMPP):
    """XMPP bot that drives a door buzzer and reports bell rings.

    A chat message with the body ``open`` pulses GPIO 17 low for two
    seconds. A falling edge on GPIO 23 sends a chat message to the address
    stored in the ``NOTIFY`` environment variable. Creating the bot also
    schedules the Flask app and the GPIO watcher to run in worker threads.
    """

    def __init__(self, jid, password):
        """Register the XMPP event handlers and schedule the worker threads.

        Args:
            jid: JID the bot logs in with.
            password: Password belonging to that JID.
        """
        slixmpp.ClientXMPP.__init__(self, jid, password)

        # Handle of the pending reconnect timer, or None when none is armed.
        self._reconnect_timer = None

        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        # slixmpp reports a dropped stream as "disconnected"; the previous
        # name "disconnect" is not an event the library emits.
        self.add_event_handler("disconnected", self.reconnect)
        self.add_event_handler("connection_failed", self.reconnect)

        # Both jobs block forever, so each gets its own thread. They are
        # queued on the event loop and start once the loop is running.
        loop = asyncio.get_event_loop()
        asyncio.run_coroutine_threadsafe(
            asyncio.to_thread(app.run, host='0.0.0.0', port=80, debug=False),
            loop,
        )
        asyncio.run_coroutine_threadsafe(asyncio.to_thread(self.ringLoop), loop)

    def session_start(self, event):
        """Announce presence and request the roster once the session is up.

        Args:
            event: Payload passed by the event dispatcher; unused.
        """
        self.send_presence()
        # NOTE: roster retrieval uses an Iq stanza and can therefore fail
        # with IqError / IqTimeout; those are currently not handled here.
        self.get_roster()

    def message(self, msg):
        """React to an incoming chat message.

        ``open`` triggers the buzzer; any other body is answered with
        "Not Understood". Stanzas that are neither ``chat`` nor ``normal``
        are ignored.

        Args:
            msg: The received message stanza.
        """
        global init

        if msg['type'] not in ('chat', 'normal'):
            return

        print(msg['body'])

        if msg['body'] != "open":
            msg.reply("Not Understood").send()
            return

        msg.reply("Buzzing").send()
        if init == 0:
            # The output pin is configured lazily, on the first request.
            GPIO.setup(17, GPIO.OUT)
            init = 1
        GPIO.output(17, GPIO.LOW)
        # This handler runs on the asyncio loop, so a time.sleep() here would
        # stall all XMPP traffic. Let a loop timer release the pin instead.
        self.loop.call_later(2, GPIO.output, 17, GPIO.HIGH)

    def reconnect(self, event=None):
        """Reconnect, retrying every 5 seconds until the stream is up.

        Used as the handler for "disconnected" and "connection_failed".
        The retry is driven by a loop timer, not a blocking sleep, so the
        event loop stays free to complete the connection attempt.

        Args:
            event: Payload passed by the event dispatcher; unused. Optional
                so the method can still be called without arguments.
        """
        if self._reconnect_timer is not None:
            # A retry is already pending; do not start a second timer chain.
            return
        if self.is_connected():
            return
        self.connect()
        self._reconnect_timer = self.loop.call_later(5, self._retry_connect)

    def _retry_connect(self):
        """Timer callback: clear the pending marker and check the link again."""
        self._reconnect_timer = None
        self.reconnect()

    def _notify_ring(self, recipient):
        """Send the bell notification; must be called on the event loop thread."""
        self.send_message(mto=recipient, mbody="Klingelt!", mtype='chat')

    def ringLoop(self):
        """Watch GPIO 23 for falling edges and notify over XMPP.

        Runs forever and is meant to execute in a worker thread. Each wait
        times out after 5 seconds, in which case a heartbeat line is printed.
        """
        recipient = os.getenv("NOTIFY")
        while True:
            channel = GPIO.wait_for_edge(23, GPIO.FALLING, timeout=5000)
            if channel is None:
                # Timed out without an edge.
                print(time.strftime("%d-%m|%H:%M", time.localtime()),
                      "> GPIO OUTPUT: 1")
                continue

            # This code runs outside the loop thread, so the stanza is handed
            # to the loop instead of calling send_message() directly.
            self.loop.call_soon_threadsafe(self._notify_ring, recipient)
            time.sleep(0.5)
            for _ in range(5):
                print("ringing")
|
|
|
|
@app.route('/buzz', methods=['GET'])
def rbuzz():
    """HTTP endpoint that pulses the buzzer output for two seconds.

    Configures GPIO 17 as an output on first use, drives it low, waits,
    drives it high again and returns a small JSON status string.
    """
    global init

    # Lazy one-time pin configuration, tracked by the module-level flag.
    if init == 0:
        GPIO.setup(17, GPIO.OUT)
        init = 1

    GPIO.output(17, GPIO.LOW)
    time.sleep(2)  # blocks this request for the duration of the pulse
    GPIO.output(17, GPIO.HIGH)

    return '{"status":0}'
|
|
|
|
|
|
@app.route('/')
def index():
    """Serve the contents of ``index.html`` from the working directory."""
    # The context manager closes the file on every request; the previous
    # open(...).read() left the handle to be reclaimed by the garbage collector.
    with open("index.html", "r") as page:
        return page.read()
|
|
|
|
|
|
if __name__ == '__main__':
    # Credentials come from the environment, optionally filled from a .env file.
    load_dotenv()
    password = os.getenv("PASS")
    jid = os.getenv("JID")

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(levelname)-8s %(message)s',
    )

    # Build the bot, open the connection and hand control to the event loop.
    xmpp = EchoBot(jid, password)
    xmpp.connect()
    xmpp.process(forever=True)
|