PIRgate now logs immediately for better visualization in Grafana

Gatecounter service added
Gatecounter db now initialized by the gatecounter itself courtesy of SQLAlchemy
This commit is contained in:
Brennen Raimer
2019-11-03 13:49:46 -05:00
parent c2fed99704
commit fa48b3357e
4 changed files with 147 additions and 131 deletions

View File

@@ -1,89 +1,123 @@
# Written By Johnathan Cintron and Devlyn Courtier for the HCCC Library
#!/usr/bin/python
#!/usr/bin/python3
import collections
import logging
import subprocess
import sys
import MySQLdb
from time import sleep
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor, CancelledError, wait
from datetime import datetime
from queue import Queue
import RPi.GPIO as GPIO
from getpass import getpass
from multiprocessing import Queue
from concurrent.futures import ProcessPoolExecutor, CancelledError, wait
from sqlalchemy import Column, DateTime, Integer, Table, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
log = logging.getLogger("sqlalchemy")
info = logging.StreamHandler(sys.stdout)
info.addFilter(lambda x: x.levelno <= logging.WARNING)
errors = logging.StreamHandler(sys.stderr)
errors.addFilter(lambda x: x.levelno >= logging.ERROR)
log.setLevel(logging.DEBUG)
log.addHandler(info)
log.addHandler(errors)
Base = declarative_base()
class PIR_Detection(Base):
__tablename__ = "PIRSTATS"
time = Column('datetime', DateTime, nullable=False, primary_key=True)
count = Column('count', Integer, nullable=False)
Detection=collections.namedtuple("Detection", ['time','count'])
class PIRgate:
def __init__(self, hostname, username, password, database):
# Set RPi GPIO Mode
GPIO.setmode(GPIO.BCM)
def __init__(self, hostname, username, password, database):
# Set RPi GPIO Mode
GPIO.setmode(GPIO.BCM)
# Setup GPIO in and out pins
self.PIR_PIN = 7
GPIO.setup(self.PIR_PIN, GPIO.IN)
# End GPIO setup
self.counts=Queue()
self._pool=ProcessPoolExecutor()
# Setup GPIO in and out pins
self.PIR_PIN = 7
GPIO.setup(self.PIR_PIN, GPIO.IN)
# End GPIO setup
self._pool=ThreadPoolExecutor()
self._detection_queue=Queue()
if not hostname:
stdout,stderr = subprocess.Popen(['docker',
'inspect',
'-f',
"'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'",
'gatecounter-db'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT).communicate()
host = stdout.decode().strip()
else:
host = hostname
self.db_engine = create_engine(f"mysql://{username}:{password}@{host}/{database}")
Base.metadata.create_all(bind=self.db_engine)
self._session_factory = sessionmaker(bind=self.db_engine)
self.Session = scoped_session(self._session_factory)
def start(self):
try:
self.event_listener = self._pool.submit(self.listen_for_events)
self.db_writer = self._pool.submit(self.write_to_db, hostname, username, password, database)
except KeyboardInterrupt:
print("\nCtrl-C pressed cleaning up GPIO")
self.event_listener.cancel()
self.db_writer.cancel()
GPIO.cleanup()
finally:
wait([self.event_listener,self.db_writer])
GPIO.cleanup()
def start(self):
GPIO.add_event_detect(self.PIR_PIN, GPIO.RISING, callback=lambda c: self._detection_queue.put(Detection(datetime.now(),1)))
with self._pool:
try:
self.db_writer = self._pool.submit(self.write_to_db)
wait([self.db_writer])
except KeyboardInterrupt:
print("\nCtrl-C pressed cleaning up GPIO")
raise
finally:
GPIO.cleanup()
def listen_for_events(self):
count = 0
while True:
try:
if GPIO.input(self.PIR_PIN):
count += 1
curr_date = datetime.now()
if (curr_date.minute % 10 == 0) and (curr_date.second == 0):
self.counts.put_nowait((curr_date,count))
count = 0
except (KeyboardInterrupt,CancelledError):
break
def write_to_db(self, hostname, username, password, database):
while True:
try:
time, count = self.counts.get()
with MySQLdb.connect(hostname,username,password,database) as db:
try:
db.cursor().execute("INSERT INTO PIRSTATS (datetime, gatecount) VALUES ('%s', '%d')" % (time.isoformat(' '), count))
except (KeyboardInterrupt,CancelledError):
db.rollback()
break
except:
db.rollback()
self.counts.put(time,count) #put the data back in queue to try writing it again
else:
db.commit()
finally:
db.close()
except (KeyboardInterrupt,CancelledError):
break
def write_to_db(self):
while True:
try:
detection = self._detection_queue.get()
session = self.Session()
session.add(PIR_Detection(time=detection.datetime, count=detection.count))
except KeyboardInterrupt:
session.rollback()
raise
except:
session.rollback()
self._detection_queue.put(detection) #return detection to queue to try again
else:
session.commit()
finally:
session.remove()
if __name__ == "__main__":
while True:
try:
hostname = input("DB Hostname: ")
database = input("Database: ")
username = input("Username: ")
password = getpass()
#just check the credentials by connecting to the db and closing
MySQLdb.connect(hostname, username, password, database).close()
except KeyboardInterrupt:
sys.exit(1)
except:
print("\nProblem connecting to the database. Check your credentials and try again \n")
continue
else:
break
PIRgate(hostname, username, password, database).start()
parser = ArgumentParser(description="Begin PIR detections")
parser.add_argument("-H", "--hostname",default="")
parser.add_argument("-d", "--database",default="gatecounter")
parser.add_argument("-u", "--username", required=True)
parser.add_argument("-p", "--password", required=True)
if sys.argv[0].startswith("python"):
args = parser.parse_args(sys.argv[2:])
else:
args = parser.parse_args(sys.argv[1:])
if not args.hostname:
stdout,stderr = subprocess.Popen(['docker',
'inspect',
'-f',
"'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'",
'gatecounter-db'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT).communicate()
host = stdout.decode().strip()
else:
host = args.hostname
try:
PIRgate(args.hostname, args.username, args.password, args.database).start()
except KeyboardInterrupt:
sys.exit(1)
except:
print("\nProblem connecting to the database. Check your credentials and try again \n")
sys.exit(2)