updated Traefik config to latest
improvements to PIRdbWriteGate.py gatecounter db initialized by python script updated .env.template python scripts now run by gatecounter service gatecounter service build from Dockerfile
This commit is contained in:
@@ -1,89 +1,123 @@
|
||||
# Written By Johnathan Cintron and Devlyn Courtier for the HCCC Library
|
||||
|
||||
#!/usr/bin/python
|
||||
|
||||
#!/usr/bin/python3
|
||||
import collections
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
import MySQLdb
|
||||
from time import sleep
|
||||
|
||||
from argparse import ArgumentParser
|
||||
from concurrent.futures import ThreadPoolExecutor, CancelledError, wait
|
||||
from datetime import datetime
|
||||
from queue import Queue
|
||||
|
||||
import RPi.GPIO as GPIO
|
||||
from getpass import getpass
|
||||
from multiprocessing import Queue
|
||||
from concurrent.futures import ProcessPoolExecutor, CancelledError, wait
|
||||
|
||||
from sqlalchemy import Column, DateTime, Integer, Table, create_engine
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import scoped_session, sessionmaker
|
||||
|
||||
log = logging.getLogger("sqlalchemy")
|
||||
info = logging.StreamHandler(sys.stdout)
|
||||
info.addFilter(lambda x: x.levelno <= logging.WARNING)
|
||||
errors = logging.StreamHandler(sys.stderr)
|
||||
errors.addFilter(lambda x: x.levelno >= logging.ERROR)
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.addHandler(info)
|
||||
log.addHandler(errors)
|
||||
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
class PIR_Detection(Base):
|
||||
__tablename__ = "PIRSTATS"
|
||||
|
||||
time = Column('datetime', DateTime, nullable=False, primary_key=True)
|
||||
count = Column('count', Integer, nullable=False)
|
||||
|
||||
|
||||
Detection=collections.namedtuple("Detection", ['time','count'])
|
||||
|
||||
class PIRgate:
|
||||
def __init__(self, hostname, username, password, database):
|
||||
# Set RPi GPIO Mode
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
def __init__(self, hostname, username, password, database):
|
||||
# Set RPi GPIO Mode
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
|
||||
# Setup GPIO in and out pins
|
||||
self.PIR_PIN = 7
|
||||
GPIO.setup(self.PIR_PIN, GPIO.IN)
|
||||
# End GPIO setup
|
||||
self.counts=Queue()
|
||||
self._pool=ProcessPoolExecutor()
|
||||
# Setup GPIO in and out pins
|
||||
self.PIR_PIN = 7
|
||||
GPIO.setup(self.PIR_PIN, GPIO.IN)
|
||||
# End GPIO setup
|
||||
self._pool=ThreadPoolExecutor()
|
||||
self._detection_queue=Queue()
|
||||
if not hostname:
|
||||
stdout,stderr = subprocess.Popen(['docker',
|
||||
'inspect',
|
||||
'-f',
|
||||
"'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'",
|
||||
'gatecounter-db'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT).communicate()
|
||||
host = stdout.decode().strip()
|
||||
else:
|
||||
host = hostname
|
||||
self.db_engine = create_engine(f"mysql://{username}:{password}@{host}/{database}")
|
||||
Base.metadata.create_all(bind=self.db_engine)
|
||||
self._session_factory = sessionmaker(bind=self.db_engine)
|
||||
self.Session = scoped_session(self._session_factory)
|
||||
|
||||
def start(self):
|
||||
try:
|
||||
self.event_listener = self._pool.submit(self.listen_for_events)
|
||||
self.db_writer = self._pool.submit(self.write_to_db, hostname, username, password, database)
|
||||
except KeyboardInterrupt:
|
||||
print("\nCtrl-C pressed cleaning up GPIO")
|
||||
self.event_listener.cancel()
|
||||
self.db_writer.cancel()
|
||||
GPIO.cleanup()
|
||||
finally:
|
||||
wait([self.event_listener,self.db_writer])
|
||||
GPIO.cleanup()
|
||||
def start(self):
|
||||
GPIO.add_event_detect(self.PIR_PIN, GPIO.RISING, callback=lambda c: self._detection_queue.put(Detection(datetime.now(),1)))
|
||||
with self._pool:
|
||||
try:
|
||||
self.db_writer = self._pool.submit(self.write_to_db)
|
||||
wait([self.db_writer])
|
||||
except KeyboardInterrupt:
|
||||
print("\nCtrl-C pressed cleaning up GPIO")
|
||||
raise
|
||||
finally:
|
||||
GPIO.cleanup()
|
||||
|
||||
def listen_for_events(self):
|
||||
count = 0
|
||||
while True:
|
||||
try:
|
||||
if GPIO.input(self.PIR_PIN):
|
||||
count += 1
|
||||
curr_date = datetime.now()
|
||||
if (curr_date.minute % 10 == 0) and (curr_date.second == 0):
|
||||
self.counts.put_nowait((curr_date,count))
|
||||
count = 0
|
||||
except (KeyboardInterrupt,CancelledError):
|
||||
break
|
||||
|
||||
|
||||
def write_to_db(self, hostname, username, password, database):
|
||||
while True:
|
||||
try:
|
||||
time, count = self.counts.get()
|
||||
with MySQLdb.connect(hostname,username,password,database) as db:
|
||||
try:
|
||||
db.cursor().execute("INSERT INTO PIRSTATS (datetime, gatecount) VALUES ('%s', '%d')" % (time.isoformat(' '), count))
|
||||
except (KeyboardInterrupt,CancelledError):
|
||||
db.rollback()
|
||||
break
|
||||
except:
|
||||
db.rollback()
|
||||
self.counts.put(time,count) #put the data back in queue to try writing it again
|
||||
else:
|
||||
db.commit()
|
||||
finally:
|
||||
db.close()
|
||||
except (KeyboardInterrupt,CancelledError):
|
||||
break
|
||||
def write_to_db(self):
|
||||
while True:
|
||||
try:
|
||||
detection = self._detection_queue.get()
|
||||
session = self.Session()
|
||||
session.add(PIR_Detection(time=detection.datetime, count=detection.count))
|
||||
except KeyboardInterrupt:
|
||||
session.rollback()
|
||||
raise
|
||||
except:
|
||||
session.rollback()
|
||||
self._detection_queue.put(detection) #return detection to queue to try again
|
||||
else:
|
||||
session.commit()
|
||||
finally:
|
||||
session.remove()
|
||||
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
try:
|
||||
hostname = input("DB Hostname: ")
|
||||
database = input("Database: ")
|
||||
username = input("Username: ")
|
||||
password = getpass()
|
||||
#just check the credentials by connecting to the db and closing
|
||||
MySQLdb.connect(hostname, username, password, database).close()
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(1)
|
||||
except:
|
||||
print("\nProblem connecting to the database. Check your credentials and try again \n")
|
||||
continue
|
||||
else:
|
||||
break
|
||||
PIRgate(hostname, username, password, database).start()
|
||||
parser = ArgumentParser(description="Begin PIR detections")
|
||||
parser.add_argument("-H", "--hostname",default="")
|
||||
parser.add_argument("-d", "--database",default="gatecounter")
|
||||
parser.add_argument("-u", "--username", required=True)
|
||||
parser.add_argument("-p", "--password", required=True)
|
||||
if sys.argv[0].startswith("python"):
|
||||
args = parser.parse_args(sys.argv[2:])
|
||||
else:
|
||||
args = parser.parse_args(sys.argv[1:])
|
||||
if not args.hostname:
|
||||
stdout,stderr = subprocess.Popen(['docker',
|
||||
'inspect',
|
||||
'-f',
|
||||
"'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'",
|
||||
'gatecounter-db'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT).communicate()
|
||||
host = stdout.decode().strip()
|
||||
else:
|
||||
host = args.hostname
|
||||
try:
|
||||
PIRgate(args.hostname, args.username, args.password, args.database).start()
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(1)
|
||||
except:
|
||||
print("\nProblem connecting to the database. Check your credentials and try again \n")
|
||||
sys.exit(2)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user