The solution is built using non-blocking event logic and forked execution via system.tag.readAsync() callback functions.
However, because the event queue for the UDT instance's valueChanged() event script is bounded (5 events), it is necessary to move execution to the Gateway Events tag-change execution context, which, as @Kevin.Herron indicated, has an unbounded event queue.
By running the UDT and Gateway Event scripts in parallel (each logging to its own table), we were able to verify that the Gateway Event execution captures upwards of 50 events per minute that the UDT script misses. At the same time, the UDT's own missed-event count grossly undercounts them, which is indicative of the bursty nature of our event stream.
Here is the functioning UDT definition and the code from the project library:
# Alias for the project-library module whose valueChanged() the adapter below calls.
# NOTE(review): presumably this is the module this script itself lives in — confirm.
libPath = gatewayEvents.MCBL.sorterDisposition
#########################################################################################
# Define adapter for gateway tag-change event to use UDT's valueChanged script
#########################################################################################
def valueChangedAdapter(event, newValue, executionCount, initialChange):
    """Forward a gateway tag-change event to libPath.valueChanged().

    Builds the same 'tag' dictionary shape that valueChanged() reads
    (a 'parameters' mapping) from fixed values, pulls the previous and
    current values off the event, and reports zero missed events.

    Parameters:
        event          -- tag-change event; only getPreviousValue() and
                          getCurrentValue() are called on it.
        newValue       -- not used here.
        executionCount -- not used here.
        initialChange  -- passed through unchanged to valueChanged().
    """
    # Fixed stand-ins for the UDT instance parameters that valueChanged() reads.
    udtParameters = dict(
        InstanceName='sorterDisposition',
        PathToParentFolder='MCBL/sorterDisposition',
        shiftNumTagPath='[OEE]OEE/shifts/AllShifts/info/shiftNumber',
        dimBuffer=30,
        tblName="mcblsort",
        sqlEnabled=1,
        jdbc="Ignition_MCBL",
    )
    instance = {'parameters': udtParameters}
    sourcePath = '[IBM_MQTT]MCBL/sorterDisposition/message'

    before = event.getPreviousValue()
    after = event.getCurrentValue()

    # The last argument is the missedEvents flag; this adapter always passes 0.
    libPath.valueChanged(instance, sourcePath, before, after, initialChange, 0)
# end valueChangedAdapter() #############################################################
#########################################################################################
# Verbatim copy of the valueChanged() tag event script from the UDT
#########################################################################################
def valueChanged(tag, tagPath, previousValue, currentValue, initialChange, missedEvents):
    """Append the incoming message to the instance's JSON buffer tag and flush it to SQL when full.

    Work is done in system.tag.readAsync() callbacks so this function itself
    returns quickly:

      * callback1 (only when missedEvents is truthy) increments the
        'missedEvents' tag by one and logs a warning.
      * callback2 reads the 'jsonDoc' buffer tag and the shift-number tag,
        enriches the message with 'epochms', 'partition', 'shiftNum' and
        'sorter', and appends it to buf['array'].  While the buffer holds
        fewer than udt['dimBuffer'] messages it is written back to the tag;
        otherwise the rows are inserted on a separate thread
        (system.util.invokeAsynchronous) and the buffer tag is reset.

    Parameters:
        tag           -- dict with a 'parameters' mapping; keys read here are
                         InstanceName, PathToParentFolder, shiftNumTagPath,
                         dimBuffer, tblName, sqlEnabled and jdbc.
        tagPath       -- path of the changed tag; only the provider name
                         between '[' and ']' is used.
        previousValue -- not used.
        currentValue  -- qualified value; .value (document or JSON string)
                         and .timestamp are read.
        initialChange -- when truthy the function returns immediately.
        missedEvents  -- truthy when events were dropped before this one.

    Returns None.

    NOTE(review): the buffer and missed-event counter are updated with a
    read-modify-write that is not atomic across overlapping callbacks;
    presumably bursts could overwrite each other's updates — confirm.
    The buffer tag is reset right after the insert thread is spawned, so the
    rows are not retained when the insert is skipped (SQL disabled, connection
    not 'Valid') or raises.
    """
    if initialChange:
        return  # do nothing

    from java.lang import System  # System.currentTimeMillis()

    t0 = int(System.currentTimeMillis())
    dt = lambda t: int(System.currentTimeMillis() - t)  # elapsed milliseconds since t
    udt = tag['parameters']  # instance values of the UDT parameters
    log = system.util.getLogger(str('bufferedMQ.%s' % udt['InstanceName']))
    # Provider name is the text between the leading '[' and the first ']'.
    tagProvider = tagPath[1:tagPath.find(']')]
    fullyQualified = lambda t: '[%s]%s/%s' % (tagProvider, udt['PathToParentFolder'], t)

    #####################################################################################
    if missedEvents:
        tagPaths1 = [fullyQualified('missedEvents')]  # "[.]missedEvents"

        def callback1(asyncReturn1):
            """Increment the missed-events counter tag by one and log the new total."""
            qVs = asyncReturn1
            nMissed = 1 + int(qVs[0].value)
            system.tag.writeBlocking(tagPaths1, [nMissed])
            tpl = 'valueChanged: callback1: %d missed message(s) [%dms]'
            log.warnf(tpl, nMissed, dt(t0))

        system.tag.readAsync(tagPaths1, callback1)
        log.debugf('valueChanged: system.tag.readAsync callback1 [%dms]', dt(t0))
    # end missedEvents ##################################################################

    tagPaths = [
        fullyQualified('jsonDoc'),  # "[.]jsonDoc"
        udt['shiftNumTagPath'],
    ]

    def callback2(asyncReturn):
        """Enrich the current message, append it to the buffer, and flush when full."""
        qVs = asyncReturn
        # Convert the immutable document adapter into a mutable dictionary.
        buf = qVs[0].value.toDict()
        try:
            msg = currentValue.value.toDict()
        except AttributeError:
            # Value has no toDict(): treat it as a JSON string instead.
            msg = system.util.jsonDecode(currentValue.value)

        # Replace None/null-valued values with empty string.
        msgKeys = msg.keys()
        for k in [k for k in msgKeys if str(msg[k]) in ['null', 'None']]:
            msg[k] = ''

        # Enrich the message with four additional attributes.
        msg['epochms'] = currentValue.timestamp.getTime()
        DoW = lambda epochms: 1 + int(epochms // 86400000 + 4) % 7  # +4 -> Sun:=1, +3 -> Mon:=1
        msg['partition'] = DoW(msg['epochms'])
        try:
            msg['shiftNum'] = int(qVs[-1].value)
        except (TypeError, ValueError):
            msg['shiftNum'] = 255  # fallback when the shift tag is not an integer
        msg['sorter'] = 0

        # Append the enriched message; start a new list if the buffer has none yet.
        try:
            buf['array'].append(msg)
        except (KeyError, AttributeError):
            buf['array'] = [msg]
        count = len(buf['array'])  # number of messages currently in the buffer

        if count < udt['dimBuffer']:
            # Store the updated buffer back in the Document memory tag.
            jsonStr = system.util.jsonEncode(buf, 0)
            system.tag.writeBlocking([tagPaths[0]], [jsonStr])
            tpl = 'valueChanged: callback2: msg{%d} added to buffer[%d] [%dms]'
            log.debugf(tpl, len(msgKeys) + 4, count, dt(t0))
            return

        def insertBuffer(LoD=buf['array'], udt=udt):
            """INSERT every buffered row in one parameterized multi-row statement.

            LoD and udt are bound as defaults at definition time so the thread
            works on the list captured here.  Values are passed as '?'
            arguments rather than interpolated into the SQL text, so quotes in
            message fields cannot break or alter the statement.  Integer
            columns that are empty or unparseable are sent as None (NULL).
            The statement carries len(LoD) * 13 parameters.
            NOTE(review): very large dimBuffer values may exceed a driver's
            parameter limit — confirm for the target database.
            """
            t0 = int(System.currentTimeMillis())
            keys = [
                'sorter',        # int
                'epochms',       # int
                'shiftNum',      # int
                'scannedLabel',  # str
                'lpn',           # str
                'inductNumber',  # int
                'doorNbr',       # str
                'labelStatus',   # str
                'destination',   # str
                'labelType',     # str
                'errorCode',     # str
                'cameraId',      # str
                'partition',     # int
            ]
            intKeys = ('sorter', 'epochms', 'shiftNum', 'inductNumber', 'partition')

            def sqlArg(row, key):
                """Return row[key] as an int-or-None for integer columns, else as text."""
                raw = row[key] if key in row.keys() else ''  # catch missing keys
                if key in intKeys:
                    try:
                        return int(raw)
                    except (TypeError, ValueError):
                        return None
                return '%s' % (raw,)

            # Construct the SQL insert statement with one '(?,...,?)' group per row.
            strSchema = '(%s)' % ','.join(keys)  # unquoted field names
            rowPlaceholders = '(%s)' % ','.join(['?'] * len(keys))
            strValues = ','.join([rowPlaceholders] * len(LoD))
            sqlStatement = 'INSERT INTO %s %s VALUES %s' % (udt['tblName'], strSchema, strValues)
            args = [sqlArg(row, key) for row in LoD for key in keys]
            log.tracef('insertBuffer: SQL: %s', sqlStatement)

            if not bool(udt['sqlEnabled']):  # SQL is disabled for this instance
                tpl = 'insertBuffer: SQL logging to %s disabled [%dms]'
                log.warnf(tpl, udt['tblName'], dt(t0))
                return

            ds = system.db.getConnectionInfo(udt['jdbc'])
            if ds.getValueAt(0, 'Status') != 'Valid':
                tpl = 'insertBuffer: database connection %s not available [%dms]'
                log.errorf(tpl, udt['jdbc'], dt(t0))
                return

            rows = system.db.runPrepUpdate(
                sqlStatement, args, udt['jdbc'], tx='', getKey=0, skipAudit=1)
            duration = dt(t0)
            tpl = 'insertBuffer: %d rows inserted into %s [%dms]'
            if duration > 250:  # milliseconds; slow inserts are surfaced as warnings
                log.warnf(tpl, rows, udt['tblName'], duration)
            else:
                log.debugf(tpl, rows, udt['tblName'], duration)
        # end insertBuffer() ############################################################

        system.util.invokeAsynchronous(insertBuffer)  # spawn async thread for SQL
        # Reset the buffer with an empty list.
        jsonStr = system.util.jsonEncode({"array": []}, 0)
        system.tag.writeBlocking([tagPaths[0]], [jsonStr])
        log.debugf('valueChanged: callback2: buffer reset [%dms]', dt(t0))
    # end callback2() ###################################################################

    system.tag.readAsync(tagPaths, callback2)
    log.tracef('valueChanged: system.tag.readAsync callback2 [%dms]', dt(t0))
    return
# end valueChanged() ####################################################################
It is worth noting how useful the Document-type memory tag is as the event buffer. The JSON object allows multiple properties to be aggregated behind a single handle, which helps avoid synchronization issues in the event logic.
Thanks to Cirrus-Link's Dan Potter & Nathan Davenport and IA's @jrosenkrans for working with us on this issue.