import threading
import kritter
import vizy
import cv2
from pupil_apriltags import Detector
import numpy as np
import socket
import queue
from datetime import datetime
class AprilDetector:
def __init__(self):
# Set up Vizy class, Camera, etc.
self.kapp = vizy.Vizy()
self.camera = kritter.Camera(hflip=True, vflip=True)
self.stream = self.camera.stream()
# Put video window in the layout
self.video = kritter.Kvideo(width=1000, overlay=True)
brightness = kritter.Kslider(name="Brightness", value=self.camera.brightness, mxs=(0,100,1), format=lambda val: '{}%' .format(val), grid=False)
self.kapp.layout = [self.video, brightness]
@brightness.callback()
def func(value):
self.camera.brightness = value
# Define detector
self.at_detector = Detector(
families="tag36h11",
nthreads=1,
quad_decimate=1,
refine_edges=1,
decode_sharpening=0.25,
debug=0
)
# Define queue for socket streaming
self.data_queue = queue.Queue(1)
# Run camera grab thread.
self.run_grab = True
videoThread = threading.Thread(target=self.grab).start()
socketThread = threading.Thread(target=self.listen_for_connections).start()
# Run Vizy webserver, which blocks.
self.kapp.run()
self.run_grab = False
videoThread.join()
socketThread.join()
def handle_connection(self, client_socket, client_address):
#try:
while True:
if not self.data_queue.empty():
dataFrame = self.data_queue.get()
client_socket.sendall(dataFrame.encode());
#except EOFError:
# The client disconnected, close the socket and exit the thread
##client_socket.close()
#print(f"Client {client_address} disconnected")
def listen_for_connections(self):
# Create a server socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('XXX.XXX.XXX.XXX', 8000)
server_socket.bind(server_address)
# Listen for incoming connections
print(f"Listening for incoming connections on {server_address}")
server_socket.listen()
# Handle incoming connections in a loop
while True:
# Wait for a client connection
client_socket, client_address = server_socket.accept()
print(f"New connection from {client_address}")
# Start a new thread to handle the connection
client_thread = threading.Thread(target=self.handle_connection, args=(client_socket, client_address))
client_thread.start()
def processImage(self, results, image):
tagInstances = []
for r in results:
# extract the bounding box (x, y)-coordinates for the AprilTag
# and convert each of the (x, y)-coordinate pairs to integers
(ptA, ptB, ptC, ptD) = r.corners
ptB = (int(ptB[0]), int(ptB[1]))
ptC = (int(ptC[0]), int(ptC[1]))
ptD = (int(ptD[0]), int(ptD[1]))
ptA = (int(ptA[0]), int(ptA[1]))
# draw the bounding box of the AprilTag detection
cv2.line(image, ptA, ptB, (255, 0, 0), 2)
cv2.line(image, ptB, ptC, (255, 0, 0), 2)
cv2.line(image, ptC, ptD, (0, 255, 0), 2)
cv2.line(image, ptD, ptA, (0, 0, 255), 2)
# draw the center (x, y)-coordinates of the AprilTag
(cX, cY) = (int(r.center[0]), int(r.center[1]))
cv2.circle(image, (cX, cY), 5, (0, 255, 255), -1)
# draw the tag family on the image
tagFamily = r.tag_family.decode("utf-8")
tagId = r.tag_id
cv2.putText(image, str(tagId), (ptA[0], ptA[1] - 15),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
current_time = datetime.now()
# Prepares data packet
tagInstance = {"ID":r.tag_id,
"Confidence":r.decision_margin,
"CenterX":r.center[0],
"CenterY":r.center[1], "Time":current_time
}
tagInstances.append(tagInstance)
if not self.data_queue.full():
packet = str(tagInstances) + "\n"
self.data_queue.put(packet)
if len(tagInstances)>0:
print(tagInstances)
else:
pass
def grab(self):
while self.run_grab:
# Get frame
frame = self.stream.frame()[0]
#Process frame
grayImage = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
detections = self.at_detector.detect(grayImage)
self.processImage(detections, frame)
# Send frame
self.video.push_frame(frame)
if __name__ == "__main__":
AprilDetector()