High Speed Topic MQTT Subscription

I'm looking for anyone who has experience consuming high-speed messages over MQTT using the Cirrus Link modules. If this thread is better posted on their forum, please let me know.

As a way to explain the issue: when we tried to use the IA-provided TCP module to subscribe to high-speed TCP messages, we discovered that because the driver only returns one message at a time, it is, for lack of a better description, limited by scan rate. This means that if multiple messages arrive at the port at nearly the same time, we end up missing many of them.

To get around this, we developed a buffered TCP module that buffers all incoming messages and, on each read, completely empties the buffer, returning a comma-separated list of messages as a string.
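As a rough illustration of that drain-on-read behavior (the class and method names here are hypothetical, not our actual module):

```python
# Sketch of a drain-on-read buffer: messages accumulate between polls, and
# each read empties the buffer, returning everything that arrived since the
# last scan as one comma-separated string.
from collections import deque
from threading import Lock

class MessageBuffer(object):
    def __init__(self):
        self._q = deque()
        self._lock = Lock()

    def push(self, msg):
        # Called for every incoming TCP message, independent of scan rate.
        with self._lock:
            self._q.append(msg)

    def drain(self):
        # Called once per scan; empties the buffer so nothing is dropped.
        with self._lock:
            msgs = list(self._q)
            self._q.clear()
        return ','.join(msgs)

buf = MessageBuffer()
for m in ['msg1', 'msg2', 'msg3']:   # three messages arrive within one scan
    buf.push(m)
print(buf.drain())   # -> 'msg1,msg2,msg3'
print(buf.drain())   # -> '' (empty until new messages arrive)
```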

We are now seeing what we think is similar behavior with MQTT topics that have similar speed considerations, i.e. multiple messages are produced at nearly the same time and nothing bound by a scan rate can capture them all. For example, the tag below only returns one message per scan, but we know from a reference system that many more messages exist:

Is there any way around this that might already exist?

Thanks,

Nick


How are you measuring the "messages per scan"? My understanding is the MQTT module and the tags it provides are all event-based. You may not see every change in the Tag Browser UI, but a tag change event (and history/alarm eval) should happen for every value change.

pinging @wes0johnson from CL

We are sinking the messages into an MSSQL DB on site, and when compared to a separate system, we have far fewer records. This is how we originally discovered the limitation of the stock TCP driver, by comparing to a system that is not ours.

Nick

via tag history or a script?

script

Do you check the missedEvents flag and log if it's ever set?

Not yet, but I will add a logger now to check.
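A minimal sketch of the check I plan to add (Python's logging module stands in here for Ignition's system.util.getLogger so the logic is self-contained; the counter name is illustrative):

```python
# Tag event script sketch: log and count whenever Ignition reports that the
# bounded event queue overflowed (the missedEvents flag on valueChanged).
import logging

log = logging.getLogger('bufferedMQ.missedEvents')
missed_count = {'n': 0}   # simple counter to quantify how often it happens

def valueChanged(tag, tagPath, previousValue, currentValue,
                 initialChange, missedEvents):
    if initialChange:
        return  # ignore the initial subscription event
    if missedEvents:
        missed_count['n'] += 1
        log.warning('missed event(s) on %s (total seen: %d)',
                    tagPath, missed_count['n'])
```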

What I would like confirmation on is whether or not the Cirrus link module provided tags are actually event driven.

Will report back on the missedEvents.

Nick

The Cirrus Link modules are event driven. On the MQTT Engine side, every incoming Sparkplug metric results in a tag change event and insert into the Ignition historian if history is properly configured. There are some caveats to history config though. This is from our docs:

History must be enabled on the MQTT Engine Tag and the history settings for that tag must be as follows:
History Enabled: true
Storage Provider: Must be set to an existing Storage Provider
Sample Mode: On Change
Min Time Between Samples: 0 ms
This is required in cases where historical metrics flushed from the Edge have very high resolution; e.g., the historical metrics have timestamps 1 ms apart for a single tag.

The full doc is here. But it is mostly focused on Transmission side/Sparkplug store and forward: MQTT History - MQTT Modules for Ignition 8.x - Confluence

@wes0johnson thanks a lot for chiming in.

We are not (and will not be) using the IA tag history model for this. One reason is that storing stringified JSON into IA tag history makes it very difficult to pull anything useful out.

If we are using an event script to store the value on each tag change event, can event-driven behavior still be achieved?

The table we are populating now is shown below. When we run metrics against this table and compare to another data source, our counts are drastically lower, which is why we think we are missing messages.

Note: we are subscribing to a topic at the site, not publishing.

Thanks,

Nick

Did you check the missedEvents flag yet?

If you really have a high rate of value changes per tag then it's unlikely your insert queries are running fast enough to keep the tag change event queue from overflowing.


Not just that. Lots of insert queries in tag events are likely to tie up the thread pool completely.


The sorter is stopped for the night. We have the logger on tag change to check for missed events and will know tomorrow.

Nick

The result is that there are lots of missed events. We will be adjusting on our side and will see if that improves the situation.

See this thread for a discussion on the differences between tag event scripts and project tag change scripts.

You may benefit from using tag change scripts instead, because of the unbounded queue size, but only if your workload is "bursty" and not sustained enough that you'd have memory problems.

We are actually working from tag change already. Unfortunately, the tag change script was written such that it triggers a storage event for every single message, which is likely a big part of the problem.

We are going to move to a method that caches the messages to a dataset and then inserts them in batch asynchronously in order to decouple the storage timing from the inter-message timing.

Nick

Unless you're talking about something else here, you are working from a tag event script (the ones defined on a tag). Tag change scripts (the ones defined as part of a project's event scripts) do not have a missedEvents flag.

This is definitely the right approach. Consider using a java concurrent queue type instead of a dataset.
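Sketching that suggestion with CPython's thread-safe deque (in Ignition's Jython environment a java.util.concurrent.ConcurrentLinkedQueue would fill the same role, with poll() returning None when empty; names here are illustrative):

```python
# Producer/consumer split: the tag change script only enqueues and returns
# fast; a separate timer or async task drains the queue and does the insert.
from collections import deque

queue = deque()   # append/popleft are atomic, so no explicit lock is needed

def on_tag_change(msg):
    # Producer: runs in the tag event thread, must return quickly.
    queue.append(msg)

def flush():
    # Consumer: drains everything currently queued into one batch.
    batch = []
    while True:
        try:
            batch.append(queue.popleft())
        except IndexError:
            break
    return batch   # hand the batch to one multi-row INSERT
```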


Thanks Phil, always appreciate your nuggets of experience.


re: missedEvents flag in UDT tag event scripts

FWIW, I will share this experience/observation. In order to further lighten my valueChanged() script, I had the idea that perhaps the qualityChanged() event might trigger upon a missed event:

def qualityChanged(tag, tagPath, previousValue, currentValue, initialChange, missedEvents):

It doesn't.

The solution is built using non-blocking event logic and forked execution via system.tag.readAsync() callback functions. However, because of the bounded (5-entry) event queue for the UDT instance's valueChanged() event script, it was necessary to move execution to the Gateway Events tag-change execution context which, as @Kevin.Herron indicated, has an unbounded event queue.

By running parallel execution of the UDT and Gateway Event scripts (logging to two tables), we're able to verify that the Gateway Event execution captures upwards of 50 missed events per minute. At the same time, the UDT grossly undercounts missed events, indicative of the bursty nature of our event stream.


Here is the functioning UDT definition and code from project library:

libPath = gatewayEvents.MCBL.sorterDisposition
#########################################################################################
# Define adapter for gateway tag-change event to use UDT's valueChanged script
#########################################################################################
def valueChangedAdapter(event, newValue, executionCount, initialChange):
	tag = {
		'parameters': {
			'InstanceName':			'sorterDisposition',
			'PathToParentFolder':	'MCBL/sorterDisposition',
			'shiftNumTagPath':		'[OEE]OEE/shifts/AllShifts/info/shiftNumber',
			'dimBuffer':			30,
			'tblName':				"mcblsort",
			'sqlEnabled':			1,
			'jdbc':					"Ignition_MCBL"
		}
	}
	tagPath 		= '[IBM_MQTT]MCBL/sorterDisposition/message'
	previousValue	= event.getPreviousValue()
	currentValue	= event.getCurrentValue()
	missedEvents	= 0	# gateway tag-change events do not expose a missedEvents flag
	
	libPath.valueChanged(
		tag, tagPath, previousValue, currentValue, initialChange, missedEvents)
	return
# end valueChangedAdapter() #############################################################
#########################################################################################
# Verbatim copy of the valueChanged() tag event script from the UDT
#########################################################################################
def valueChanged(tag, tagPath, previousValue, currentValue, initialChange, missedEvents):
	if initialChange:	return	# do nothing

	from java.util	import Date 	# Date().getTime()
	from java.lang	import System	# System.currentTimeMillis()
	t0	= int(System.currentTimeMillis())				# java.lang.System
	dt	= lambda t: int(System.currentTimeMillis() - t)	# returns milliseconds
	udt	= tag['parameters']	# dictionary to access instance-values of UDT parameters
	log	= system.util.getLogger(str('bufferedMQ.%s' % udt['InstanceName']))
	tagProvider = tagPath[1:][:tagPath.find("]")-1]
	fullyQualified = lambda t: '[%s]%s/%s' % (tagProvider, udt['PathToParentFolder'], t)
	#####################################################################################
	if missedEvents:
		tagPaths1 = [fullyQualified('missedEvents')]	# "[.]missedEvents"
		# Define asynchronous callback function for system.tag.readAsync()
		def callback1(asyncReturn1):	#================================================
			qVs = asyncReturn1
			nMissed = 1 + int(qVs[0].value)
			system.tag.writeBlocking(tagPaths1,[nMissed])
			tpl = 'valueChanged: callback1: %d missed message(s) [%dms]'
			log.warnf(tpl, nMissed, dt(t0) )
			return	# end callback1() ===================================================
		system.tag.readAsync(tagPaths1, callback1)
		log.debugf('valueChanged: system.tag.readAsync callback1 [%dms]', dt(t0))
	# end missedEvents ##################################################################
	tagPaths = [
		fullyQualified('jsonDoc'),	# "[.]jsonDoc",
		udt['shiftNumTagPath']
	]
	#####################################################################################
	# Define second asynchronous callback function for second system.tag.readAsync()
	#####################################################################################
	def callback2(asyncReturn):
		qVs = asyncReturn

		# Convert immutable PyDocumentObjectAdapter objects to mutable PyDict dictionary
		# https://files.inductiveautomation.com/sdk/javadoc/ignition81/8.1.1/com/inductiveautomation/ignition/common/script/adapters/PyDocumentObjectAdapter.html#toDict()
		buf = qVs[0].value.toDict()
		try:	msg = currentValue.value.toDict()
		except:	msg = system.util.jsonDecode(currentValue.value)

		# Replace None/null-valued values with empty string
		msgKeys = msg.keys()
		for k in [k for k in msgKeys if str(msg[k]) in ['null','None']]:
			msg[k] = ''

		# Enrich the message (a mutable dictionary) with four additional attributes		
		msg['epochms'] = currentValue.timestamp.getTime()
		DoW	= lambda epochms: 1+int(epochms//86400000+4)%7	# +4 -> Sun:=1, +3 -> Mon:=1
		msg['partition'] = DoW( msg['epochms'] )
		try:	msg['shiftNum'] = int(qVs[-1].value)
		except:	msg['shiftNum'] = 255
		msg['sorter'] = 0

		# Append the enriched message (mutated dict) to the buffer element (a list) 
		try:	buf['array'].append(msg)
		except:	buf['array'] = [msg]

		count = len(buf['array'])	# number of messages currently in the buffer
		if count < udt['dimBuffer']:
			# Convert mutated dictionary to string representation of the JSON object
			jsonStr = system.util.jsonEncode(buf,0)

			# Store the updated buffer in the UDT's Document memory tag
			LoQ = system.tag.writeBlocking([tagPaths[0]],[jsonStr])
			tpl = 'valueChanged: callback2: msg{%d} added to buffer[%d] [%dms]'
			log.debugf(tpl, len(msgKeys)+4, count, dt(t0))
		else:
			#############################################################################
			# Define function to asynchronously INSERT buffered data into database
			#############################################################################
			def insertBuffer(LoD=buf['array'], udt=udt):
				t0 = int(System.currentTimeMillis())

				# Construct SQL insert statement
				tplInsert = 'INSERT INTO %s %%s VALUES %%s' % udt['tblName']
				tplValue = "(%d,%d,%d,'%s','%s',%d,'%s','%s','%s','%s','%s','%s',%d)"
				keys =[
					'sorter',		# int
					'epochms',		# int
					'shiftNum',		# int
					'scannedLabel',	# str
					'lpn',			# str
					'inductNumber',	# int
					'doorNbr',		# str
					'labelStatus',	# str
					'destination',	# str
					'labelType',	# str
					'errorCode',	# str
					'cameraId',		# str
					'partition'		# int
				]
				strSchema = '(%s)' % ','.join([field for field in keys])	# unquoted field names
				value = lambda d,k: d[k] if k in d.keys() else ''			# catch missing keys
				LoL = [[value(d,k) for k in keys] for d in LoD]				# LoL := list of lists
				strValues = ','.join([tplValue%tuple(lst) for lst in LoL])	# CSV str of values
				sqlStatement = tplInsert % (strSchema, strValues)
				log.tracef('insertBuffer: SQL: %s', sqlStatement)

				# Execute SQL insert statement
				if	bool(udt['sqlEnabled']):	# if SQL is enabled for this instance
					ds = system.db.getConnectionInfo(udt['jdbc'])
					if ds.getValueAt(0,'Status') != 'Valid':
						tpl = 'insertBuffer: database connection %s not available [%dms]'
						log.errorf(tpl, udt['jdbc'], dt(t0))
					else:
						rows = system.db.runUpdateQuery(
							sqlStatement, udt['jdbc'], tx='', getKey=0, skipAudit=1)
						duration = dt(t0)
						tpl = 'insertBuffer: %d rows inserted into %s [%dms]'
						if duration>250:	# milliseconds
							log.warnf(tpl, rows,udt['tblName'],duration)
						else:
							log.debugf(tpl, rows,udt['tblName'],duration)
				else:
					tpl = 'insertBuffer: SQL logging to %s disabled [%dms]'
					log.warnf(tpl, udt['tblName'],dt(t0))
				return
			# end insertBuffer() ########################################################
			system.util.invokeAsynchronous( insertBuffer )	# spawn async thread for SQL

			# Reset the buffer with an empty list
			jsonStr = system.util.jsonEncode({"array":[]},0)
			LoQ = system.tag.writeBlocking([tagPaths[0]],[jsonStr])
			log.debugf('valueChanged: callback2: buffer reset [%dms]', dt(t0))
		return
	# end callback2() ###################################################################
	system.tag.readAsync(tagPaths, callback2)
	log.tracef('valueChanged: system.tag.readAsync callback2 [%dms]', dt(t0))
	return
# end valueChanged() ####################################################################

It is worth noting the utility of the Document-type memory tag to serve as the event buffer. The JSON object enables aggregation of multiple properties with a single handle, helping avoid synchronization issues in the event logic.

Thanks to Cirrus-Link's Dan Potter & Nathan Davenport and IA's @jrosenkrans for working with us on this issue.