Python code check

More progress on my DMLogger replacement :slight_smile:

I have the following python code working to listen for UDP messages and stuff them into a database table.

import socket
import datetime
import psycopg2

UDP_IP = "0.0.0.0"
UDP_PORT = 29298

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))

while True:
	data, addr = sock.recvfrom(1024)
	address = addr[0]
	timestamp = datetime.datetime.now()
	message = data.decode("utf-8")
	print(address)
	print(timestamp)
	print(message)
	dbSession = psycopg2.connect("host='REDACTED' dbname='REDACTED' user='REDACTED' password='EXTRA REDACTED'")
	cursor = dbSession.cursor()
	cursor.execute("INSERT INTO dmlogger (address, timestamp, message) VALUES(%s, %s, %s)", (address, timestamp, message))
	dbSession.commit()
	dbSession.close()

I’m currently just running this from the console; I’ll take out the print statements and work on turning it into a service later. I was hoping to get a sanity check that the code structure is going to be performant. I have little experience with Python, but I’m wondering if a lag in database performance would cause me to miss UDP messages. Should I be writing the DB connection stuff as a function and calling it from the while loop? I’m guessing something more than that needs to happen, like spawning a different thread to handle the DB stuff so the UDP listener can catch the next packet.

Basically, if anyone has some pointers to push me in the right direction, I’d appreciate it :slight_smile:
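
On the missed-packet question: the kernel queues incoming datagrams in the socket's receive buffer while the process is busy, so a short database stall does not lose packets by itself, but a long one can overflow that buffer. A minimal sketch of the "separate thread for the DB stuff" idea follows, using only the standard library. The names `receive_loop`, `writer_loop`, `run`, and the `write_entry` callback are made up for illustration; `write_entry` stands in for whatever does the INSERT.

```python
import queue
import socket
import threading
from datetime import datetime


def receive_loop(sock, entries, stop):
    """Read datagrams and hand them to the queue; never touches the database."""
    sock.settimeout(0.2)  # wake up periodically so `stop` is noticed
    while not stop.is_set():
        try:
            data, addr = sock.recvfrom(1024)
        except socket.timeout:
            continue
        # put() blocks if the queue is full; the kernel buffer absorbs packets meanwhile
        entries.put((addr[0], datetime.now(), data.decode("utf-8", errors="replace")))


def writer_loop(entries, write_entry, stop):
    """Drain the queue, calling write_entry(address, timestamp, message) per item."""
    while not (stop.is_set() and entries.empty()):
        try:
            entry = entries.get(timeout=0.2)
        except queue.Empty:
            continue
        try:
            write_entry(*entry)
        except Exception as exc:  # keep the thread alive; real code should log or retry
            print(f"write failed: {exc}")
        finally:
            entries.task_done()


def run(write_entry, host="0.0.0.0", port=29298):
    """Wire the two loops together: receiver in the main thread, writer in the background."""
    entries = queue.Queue(maxsize=1000)
    stop = threading.Event()
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, port))
    writer = threading.Thread(target=writer_loop, args=(entries, write_entry, stop), daemon=True)
    writer.start()
    try:
        receive_loop(sock, entries, stop)
    except KeyboardInterrupt:
        pass
    finally:
        stop.set()
        writer.join()  # lets the writer flush whatever is still queued
        sock.close()
```

Calling `run(my_insert_function)` would start listening. The receiver only decodes and enqueues, so it gets back to `recvfrom` quickly regardless of how long an insert takes.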

Probably want to take advantage of python3’s async/await capabilities. And probably don’t want to be opening and closing a database connection for every packet.

I’ll check out the async/await functions.

Yeah, I wasn’t sure that was the best approach for DB connections. My concern was the connection timing out. There will probably be times when it will be a day or two between packets (weekends with no one actively operating equipment). I figured the DB connection would probably time out, so opening and closing was an obvious alternative. I knew it probably wasn’t the best way to go, but it got the code running :slight_smile:

If you have thoughts on how to handle that, I’d love to hear them!
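
One possible way to handle the idle-timeout worry is to keep a single connection open and reconnect only when a statement fails. The sketch below is driver-agnostic so it can run without a database: `ReconnectingWriter` is a hypothetical helper, `connect` is assumed to be something like `lambda: psycopg2.connect(dsn)`, and `retry_on` is assumed to be the driver's "connection is gone" exceptions (for psycopg2, likely `OperationalError` and `InterfaceError`).

```python
class ReconnectingWriter:
    """Keep one connection open and reopen it only when a statement fails."""

    def __init__(self, connect, retry_on=(Exception,)):
        self._connect = connect    # hypothetical factory that returns a DB-API connection
        self._retry_on = retry_on  # exception types that mean "the connection is dead"
        self._conn = None

    def execute(self, sql, params):
        """Run one statement and commit, retrying once on a fresh connection."""
        for attempt in (1, 2):
            try:
                if self._conn is None:
                    self._conn = self._connect()
                cursor = self._conn.cursor()
                cursor.execute(sql, params)
                self._conn.commit()
                return
            except self._retry_on:
                self._drop()
                if attempt == 2:
                    raise  # second failure: let the caller decide what to do

    def _drop(self):
        """Discard the current connection, ignoring errors while closing it."""
        if self._conn is not None:
            try:
                self._conn.close()
            except Exception:
                pass
            self._conn = None
```

With this shape, a connection that timed out over a quiet weekend costs one failed attempt on Monday morning, after which the writer carries on with a new connection.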

I have updated code for everyone to amuse themselves with :slight_smile:

I switched to asyncpg for the postgresql connection library. Looked like it was easier to use with async/await. The code still has plenty of print statements and is probably my worst copypasta from various online sources in my recent history. I’m out of my depth here :sweat_smile: But, it works! I’m getting data into my database. I’m sure there are lots of improvements that could be made. I’m doing very little for exception handling, and I don’t have a good idea of what happens if the db closes the connection (haven’t tried experimenting with that yet).

import datetime
import asyncio
import asyncpg

class UDPServerProtocol:
	def __init__(self, entryQueue):
		self.entryQueue = entryQueue
	def connection_made(self, transport):
		self.transport = transport
	def datagram_received(self, data, addr):
		message = data.decode("utf-8")
		address = addr[0]
		timestamp = datetime.datetime.now()
		#print(address)
		#print(timestamp)
		#print(message)
		task = asyncio.create_task(producer(self.entryQueue, address, timestamp, message))
	def connection_lost(self, exc):
		#asyncio passes the closing exception here (None on a clean close), not the transport
		print("Connection lost!")

async def get_asyncpg_connection_pool():
	db_conf = {
		"user": "REDACTED",
		"password": "EXTRA REDACTED",
		"database": "REDACTED",
		"host": "REDACTED",
		"min_size":2,
		"max_size":4
	}
	connection = await asyncpg.create_pool(**db_conf)
	return connection

async def producer(entryQueue,address,timestamp,message):
	print(address)
	print(timestamp)
	print(message)
	entry = {"address":address, "timestamp":timestamp, "message":message}
	await entryQueue.put(entry)

async def qMonitor(entryQueue):
	while True:
		print("Queue size is ", entryQueue.qsize())
		await asyncio.sleep(15)

async def consumer(entryQueue, pool):
	print("Starting consumer")
	# Take a connection from the pool.
	print("getting db connection")
	async with pool.acquire() as connection:
		while True:
			#get an entry from the queue, wait until one is available
			print("waiting for queue entry")
			entry = await entryQueue.get()
			print("Processing entry!")
			#process entry.  Using a try block so we can return the entry to the queue if the processing fails
			try:
				#unpack the entry and insert it into the database
				address = entry.get("address")
				timestamp = entry.get("timestamp")
				message = entry.get("message")
				print(timestamp, address, message)
				dbquery = "INSERT INTO dmlogger (address, timestamp, message) VALUES($1, $2, $3);"
				await connection.execute(dbquery, address, timestamp, message)
			except RuntimeError as e:
				#if there was some exception, put the entry back into the queue
				#may need to check the type of exception, if it is possible an exception can be raised while still getting a database entry
				print(f"consumer exception {e}")
				await entryQueue.put(entry)
			except:
				print("unhandled exception")
				await entryQueue.put(entry)
			finally:
				entryQueue.task_done()

async def main():
	print("Starting UDP server")
	# Get a reference to the event loop as we plan to use
	# low-level APIs.
	loop = asyncio.get_running_loop()
	#start a queue
	entryQueue = asyncio.Queue(maxsize=1000)
	#watch the current queue
	#watchQ = asyncio.create_task(qMonitor(entryQueue))
	watchers = [asyncio.create_task(qMonitor(entryQueue)) for i in range(1)]
	print("opening db connection pool")
	pool = await get_asyncpg_connection_pool()
	#starting two consumers for now.  Later, might expand to 3-4 consumers
	consumers = [asyncio.create_task(consumer(entryQueue, pool)) for i in range(2)]
	# One protocol instance will be created to serve all
	# client requests.
	transport, protocol = await loop.create_datagram_endpoint(lambda: UDPServerProtocol(entryQueue),local_addr=('0.0.0.0', 29298))
	try:
		await asyncio.sleep(300)  # Serve for 5 minutes.
	finally:
		transport.close()

asyncio.run(main())
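
A side note on `datagram_received` in the code above: it is a plain (non-async) callback, and the asyncio docs recommend keeping a reference to anything returned by `create_task` so the task cannot be garbage-collected mid-flight. One alternative, sketched here as an assumption rather than a drop-in fix, is to skip the per-packet task and call `put_nowait` directly. `QueueingProtocol` and its `dropped` counter are hypothetical names.

```python
import asyncio
from datetime import datetime


class QueueingProtocol(asyncio.DatagramProtocol):
    """Push each datagram straight onto the queue without creating a task."""

    def __init__(self, entry_queue):
        self.entry_queue = entry_queue
        self.dropped = 0  # how many packets were discarded because the queue was full

    def datagram_received(self, data, addr):
        entry = {
            "address": addr[0],
            "timestamp": datetime.now(),
            # errors="replace" avoids an exception on bytes that are not valid UTF-8
            "message": data.decode("utf-8", errors="replace"),
        }
        try:
            self.entry_queue.put_nowait(entry)
        except asyncio.QueueFull:
            # The consumers are not keeping up; count the loss instead of blocking.
            self.dropped += 1

    def connection_lost(self, exc):
        # exc is None on a clean close, otherwise the exception that closed the transport
        print("Connection lost!", exc)
```

It would be passed to `create_datagram_endpoint` the same way, e.g. `lambda: QueueingProtocol(entryQueue)`. The trade-off is that a full queue drops the packet right away, whereas the task-per-packet version keeps it waiting in memory.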

Time to go home and eat supper :slight_smile:

Well, I made a couple of modifications to the code posted above. I moved the line that grabs a pool connection to the while loop in the consumer function, just after the await queue.get() line. This will grab a new connection from the pool when it’s ready to use it.

I also replaced the asyncio.sleep line in main() with an asyncio.gather command that will basically wait forever and keep the service running. I was using the sleep to do the testing with.
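
For anyone who wants the service to stop cleanly rather than only wait forever, here is a small sketch of a variation (not what the code in this post does): wait on an `asyncio.Event` and cancel the consumers once it is set. The `consumer` below is a stand-in that just loops, and `serve`, `stop`, and `log` are made-up names.

```python
import asyncio


async def consumer(name, log):
    """Stand-in for the real consumer: loops until it is cancelled."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append(f"{name} stopped")
        raise  # re-raise so the task is marked as cancelled


async def serve(stop, log):
    """Run two consumers until `stop` is set, then cancel them and wait for them."""
    consumers = [asyncio.create_task(consumer(f"consumer-{i}", log)) for i in range(2)]
    try:
        await stop.wait()  # holds the service open until something sets the event
    finally:
        for task in consumers:
            task.cancel()
        # return_exceptions=True collects each CancelledError instead of raising it
        await asyncio.gather(*consumers, return_exceptions=True)
```

On Linux, something like `loop.add_signal_handler(signal.SIGTERM, stop.set)` could set the event when the service manager asks the process to stop.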

I figured the code might be helpful to anyone running ignition and wanting to catch messages from a DoMore PLC. I’ll probably run with this version until it causes problems or until someone points out a glaring error. Can’t wait to see what comes first :sweat_smile:

import datetime
import asyncio
import asyncpg

class UDPServerProtocol:
	def __init__(self, entryQueue):
		self.entryQueue = entryQueue
	def connection_made(self, transport):
		self.transport = transport
	def datagram_received(self, data, addr):
		message = data.decode("utf-8")
		address = addr[0]
		timestamp = datetime.datetime.now()
		task = asyncio.create_task(producer(self.entryQueue, address, timestamp, message))
	def connection_lost(self, exc):
		#asyncio passes the closing exception here (None on a clean close), not the transport
		print("Connection lost!")

async def get_asyncpg_connection_pool():
	db_conf = {
		"user": "REDACTED",
		"password": "EXTRA REDACTED",
		"database": "REDACTED",
		"host": "REDACTED",
		"min_size":2,
		"max_size":4
	}
	connection = await asyncpg.create_pool(**db_conf)
	return connection

async def producer(entryQueue,address,timestamp,message):
	#print(address)
	#print(timestamp)
	#print(message)
	entry = {"address":address, "timestamp":timestamp, "message":message}
	await entryQueue.put(entry)

async def qMonitor(entryQueue):
	while True:
		print("Queue size is ", entryQueue.qsize())
		await asyncio.sleep(15)

async def consumer(entryQueue, pool):
	#print("Starting consumer")
	while True:
		#get an entry from the queue, wait until one is available
		#print("waiting for queue entry")
		entry = await entryQueue.get()
		#print("Processing entry!")
		#process entry.  Using a try block so we can return the entry to the queue if the processing fails
		try:
			# Take a connection from the pool.
			#print("getting db connection")
			async with pool.acquire() as connection:
				#unpack the entry and insert it into the database
				address = entry.get("address")
				timestamp = entry.get("timestamp")
				message = entry.get("message")
				#print(timestamp, address, message)
				dbquery = "INSERT INTO dmlogger (address, timestamp, message) VALUES($1, $2, $3);"
				await connection.execute(dbquery, address, timestamp, message)
		except RuntimeError as e:
			#if there was some exception, put the entry back into the queue
			#may need to check the type of exception, if it is possible an exception can be raised while still getting a database entry
			print(f"consumer exception {e}")
			await entryQueue.put(entry)
			#sleep for a few seconds to avoid immediate retries
			await asyncio.sleep(5)
		except:
			print("unhandled exception")
			await entryQueue.put(entry)
			#sleep for a few seconds to avoid immediate retries
			await asyncio.sleep(5)
		finally:
			entryQueue.task_done()

async def main():
	#print("Starting UDP server")
	# Get a reference to the event loop as we plan to use
	# low-level APIs.
	loop = asyncio.get_running_loop()
	#start a queue
	entryQueue = asyncio.Queue(maxsize=1000)
	#watch the current queue
	#watchers = [asyncio.create_task(qMonitor(entryQueue)) for i in range(1)]
	#print("opening db connection pool")
	pool = await get_asyncpg_connection_pool()
	#starting two consumers for now.  Later, might expand to 3-4 consumers
	consumers = [asyncio.create_task(consumer(entryQueue, pool)) for i in range(2)]
	# One protocol instance will be created to serve all
	# client requests.
	transport, protocol = await loop.create_datagram_endpoint(lambda: UDPServerProtocol(entryQueue),local_addr=('0.0.0.0', 29298))
	try:
		#await asyncio.sleep(300)  # Serve for 5 minutes.
		#wait for consumers to exit, which effectively holds the program open indefinitely as the consumers never exit
		await asyncio.gather(*consumers)
	finally:
		transport.close()

asyncio.run(main())
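
One thing to watch with putting a failed entry back on the queue: if the entry itself is the problem (a value the database will never accept), it comes back around forever. A hedged sketch of a different approach is below. It retries in place a limited number of times and then gives up on that entry. `consume`, `insert`, and `on_give_up` are hypothetical names; `insert` stands in for an async function that does the `connection.execute(...)` call.

```python
import asyncio


async def consume(entry_queue, insert, max_attempts=3, retry_delay=5.0, on_give_up=print):
    """Process queue entries, retrying each one at most max_attempts times."""
    while True:
        entry = await entry_queue.get()
        try:
            for attempt in range(1, max_attempts + 1):
                try:
                    await insert(entry)
                    break  # stored successfully, move on to the next entry
                except Exception as exc:
                    if attempt == max_attempts:
                        # Out of retries: report the entry instead of re-queueing it.
                        on_give_up(entry, exc)
                    else:
                        await asyncio.sleep(retry_delay)
        finally:
            # Exactly one task_done() per get(), whatever happened above.
            entry_queue.task_done()
```

Retrying in place keeps entries in arrival order, at the cost of holding up that one consumer while it waits. Catching `Exception` rather than using a bare `except:` also lets task cancellation pass through on Python 3.8 and later.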

Looks pretty good!

Ha, for a minute there I thought you had this running under Jython (2.5, no less! if the 7.9 tag is accurate) and it broke my brain because I just couldn’t believe it.

Sorry Kevin! Lol, I should have been a little more clear about where I was running this. It’s in a python 3.8 environment with the appropriate modules installed. Had to put on my big boy pants!

So, really, Ignition doesn’t have anything to do with this bit of code, other than that’s what I’ll be using for a user interface to view and filter the records. I guess it’s useful to anyone running python and postgresql :slight_smile:

And, sadly, yes, the 7.9 tag is accurate. I need to get my system updated. I keep putting it off, but it’s something that needs to happen. I finally got rid of the last showstopper. I had some old controllers here that had a custom ASCII serial protocol that I had to have online with the SCADA. In 7.9, I had a bunch of tag scripts to do the requesting and parsing of data, writing back out, etc. Those scripts were longer than what was allowed in an 8.0 tag script. So, instead of rewriting my code, I replaced the controllers :slight_smile: They were end-of-life anyway and needed to go. The new stuff talks Modbus :smiley:

Made a couple more modifications. We had power problems at the plant today, and it revealed a corner case (more with my db than the code here). The column holding the message was set up as text in postgresql, and apparently it doesn’t like holding null characters (not NULL values, just 0x00 characters). So I added some code to strip out the null characters. Haven’t powered off the plant to see if it works or not, so YMMV. I also put in some logging code to troubleshoot the string formatting code I was adding :sweat_smile:

import datetime
import asyncio
import asyncpg
import syslog

class UDPServerProtocol:
        def __init__(self, entryQueue):
                self.entryQueue = entryQueue
        def connection_made(self, transport):
                self.transport = transport
        def datagram_received(self, data, addr):
                message = data.decode("utf-8")
                address = addr[0]
                timestamp = datetime.datetime.now()
                task = asyncio.create_task(producer(self.entryQueue, address, timestamp, message))
        def connection_lost(self, exc):
                #asyncio passes the closing exception here (None on a clean close), not the transport
                syslog.syslog("Connection lost!")

async def get_asyncpg_connection_pool():
        db_conf = {
                "user": "REDACTED",
                "password": "EXTRA REDACTED",
                "database": "REDACTED",
                "host": "REDACTED",
                "min_size":2,
                "max_size":4
        }
        connection = await asyncpg.create_pool(**db_conf)
        return connection

async def producer(entryQueue,address,timestamp,message):
        entry = {"address":address, "timestamp":timestamp, "message":message}
        await entryQueue.put(entry)

async def qMonitor(entryQueue):
        while True:
                print("Queue size is ", entryQueue.qsize())
                await asyncio.sleep(15)

async def consumer(entryQueue, pool):
        #print("Starting consumer")
        while True:
                #get an entry from the queue, wait until one is available
                # print("waiting for queue entry")
                entry = await entryQueue.get()
                #print("Processing entry!")
                # process entry.  Using a try block so we can return the entry to the queue if the processing fails
                try:
                        # Take a connection from the pool.
                        # print("getting db connection")
                        async with pool.acquire() as connection:
                                #unpack the entry, strip null characters, and insert it into the database
                                address = entry.get("address")
                                timestamp = entry.get("timestamp")
                                message = entry.get("message")
                                message = message.replace("\x00", "")
                                #print(timestamp, address, message)
                                dbquery = "INSERT INTO dmlogger (address, timestamp, message) VALUES($1, $2, $3);"
                                await connection.execute(dbquery, address, timestamp, message)
                except RuntimeError as e:
                        #if there was some exception, put the entry back into the queue
                        #may need to check the type of exception, if it is possible an exception can be
                        #raised while still getting a database entry
                        syslog.syslog(f"consumer exception {e}")
                        address = entry.get("address")
                        timestamp = entry.get("timestamp")
                        message = entry.get("message")
                        syslog.syslog(f"putting entry back in queue {address} {timestamp} {message}")
                        await entryQueue.put(entry)
                        #sleep for a few seconds to avoid immediate retries
                        await asyncio.sleep(5)
                except Exception as e:
                        syslog.syslog(f"unhandled exception {e}")
                        address = entry.get("address")
                        timestamp = entry.get("timestamp")
                        message = entry.get("message")
                        syslog.syslog(f"putting entry back in queue {address} {timestamp} {message}")
                        await entryQueue.put(entry)
                        #sleep for a few seconds to avoid immediate retries
                        await asyncio.sleep(5)
                finally:
                        entryQueue.task_done()

async def main():
        syslog.syslog("Starting UDP server")
        # Get a reference to the event loop as we plan to use low-level APIs.
        loop = asyncio.get_running_loop()
        #start a queue
        entryQueue = asyncio.Queue(maxsize=1000)
        #watch the current queue
        #watchers = [asyncio.create_task(qMonitor(entryQueue)) for i in range(1)]
        #print("opening db connection pool")
        pool = await get_asyncpg_connection_pool()
        #starting two consumers for now.  Later, might expand
        consumers = [asyncio.create_task(consumer(entryQueue, pool)) for i in range(2)]
        # One protocol instance will be created to serve all client requests.
        transport, protocol = await loop.create_datagram_endpoint(lambda: UDPServerProtocol(entryQueue),local_addr=('0.0.0.0', 29298))
        try:
                #await asyncio.sleep(300) # Serve for 5 minutes.
                # wait for consumers to exit,
                #which effectively holds the program open indefinitely as the consumers never
                #exit
                await asyncio.gather(*consumers)
        finally:
                transport.close()
asyncio.run(main())
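
The null-character cleanup above can also be pulled out into a small function and tried without cutting power to the plant. This is only a sketch: `clean_message` is a made-up name, and the `errors="replace"` part is an extra assumption, on the theory that a power glitch producing 0x00 bytes might also produce bytes that are not valid UTF-8.

```python
def clean_message(data: bytes) -> str:
    """Decode a datagram and remove characters a postgresql text column rejects."""
    # Invalid UTF-8 sequences become U+FFFD instead of raising UnicodeDecodeError.
    text = data.decode("utf-8", errors="replace")
    # postgresql text values cannot contain the 0x00 character, so drop it.
    return text.replace("\x00", "")


print(repr(clean_message(b"pump\x00 on\x00")))
```

Doing this in `datagram_received`, before the entry is queued, would mean the consumers only ever see strings the database can store.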