I have updated code for everyone to amuse themselves with.
I switched to asyncpg for the PostgreSQL connection library, since it looked easier to use with async/await. The code still has plenty of print statements and is probably my worst copypasta from various online sources in my recent history. I’m out of my depth here. But it works! I’m getting data into my database. I’m sure there are lots of improvements that could be made. I’m doing very little exception handling, and I don’t have a good idea of what happens if the db closes the connection (I haven’t tried experimenting with that yet).
import datetime
import asyncio
import asyncpg
class UDPServerProtocol:
    """asyncio datagram protocol that forwards every received packet to a queue.

    Each datagram is decoded to text, stamped with the local receive time and
    handed to ``producer`` on its own task, which places it on ``entryQueue``.
    """

    def __init__(self, entryQueue):
        """Store the queue that received entries are pushed onto."""
        self.entryQueue = entryQueue
        # The event loop only keeps weak references to tasks, so an otherwise
        # unreferenced task may be garbage-collected before it finishes.
        # Holding the in-flight producer tasks here keeps them alive.
        self._pending_tasks = set()

    def connection_made(self, transport):
        """Remember the transport handed over by the event loop."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Decode one datagram and schedule it for queuing.

        ``data`` is the raw payload; ``addr`` is the sender address, of which
        only the first element (the host) is recorded.
        """
        # The payload comes straight off the network: substitute U+FFFD for
        # invalid bytes instead of raising, so a malformed packet is still
        # recorded rather than lost to a UnicodeDecodeError.
        message = data.decode("utf-8", errors="replace")
        address = addr[0]
        timestamp = datetime.datetime.now()
        task = asyncio.create_task(
            producer(self.entryQueue, address, timestamp, message)
        )
        self._pending_tasks.add(task)
        # Drop the reference as soon as the task completes.
        task.add_done_callback(self._pending_tasks.discard)

    def connection_lost(self, transport):
        """Report that the endpoint was closed.

        NOTE: asyncio passes an exception object or None here, not a
        transport; the parameter name is kept unchanged for compatibility.
        """
        print("Connection lost!")
async def get_asyncpg_connection_pool():
    """Create and return the asyncpg pool used for all database writes.

    The pool is created with ``min_size=2`` and ``max_size=4``; the
    credentials and host are hard-coded below.
    """
    return await asyncpg.create_pool(
        user="REDACTED",
        password="EXTRA REDACTED",
        database="REDACTED",
        host="REDACTED",
        min_size=2,
        max_size=4,
    )
async def producer(entryQueue, address, timestamp, message):
    """Echo one received log line and place it on the queue.

    The three fields are printed one per line, then bundled into a dict with
    the keys ``address``, ``timestamp`` and ``message``. The call waits if the
    queue is currently full.
    """
    for field in (address, timestamp, message):
        print(field)
    await entryQueue.put(
        dict(address=address, timestamp=timestamp, message=message)
    )
async def qMonitor(entryQueue):
    """Print the number of queued entries every 15 seconds, forever."""
    report_interval = 15  # seconds between reports
    while True:
        backlog = entryQueue.qsize()
        print("Queue size is ", backlog)
        await asyncio.sleep(report_interval)
async def _requeue_entry(entryQueue, entry):
    """Return a failed entry to the queue without blocking, then back off."""
    try:
        # A blocking put() here can deadlock: if the bounded queue is full and
        # every consumer is waiting to re-queue, nobody is left to drain it.
        entryQueue.put_nowait(entry)
    except asyncio.QueueFull:
        print("queue full, dropping entry")
    # Short pause so a database outage does not become a busy retry loop.
    await asyncio.sleep(1)


async def consumer(entryQueue, pool):
    """Move queue entries into the ``dmlogger`` table, one at a time, forever.

    ``entryQueue`` yields dicts with ``address``, ``timestamp`` and
    ``message`` keys; ``pool`` is the asyncpg pool to write through.

    A connection is acquired per entry rather than held for the lifetime of
    the task, so a connection dropped by the server is not reused for every
    later insert. If an insert fails the entry is put back on the queue (or
    dropped if the queue is full) and retried later.

    Cancellation is not intercepted: ``asyncio.CancelledError`` propagates so
    the task can be shut down cleanly.
    """
    print("Starting consumer")
    dbquery = "INSERT INTO dmlogger (address, timestamp, message) VALUES($1, $2, $3);"
    while True:
        # Wait until an entry is available.
        print("waiting for queue entry")
        entry = await entryQueue.get()
        print("Processing entry!")
        try:
            address = entry.get("address")
            timestamp = entry.get("timestamp")
            message = entry.get("message")
            print(timestamp, address, message)
            print("getting db connection")
            async with pool.acquire() as connection:
                await connection.execute(dbquery, address, timestamp, message)
        except RuntimeError as e:
            print(f"consumer exception {e}")
            await _requeue_entry(entryQueue, entry)
        except Exception as e:
            # Deliberately not a bare ``except``: that would also trap
            # CancelledError / KeyboardInterrupt and make the task unstoppable.
            print(f"unhandled exception {e!r}")
            await _requeue_entry(entryQueue, entry)
        finally:
            # Balances the get() above; a re-queued entry was counted again
            # by put_nowait(), so the queue's bookkeeping stays consistent.
            entryQueue.task_done()
async def main():
    """Run the UDP log collector for five minutes, then shut down cleanly.

    Starts the queue monitor, opens the database pool, starts two consumers
    and listens for datagrams on 0.0.0.0:29298. On exit, whether normal or
    through an error during start-up, the endpoint is closed, the background
    tasks are cancelled and awaited, and the pool is closed so no connections
    are left open.
    """
    print("Starting UDP server")
    # Get a reference to the event loop as we plan to use low-level APIs.
    loop = asyncio.get_running_loop()
    # Bounded queue between the UDP receiver and the database consumers.
    entryQueue = asyncio.Queue(maxsize=1000)
    # Periodically report the queue size.
    watchers = [asyncio.create_task(qMonitor(entryQueue)) for _ in range(1)]
    consumers = []
    pool = None
    transport = None
    try:
        print("opening db connection pool")
        pool = await get_asyncpg_connection_pool()
        # Two consumers drain the queue concurrently.
        consumers = [
            asyncio.create_task(consumer(entryQueue, pool)) for _ in range(2)
        ]
        # One protocol instance will be created to serve all client requests.
        transport, _protocol = await loop.create_datagram_endpoint(
            lambda: UDPServerProtocol(entryQueue),
            local_addr=('0.0.0.0', 29298),
        )
        await asyncio.sleep(300)  # Serve for 5 minutes.
    finally:
        if transport is not None:
            transport.close()
        # Stop the background tasks before closing the pool, so any
        # connection they hold is released first.
        background = watchers + consumers
        for task in background:
            task.cancel()
        await asyncio.gather(*background, return_exceptions=True)
        if pool is not None:
            await pool.close()
# Only start the server when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Time to go home and eat supper