Python code check

I have updated code for everyone to amuse themselves with :slight_smile:

I switched to asyncpg for the PostgreSQL connection library, since it looked easier to use with async/await. The code still has plenty of print statements and is probably my worst copypasta from various online sources in my recent history. I’m out of my depth here :sweat_smile: But, it works! I’m getting data into my database. I’m sure there are lots of improvements that could be made. I’m doing very little exception handling, and I don’t have a good idea of what happens if the db closes the connection (I haven’t tried experimenting with that yet).

import datetime
import asyncio
import asyncpg

class UDPServerProtocol(asyncio.DatagramProtocol):
	"""Datagram protocol that timestamps each UDP message and queues it.

	Each received datagram is decoded to text, paired with the sender's IP
	address and the local receive time, and passed to ``producer`` so that it
	ends up on ``entryQueue``.

	Deriving from ``asyncio.DatagramProtocol`` supplies the no-op callbacks
	(such as ``error_received``) that the transport may invoke and that this
	class does not define itself.
	"""

	def __init__(self, entryQueue):
		"""Store the queue that received entries are pushed onto."""
		self.entryQueue = entryQueue
		self.transport = None
		# Strong references to in-flight producer tasks: the event loop only
		# keeps weak references, so an unreferenced task could be garbage
		# collected before it has enqueued its entry.
		self._pending = set()

	def connection_made(self, transport):
		"""Keep the transport handed over by the event loop."""
		self.transport = transport

	def datagram_received(self, data, addr):
		"""Schedule one received datagram for queuing.

		data: raw payload bytes.
		addr: sender address tuple; only element 0 (the host) is recorded.
		"""
		# Undecodable bytes are replaced rather than raising out of the
		# callback, so a single malformed packet is still logged.
		message = data.decode("utf-8", errors="replace")
		address = addr[0]
		timestamp = datetime.datetime.now()
		task = asyncio.create_task(producer(self.entryQueue, address, timestamp, message))
		self._pending.add(task)
		task.add_done_callback(self._pending.discard)

	def connection_lost(self, transport):
		"""Report that the endpoint was closed.

		NOTE: asyncio passes the causing exception (or None) here, not a
		transport; the parameter name is kept unchanged for compatibility.
		"""
		print("Connection lost!")

async def get_asyncpg_connection_pool():
	"""Create and return the asyncpg connection pool.

	The connection settings are hard-coded below; the pool is created with
	``min_size=2`` and ``max_size=4``.
	"""
	pool = await asyncpg.create_pool(
		user="REDACTED",
		password="EXTRA REDACTED",
		database="REDACTED",
		host="REDACTED",
		min_size=2,
		max_size=4,
	)
	return pool

async def producer(entryQueue, address, timestamp, message):
	"""Print one received message and put it on the queue.

	The queued entry is a dict with the keys ``address``, ``timestamp`` and
	``message``. Awaiting the put means this coroutine waits while the queue
	is full.
	"""
	# Echo each field on its own line, in the same order as they are stored.
	for field in (address, timestamp, message):
		print(field)
	await entryQueue.put(
		dict(address=address, timestamp=timestamp, message=message)
	)

async def qMonitor(entryQueue):
	"""Print the current queue size every 15 seconds, forever."""
	report_interval = 15
	while True:
		backlog = entryQueue.qsize()
		print("Queue size is ", backlog)
		await asyncio.sleep(report_interval)

async def consumer(entryQueue, pool, retry_delay=1.0):
	"""Insert queued log entries into the ``dmlogger`` table, forever.

	entryQueue: queue of dicts with ``address``, ``timestamp`` and
		``message`` keys.
	pool: asyncpg connection pool (anything whose ``acquire()`` returns an
		async context manager yielding an object with ``execute``).
	retry_delay: seconds to pause after a failed insert before taking the
		next entry, so a database outage does not become a busy loop.

	A failed entry is put back on the queue and retried later. Cancellation
	is never swallowed: ``asyncio.CancelledError`` always propagates.
	"""
	print("Starting consumer")
	dbquery = "INSERT INTO dmlogger (address, timestamp, message) VALUES($1, $2, $3);"
	while True:
		# Wait until an entry is available.
		print("waiting for queue entry")
		entry = await entryQueue.get()
		print("Processing entry!")
		try:
			address = entry.get("address")
			timestamp = entry.get("timestamp")
			message = entry.get("message")
			print(timestamp, address, message)
			# Borrow a connection per entry instead of holding one for the
			# lifetime of the task: if the server drops a connection, the
			# next acquire lets the pool hand out a working one rather than
			# failing on the same dead connection forever.
			async with pool.acquire() as connection:
				await connection.execute(dbquery, address, timestamp, message)
		except asyncio.CancelledError:
			# Must propagate so the task can actually be cancelled.
			raise
		except Exception as e:
			# Requeue first so the entry is not lost if we are cancelled
			# while backing off. NOTE: this put waits if the queue is full.
			print(f"consumer exception {e}")
			await entryQueue.put(entry)
			await asyncio.sleep(retry_delay)
		finally:
			# Balances the get() above; a requeued entry was counted again
			# by its own put().
			entryQueue.task_done()

async def main(serve_seconds=300):
	"""Run the UDP-to-database logger for ``serve_seconds`` seconds.

	Starts the queue monitor, opens the database pool, starts two consumers
	and listens for datagrams on 0.0.0.0:29298. On the way out (normal end
	or error) it stops accepting datagrams, gives the consumers a bounded
	amount of time to drain the queue, cancels the background tasks and
	closes the pool.
	"""
	print("Starting UDP server")
	# Get a reference to the event loop as we plan to use low-level APIs.
	loop = asyncio.get_running_loop()
	entryQueue = asyncio.Queue(maxsize=1000)
	# Background tasks: one queue monitor now, the consumers are added below.
	workers = [asyncio.create_task(qMonitor(entryQueue))]
	pool = None
	transport = None
	try:
		print("opening db connection pool")
		pool = await get_asyncpg_connection_pool()
		# Two consumers share the pool.
		workers.extend(
			asyncio.create_task(consumer(entryQueue, pool)) for _ in range(2)
		)
		# One protocol instance will be created to serve all client requests.
		transport, _ = await loop.create_datagram_endpoint(
			lambda: UDPServerProtocol(entryQueue),
			local_addr=('0.0.0.0', 29298),
		)
		await asyncio.sleep(serve_seconds)
	finally:
		if transport is not None:
			# Stop intake first, then let already-queued entries be written.
			transport.close()
			try:
				await asyncio.wait_for(entryQueue.join(), timeout=10)
			except asyncio.TimeoutError:
				print("Queue not drained before shutdown; remaining entries:", entryQueue.qsize())
		for task in workers:
			task.cancel()
		# Bounded wait: a task that ignores cancellation must not hang exit.
		await asyncio.wait(workers, timeout=5)
		if pool is not None:
			try:
				await asyncio.wait_for(pool.close(), timeout=10)
			except asyncio.TimeoutError:
				# Graceful close did not finish; drop the connections.
				pool.terminate()

if __name__ == "__main__":
	# Only start the server when executed as a script, so importing this
	# module (e.g. from a test) does not open sockets or connect to the db.
	asyncio.run(main())

Time to go home and eat supper :slight_smile: