High Speed Topic MQTT Subscription

The solution is built using non-blocking event logic and forked execution via system.tag.readAsync() callback functions. However, because the event queue for the UDT instance's valueChanged() event script is bounded (5 events), it is necessary to move execution to the Gateway Events tag-change execution context which, as @Kevin.Herron indicated, has an unbounded event queue.

By running the UDT and Gateway Event scripts in parallel (logging to two tables), we were able to verify that the Gateway Event execution captures upwards of 50 events per minute that the UDT script misses. At the same time, the UDT grossly undercounts missed events, which is indicative of the bursty nature of our event stream.


Here is the functioning UDT definition and code from the project library:

# Handle to the script module whose valueChanged() is invoked by valueChangedAdapter().
# NOTE(review): presumably the project-library path of this very script -- confirm.
libPath = gatewayEvents.MCBL.sorterDisposition
#########################################################################################
# Define adapter for gateway tag-change event to use UDT's valueChanged script
#########################################################################################
def valueChangedAdapter(event, newValue, executionCount, initialChange):
	"""Forward a gateway tag-change event to libPath.valueChanged().

	Builds a stand-in for the 'tag' argument of the UDT script (only its
	'parameters' entry is populated, with fixed values for the
	sorterDisposition instance) and passes the previous/current values
	taken from `event`. `newValue` and `executionCount` are not used, and
	missedEvents is always passed as 0.
	"""
	# Fixed parameter values handed to valueChanged() as tag['parameters']
	udtParameters = dict(
		InstanceName		= 'sorterDisposition',
		PathToParentFolder	= 'MCBL/sorterDisposition',
		shiftNumTagPath		= '[OEE]OEE/shifts/AllShifts/info/shiftNumber',
		dimBuffer			= 30,
		tblName				= "mcblsort",
		sqlEnabled			= 1,
		jdbc				= "Ignition_MCBL",
	)
	before	= event.getPreviousValue()
	after	= event.getCurrentValue()

	# Positional order: tag, tagPath, previousValue, currentValue, initialChange, missedEvents
	libPath.valueChanged(
		{'parameters': udtParameters},
		'[IBM_MQTT]MCBL/sorterDisposition/message',
		before,
		after,
		initialChange,
		0)
# end valueChangedAdapter() #############################################################
#########################################################################################
# Verbatim copy of the valueChanged() tag event script from the UDT
#########################################################################################
def valueChanged(tag, tagPath, previousValue, currentValue, initialChange, missedEvents):
	"""Append the incoming message to the JSON buffer tag; flush a full buffer to SQL.

	Derived from the UDT's valueChanged() tag event script. All tag reads go
	through system.tag.readAsync() so this function returns without waiting;
	the real work happens in the nested callbacks.

	Parameters:
		tag:			dict whose 'parameters' entry supplies InstanceName,
						PathToParentFolder, shiftNumTagPath, dimBuffer, tblName,
						sqlEnabled and jdbc.
		tagPath:		tag path starting with '[provider]'; only the provider
						name is extracted from it.
		previousValue:	not used.
		currentValue:	qualified value; .value is either a document object
						(has toDict()) or a JSON string, .timestamp is the
						event time.
		initialChange:	when truthy the function returns immediately.
		missedEvents:	when truthy the 'missedEvents' tag is incremented by 1.

	Returns:
		None.
	"""
	if initialChange:	return	# do nothing

	from java.lang	import System	# System.currentTimeMillis()
	t0	= int(System.currentTimeMillis())				# java.lang.System
	dt	= lambda t: int(System.currentTimeMillis() - t)	# returns milliseconds
	udt	= tag['parameters']	# dictionary to access instance-values of UDT parameters
	log	= system.util.getLogger(str('bufferedMQ.%s' % udt['InstanceName']))
	tagProvider = tagPath[1:tagPath.find("]")]	# text between the leading '[' and first ']'
	fullyQualified = lambda t: '[%s]%s/%s' % (tagProvider, udt['PathToParentFolder'], t)
	#####################################################################################
	if missedEvents:
		tagPaths1 = [fullyQualified('missedEvents')]	# "[.]missedEvents"
		# Define asynchronous callback function for system.tag.readAsync()
		def callback1(asyncReturn1):	#================================================
			qVs = asyncReturn1
			nMissed = 1 + int(qVs[0].value)
			system.tag.writeBlocking(tagPaths1,[nMissed])
			tpl = 'valueChanged: callback1: %d missed message(s) [%dms]'
			log.warnf(tpl, nMissed, dt(t0) )
			return	# end callback1() ===================================================
		system.tag.readAsync(tagPaths1, callback1)
		log.debugf('valueChanged: system.tag.readAsync callback1 [%dms]', dt(t0))
	# end missedEvents ##################################################################
	tagPaths = [
		fullyQualified('jsonDoc'),	# "[.]jsonDoc",
		udt['shiftNumTagPath']
	]
	#####################################################################################
	# Define second asynchronous callback function for second system.tag.readAsync()
	#####################################################################################
	def callback2(asyncReturn):
		qVs = asyncReturn

		# Convert immutable PyDocumentObjectAdapter objects to mutable PyDict dictionary
		# https://files.inductiveautomation.com/sdk/javadoc/ignition81/8.1.1/com/inductiveautomation/ignition/common/script/adapters/PyDocumentObjectAdapter.html#toDict()
		buf = qVs[0].value.toDict()
		# Deliberately broad: whatever goes wrong with toDict(), fall back to JSON decoding
		try:	msg = currentValue.value.toDict()
		except:	msg = system.util.jsonDecode(currentValue.value)

		def isNullish(v):
			# True for None and for anything whose text form is 'null' or 'None'
			if v is None:	return True
			try:	return str(v) in ('null', 'None')
			# str() of non-ASCII unicode text raises under Python 2; such text is not a null marker
			except UnicodeError:	return False

		# Replace None/null-valued values with empty string
		msgKeys = msg.keys()
		for k in [k for k in msgKeys if isNullish(msg[k])]:
			msg[k] = ''

		# Enrich the message (a mutable dictionary) with four additional attributes
		msg['epochms'] = currentValue.timestamp.getTime()
		DoW	= lambda epochms: 1+int(epochms//86400000+4)%7	# +4 -> Sun:=1, +3 -> Mon:=1
		msg['partition'] = DoW( msg['epochms'] )
		# Shift number unreadable or non-numeric -> sentinel 255 (best effort, kept broad)
		try:	msg['shiftNum'] = int(qVs[-1].value)
		except:	msg['shiftNum'] = 255
		msg['sorter'] = 0

		# Append the enriched message (mutated dict) to the buffer element (a list);
		# if there is no usable 'array' entry yet, start a new one (best effort, kept broad)
		try:	buf['array'].append(msg)
		except:	buf['array'] = [msg]

		count = len(buf['array'])	# number of messages currently in the buffer
		if count < udt['dimBuffer']:
			# Convert mutated dictionary to string representation of the JSON object
			jsonStr = system.util.jsonEncode(buf,0)

			# Store the updated buffer in the UDT's Document memory tag
			system.tag.writeBlocking([tagPaths[0]],[jsonStr])
			tpl = 'valueChanged: callback2: msg{%d} added to buffer[%d] [%dms]'
			log.debugf(tpl, len(msgKeys)+4, count, dt(t0))
		else:
			#############################################################################
			# Define function to asynchronously INSERT buffered data into database
			#############################################################################
			# Defaults bind the current buffer contents and parameters at definition time
			def insertBuffer(LoD=buf['array'], udt=udt):
				t0 = int(System.currentTimeMillis())

				keys =[
					'sorter',		# int
					'epochms',		# int
					'shiftNum',		# int
					'scannedLabel',	# str
					'lpn',			# str
					'inductNumber',	# int
					'doorNbr',		# str
					'labelStatus',	# str
					'destination',	# str
					'labelType',	# str
					'errorCode',	# str
					'cameraId',		# str
					'partition'		# int
				]
				intKeys = ('sorter', 'epochms', 'shiftNum', 'inductNumber', 'partition')

				def sqlArg(row, key):
					# Value bound to one '?' placeholder; a missing key counts as ''
					value = row[key] if key in row.keys() else ''
					if key in intKeys:
						# Blank or non-numeric integer field -> SQL NULL instead of a
						# formatting error that would discard the whole batch
						try:	return int(value)
						except (TypeError, ValueError):	return None
					return '%s' % (value,)	# text columns always receive text

				# Construct parameterized SQL insert statement: values are bound, never
				# spliced into the SQL text, so quotes in a message cannot break it.
				# NOTE(review): one statement carries len(LoD)*len(keys) parameters;
				# check the JDBC driver's limit before raising dimBuffer substantially.
				strSchema = '(%s)' % ','.join(keys)					# unquoted field names
				rowMarks = '(%s)' % ','.join(['?'] * len(keys))		# one '?' per column
				strValues = ','.join([rowMarks] * len(LoD))			# one group per message
				sqlStatement = 'INSERT INTO %s %s VALUES %s' % (
					udt['tblName'], strSchema, strValues)
				args = [sqlArg(row, key) for row in LoD for key in keys]
				log.tracef('insertBuffer: SQL: %s', sqlStatement)
				log.tracef('insertBuffer: SQL args: %s', str(args))

				# Execute SQL insert statement
				if	bool(udt['sqlEnabled']):	# if SQL is enabled for this instance
					ds = system.db.getConnectionInfo(udt['jdbc'])
					if ds.getValueAt(0,'Status') != 'Valid':
						tpl = 'insertBuffer: database connection %s not available [%dms]'
						log.errorf(tpl, udt['jdbc'], dt(t0))
					else:
						rows = system.db.runPrepUpdate(
							sqlStatement, args, database=udt['jdbc'], getKey=0, skipAudit=1)
						duration = dt(t0)
						tpl = 'insertBuffer: %d rows inserted into %s [%dms]'
						if duration>250:	# milliseconds
							log.warnf(tpl, rows,udt['tblName'],duration)
						else:
							log.debugf(tpl, rows,udt['tblName'],duration)
				else:
					tpl = 'insertBuffer: SQL logging to %s disabled [%dms]'
					log.warnf(tpl, udt['tblName'],dt(t0))
				return
			# end insertBuffer() ########################################################
			system.util.invokeAsynchronous( insertBuffer )	# spawn async thread for SQL

			# Reset the buffer with an empty list
			jsonStr = system.util.jsonEncode({"array":[]},0)
			system.tag.writeBlocking([tagPaths[0]],[jsonStr])
			log.debugf('valueChanged: callback2: buffer reset [%dms]', dt(t0))
		return
	# end callback2() ###################################################################
	system.tag.readAsync(tagPaths, callback2)
	log.tracef('valueChanged: system.tag.readAsync callback2 [%dms]', dt(t0))
	return
# end valueChanged() ####################################################################

It is worth noting the utility of the Document-type memory tag to serve as the event buffer. The JSON object enables aggregation of multiple properties with a single handle, helping avoid synchronization issues in the event logic.

Thanks to Cirrus-Link's Dan Potter & Nathan Davenport and IA's @jrosenkrans for working with us on this issue.