-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
66 lines (47 loc) · 1.55 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Local modules -- Need to be careful of name space using import *
from modules.lanevision.lanevision import *
from modules.generator.generator import *
from modules.detector.detector import *
from modules.vehicle.vehicle import *
# Remote and standard modules
import threading
import pygame
import time
import sys
# Initialise pygame and load the voice clip(s).
# pygame.init() runs before the Sound object is constructed; keep that order.
pygame.init()
# NOTE(review): `imkatara` is not referenced again in the visible part of this
# file -- confirm whether it is played elsewhere or can be dropped.
# The asset path is relative, so it is resolved against the working directory.
imkatara = pygame.mixer.Sound('assets/audio/voices/katara/imkatara.mp3')
# Set up the shared objects used by the thread functions defined below.
# Vehicle, Generator and Detector are not defined in this file; presumably they
# come from the `modules.*` star imports above -- TODO confirm.
vehicle = Vehicle()
# The generator is given the alerts database path (relative path).
alertGenerator = Generator("database/alerts/alerts.db")
# The detector is handed both the vehicle and the alert generator.
alertDetector = Detector(vehicle, alertGenerator)
# THREADS
def processGeneratedAlerts():
    """Thread target: call ``alertGenerator.processAlert()`` in an endless loop.

    Takes no arguments. The loop has no exit condition, so this function
    does not return on its own; it ends only if ``processAlert`` raises.
    """
    process_alert = alertGenerator.processAlert
    while True:
        process_alert()
def collectVehicleData():
    """Thread target: call ``vehicle.collectVehicleData()`` in an endless loop.

    Takes no arguments. The loop has no exit condition, so this function
    does not return on its own; it ends only if the vehicle call raises.
    """
    collect_once = vehicle.collectVehicleData
    while True:
        collect_once()
def runAlertDetector():
    """Thread target: call ``alertDetector.run()`` in an endless loop.

    Takes no arguments. The loop has no exit condition, so this function
    does not return on its own; it ends only if ``run`` raises.
    """
    run_detector = alertDetector.run
    while True:
        run_detector()
# NOTE: the threaded functions can be passed the generator object so that they can add alerts to the queue when needed.
if __name__ == "__main__":
    # NOTE(review): this Event is created and cleared but never set or waited
    # on in the visible code -- presumably a placeholder for a shutdown signal.
    event = threading.Event()
    event.clear()

    # One thread per long-running task; list order is also the start order.
    threads = [
        threading.Thread(target=worker)
        for worker in (
            processGeneratedAlerts,
            collectVehicleData,
            runAlertDetector,
        )
    ]

    # Begin all threads
    for thread in threads:
        print(f"Starting: {thread.name}")
        thread.start()

    # Wait for all threads to finish.
    # NOTE(review): every target above loops on `while True`, so a join only
    # returns if that worker terminates (e.g. by raising); the lines after
    # this loop are reached only once all three workers have ended.
    for thread in threads:
        thread.join()
        print(f"Stopped: {thread.name}")

    print("Exiting program gracefully")
    # print(alertGenerator.queuedAlerts)