Well, I made a couple of modifications to the code posted above. I moved the line that grabs a pool connection into the while loop in the consumer function, just after the await queue.get() line. This way a connection is only taken from the pool once there is an entry ready to be written.
I also replaced the asyncio.sleep line in main() with an asyncio.gather call that waits on the consumers, which effectively waits forever and keeps the service running. The sleep was only there for testing.
I figured the code might be helpful to anyone running Ignition and wanting to catch messages from a DoMore PLC. I’ll probably run with this version until it causes problems or until someone points out a glaring error. Can’t wait to see which comes first.
import datetime
import asyncio
import asyncpg
class UDPServerProtocol:
    """asyncio datagram protocol that forwards each received UDP packet to a queue.

    Every datagram is decoded as UTF-8, stamped with the local receive time
    and handed to ``producer`` (scheduled as a task) together with the
    sender's address.
    """

    def __init__(self, entryQueue):
        """Remember the queue that received messages are pushed onto.

        Args:
            entryQueue: Queue object passed through to ``producer`` for
                every received datagram.
        """
        self.entryQueue = entryQueue
        # Set by connection_made(); defined here so the attribute always exists.
        self.transport = None
        # Strong references to in-flight producer tasks. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._pending_tasks = set()

    def connection_made(self, transport):
        """Store the transport handed over by the event loop."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Schedule one received datagram for queuing.

        Args:
            data: Raw payload bytes of the datagram.
            addr: Sender address; only its first element is recorded.
        """
        # errors="replace" keeps a message with a stray non-UTF-8 byte instead
        # of raising UnicodeDecodeError and losing the whole datagram.
        message = data.decode("utf-8", errors="replace")
        address = addr[0]
        timestamp = datetime.datetime.now()
        task = asyncio.create_task(
            producer(self.entryQueue, address, timestamp, message)
        )
        # Hold the task until it completes, then drop the reference.
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    def connection_lost(self, transport):
        """Report that the endpoint was closed.

        NOTE(review): asyncio calls this hook with the exception that caused
        the loss (or None), not with a transport; the parameter name is kept
        unchanged for compatibility.
        """
        print("Connection lost!")
async def get_asyncpg_connection_pool():
    """Create and return the asyncpg connection pool used by the consumers.

    The pool is created with between 2 and 4 connections, using the
    (redacted) credentials below.
    """
    return await asyncpg.create_pool(
        user="REDACTED",
        password="EXTRA REDACTED",
        database="REDACTED",
        host="REDACTED",
        min_size=2,
        max_size=4,
    )
async def producer(entryQueue, address, timestamp, message):
    """Bundle one received message into a dict and put it on the queue.

    The queued dict has the keys "address", "timestamp" and "message".
    Waits on ``entryQueue.put`` until the entry has been accepted.
    """
    record = dict(address=address, timestamp=timestamp, message=message)
    await entryQueue.put(record)
async def qMonitor(entryQueue):
    """Print the current queue size every 15 seconds, forever.

    Debugging aid: loops until the surrounding task is cancelled.
    """
    report_interval = 15
    while True:
        depth = entryQueue.qsize()
        print("Queue size is ", depth)
        await asyncio.sleep(report_interval)
async def consumer(entryQueue, pool):
    """Drain the queue forever, inserting each entry into the dmlogger table.

    For every entry taken from ``entryQueue`` a connection is acquired from
    ``pool`` and one INSERT is executed. If that fails, the error is printed,
    the entry is put back on the queue and the consumer pauses for 5 seconds
    before taking the next entry.

    Args:
        entryQueue: Queue of dicts with "address", "timestamp" and "message".
        pool: Object whose ``acquire()`` returns an async context manager
            yielding a connection with an awaitable ``execute(query, *args)``.

    Raises:
        asyncio.CancelledError: when the consumer task is cancelled.
            Cancellation is deliberately not treated as a processing failure,
            so the task can actually be shut down.
    """
    dbquery = "INSERT INTO dmlogger (address, timestamp, message) VALUES($1, $2, $3);"
    while True:
        # Wait until an entry is available.
        entry = await entryQueue.get()
        try:
            # Only take a connection from the pool once there is work to do.
            async with pool.acquire() as connection:
                await connection.execute(
                    dbquery,
                    entry.get("address"),
                    entry.get("timestamp"),
                    entry.get("message"),
                )
        except asyncio.CancelledError:
            # Never swallow cancellation (the original bare ``except:`` did,
            # which left a cancelled consumer running).
            raise
        except Exception as e:
            # Show what actually went wrong instead of a generic message.
            print(f"consumer exception {e!r}")
            # Return the entry to the queue so it is retried later.
            # NOTE(review): put() waits while the queue is full; if every
            # consumer ends up here with a full queue nothing drains it.
            await entryQueue.put(entry)
            # Pause briefly to avoid hammering a failing database.
            await asyncio.sleep(5)
        finally:
            # Balances the get() above; a re-queued entry was counted again by put().
            entryQueue.task_done()
async def main():
    """Run the UDP-to-database logging service until it is stopped.

    Creates the bounded entry queue and the database pool, starts two
    consumer tasks, opens a UDP endpoint on 0.0.0.0:29298 and then waits on
    the consumers, which never finish on their own. On the way out (error or
    cancellation) the endpoint is closed, the consumers are cancelled and
    awaited, and the pool is closed.
    """
    # Reference to the running loop; needed for the low-level endpoint API.
    loop = asyncio.get_running_loop()
    # Bounded queue between the UDP protocol and the database consumers.
    entryQueue = asyncio.Queue(maxsize=1000)
    pool = await get_asyncpg_connection_pool()

    consumers = []
    transport = None
    try:
        # Two consumers for now; qMonitor(entryQueue) can be started the same
        # way to watch the queue size while debugging.
        consumers = [
            asyncio.create_task(consumer(entryQueue, pool)) for _ in range(2)
        ]
        # One protocol instance is created to serve all client datagrams.
        transport, _protocol = await loop.create_datagram_endpoint(
            lambda: UDPServerProtocol(entryQueue),
            local_addr=('0.0.0.0', 29298),
        )
        # Wait for the consumers to exit, which effectively holds the program
        # open indefinitely as the consumers never exit.
        await asyncio.gather(*consumers)
    finally:
        # Stop accepting datagrams first (the endpoint may never have opened).
        if transport is not None:
            transport.close()
        # Stop the consumers and wait for them so their connections are
        # released before the pool is closed.
        for task in consumers:
            task.cancel()
        await asyncio.gather(*consumers, return_exceptions=True)
        await pool.close()
# Script entry point: start an event loop and run main() until it returns or raises.
asyncio.run(main())