Java concurrent queue for tag change scripts

Continuing the discussion from [SOLVED] Are tag Value Changed scripts running asynchronously?:

We've got code in our standard library that needs to log a value change on a tag and then associate various data points with that change. The value is located in a UDT, and because this is part of the "standard" library we use for our customers, the number of tags is variable and quite possibly numerous, so a gateway tag event script isn't feasible for this solution.

I was reading up on the above-linked thread, and @pturmel mentioned using a Java concurrent queue. Does anyone have any pointers where I can start developing a solution for this?

From my reading, it seems that I would want to use a ConcurrentLinkedQueue and add a dictionary of the parameters, which are then serviced later by a timer event.

Or am I way off base here?

Sounds about right. Not off base.

1 Like
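For later readers, the general shape of that pattern can be sketched in plain Python, with `collections.deque` standing in for the Java `ConcurrentLinkedQueue` (all names here are illustrative; in Jython you would import and instantiate the Java class and use `offer()`/`poll()` instead):

```python
from collections import deque

# Stand-in for java.util.concurrent.ConcurrentLinkedQueue
eventQueue = deque()

def onTagChange(tagPath, previousValue, currentValue):
	# Tag event: enqueue a plain dictionary of the event parameters and return fast
	eventQueue.append({
		'tagPath': tagPath,
		'previousValue': previousValue,
		'currentValue': currentValue,
	})

def serviceQueue():
	# Timer event: drain everything currently queued and hand it back for processing
	drained = []
	while eventQueue:
		drained.append(eventQueue.popleft())
	return drained

onTagChange('[default]Motor1/Interlock', 0, 1)
onTagChange('[default]Motor2/Interlock', 1, 0)
```

The tag event does nothing but enqueue data; all of the slow work happens later, on the timer thread.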

So a slight issue when testing.

This is basically my code:

from java.lang import Throwable,System
from java.util.concurrent import ConcurrentLinkedQueue

thisQueue = ConcurrentLinkedQueue()

def logEmbeddedInterlock(tag, tagPath, previousValue, currentValue, initialChange):
	logger = system.util.getLogger('testing')
	dictData = {
		'tag': tag,
		'tagPath': tagPath,
		'previousValue': previousValue,
		'currentValue': currentValue,
		'initialChange': initialChange
	}
	success = thisQueue.offer(dictData)
	logger.tracef('Offered %s to thisQueue. Success: %s', str(dictData), str(success))
	logger.tracef('thisQueue length after offer: %d', thisQueue.size())

def serviceQueue():
	logger = system.util.getLogger('testing.serviceQueue')
	logger.tracef('thisQueue length at start of serviceQueue: %d', thisQueue.size())
	# poll() removes and returns the head of the queue, or None when empty,
	# so every item is consumed exactly once (peek() alone never drains the queue)
	interlockData = thisQueue.poll()
	while interlockData is not None:
		tag = interlockData['tag']
		tagPath = interlockData['tagPath']
		previousValue = interlockData['previousValue']
		currentValue = interlockData['currentValue']
		initialChange = interlockData['initialChange']
		if not initialChange:
			logger.tracef('Got interlock data: %s', str(interlockData))
			#Do things with the data
		interlockData = thisQueue.poll()

I'm calling the logEmbeddedInterlock from the Tag Value Changed event.
I know it is firing, because I'm seeing an increase in the number of items in the thisQueue object in the logger.

However, when the timer script is firing off to call serviceQueue(), the number of items is being reported as 0.

The project is set up like so: scripting project -> GUI object project -> HMI Project.

The gateway has a reference to the scripting project for the tags, and the timer script is running in the HMI Project (the only runnable project on the system).

Do I need to be packing the queue object into system.util.getGlobals() when a new object is offered/removed? Or is there something else I'm missing somewhere along the way?

You absolutely must use my globalVarMap() or system.util.getGlobals() to coordinate access to a single ConcurrentLinkedQueue, with careful application of .setdefault to not smash a prior instance.

Also, keep in mind that ConcurrentLinkedQueue is unbounded--if you screw up the draining, you will leak gobs of memory. Consider using LinkedBlockingQueue with a capped size.

2 Likes
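To make the bounded behaviour concrete, here's what `offer()` does at capacity, sketched with Python's `queue.Queue` standing in for `LinkedBlockingQueue` (the helper below just emulates the Java method's return-value semantics):

```python
from queue import Queue, Full

def offer(q, item):
	# Non-blocking add, mirroring LinkedBlockingQueue.offer(): returns
	# False instead of blocking or raising when the queue is full
	try:
		q.put_nowait(item)
		return True
	except Full:
		return False

bounded = Queue(maxsize=2)  # capped size, as suggested
assert offer(bounded, 'a')
assert offer(bounded, 'b')
assert not offer(bounded, 'c')  # at capacity: the offer is refused, nothing blocks
```

A `False` return is your cue to log a warning about the drop; memory stays bounded either way.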

If using this and the items must be added to the queue, then we have to use put, right? But that will block the tag thread pool until the item is added - bad! If you use offer to add the item and the queue is at capacity, it'll just drop the event, which is also bad. Is there a solution to that, or should the capacity simply be set high enough that all events can be added and processed in time?

This is my example:

# Import Java classes
from java.util.concurrent import LinkedBlockingQueue

max_capacity = 2
system.util.getGlobals().setdefault('queue', LinkedBlockingQueue(max_capacity))

def processQueue():
	queue = system.util.getGlobals()['queue']
	processedItems = []
	while not queue.isEmpty():
		processedItems.append(queue.poll())
		processedItems[-1]() # run the function just pulled from the queue
	
	system.perspective.print("processed items: {}".format(processedItems))


def addItemToQueue(func):
	queue = system.util.getGlobals()['queue']
	queue.put(func)

No, it isn't bad. That's the whole point of "Blocking" in LinkedBlockingQueue. Use the return value to detect the drop, and warn, but the point is to not consume unlimited memory. Compute the maximum quantity you expect in the queue, add a little margin, and go. If you are wrong about keeping up, you get a warning instead of a crash. Always use offer().

If you don't want to ever drop entries, even to the point of crashing your gateway, use a ConcurrentLinkedQueue instead.

2 Likes

Cool, thanks for the info!

This is where I ended up at:

LIBRARY = 'shared.be.tags.changeEvents'
LOGGER = system.util.getLogger('<company>-Scripting-{}'.format(LIBRARY))
from java.util.concurrent import LinkedBlockingQueue
from java.lang import Throwable as JLT
from functools import partial
import traceback

# This library adds functions to add function calls into a queue object to then be processed outside of the calling function.
# This is acutely important when executing functions within tag change events, as these need to execute in single-digit ms.
# To use these functions, call addItemToQueue with the function to call along with its args, inside of the tag change event handler script. In a gateway timer event executed periodically within a dedicated thread, fixed delay, call processQueue() to process the function calls in the queue.

# Create a LinkedBlockingQueue with a maximum capacity (optional)
max_capacity = 200  # You can adjust the capacity as needed. The more capacity, the more memory is required. If this is unbounded, then there is the potential for Ignition to run out of memory if the queue isn't being drained frequently enough.
system.util.getGlobals().setdefault('queue', LinkedBlockingQueue(max_capacity))

def processQueue():
	f = 'processQueue()'
	queue = system.util.getGlobals()['queue']
	# poll() returns None when the queue is empty, which stays safe even if
	# another drainer empties the queue between checks
	task = queue.poll()
	while task is not None:
		# execute the function object pulled from the queue
		try:
			task()
		except JLT as e:
			LOGGER.error('{}: Error: {}, Cause: {}'.format(f, traceback.format_exc(), e.cause))
		except:
			LOGGER.error('{}: Error: {}'.format(f, traceback.format_exc()))
		task = queue.poll()

def addItemToQueue(function, **kwargs):
	f = 'addItemToQueue(...)'
	queue = system.util.getGlobals()['queue']
	if not queue.offer(partial(function, **kwargs)): # adds the item to the queue, if there is room!
		LOGGER.error('{}: Item attempting to be added was dropped due to insufficient queue space!'.format(f))

def printQueue():
	""" function for debugging only """
	f = 'printQueue()'
	queue = system.util.getGlobals()['queue']
	size = queue.size()
	try:
		system.perspective.print('{}: Queue size: {}'.format(f, size))
	except:
		print '{}: Queue size: {}'.format(f, size)
	
	return size

I don't know why I put this off for so long... it's actually pretty darn simple

Wouldn't you want this to be:

def addItemToQueue(function, *args, **kwargs):
    f = 'addItemToQueue(...)'
    queue = system.util.getGlobals()['queue']
    if not queue.offer(partial(function, *args, **kwargs)):
        etc....

I guess I don't know what happens if you call a function with standard args, but the signature only includes the **kwargs argument, since you haven't provided any other arguments. Seems to me that you wouldn't want to specify that all function calls must use keyword arguments. :man_shrugging:
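To illustrate what happens in each case (hypothetical function names; the behaviour is the same under Jython's `functools.partial`):

```python
from functools import partial

def work(a, b=0):
	# hypothetical stand-in for a library function
	return a + b

def addKeywordOnly(function, **kwargs):
	# keyword-only wrapper, as in the posted addItemToQueue
	return partial(function, **kwargs)

def addBoth(function, *args, **kwargs):
	# wrapper accepting positional and keyword arguments
	return partial(function, *args, **kwargs)

# keyword arguments work with either wrapper
assert addKeywordOnly(work, a=1, b=2)() == 3
assert addBoth(work, 1, 2)() == 3

# positional arguments are rejected outright by the keyword-only wrapper
try:
	addKeywordOnly(work, 1, 2)
except TypeError:
	pass  # raised immediately, at enqueue time rather than execution time
```

So the keyword-only signature doesn't silently misbehave; it just fails fast if a caller tries positional arguments.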

Also you might want to use java.lang.Throwable in place of Exception, then you'll catch any Java error not just those that extend Exception.

I guess I could accept both, although I do prefer to provide keyword args in case the signature changes (hopefully args are only ever added to the end, but who knows...), and this is a way that I can enforce it. They also make reading the function call far easier though, as you know what's being passed into where.

Hmm, I thought I read somewhere that code didn't need to handle Throwable, but now I can't find it... reading more, it does seem that Throwable would be more apt. I'll change it, cheers


This is extremely unsafe. Placing code in a persistent global, even indirectly, is begging for a huge memory leak. You are also likely executing the code in one scripting context while it remains bound to its original scripting context. Don't do this. Put only native java or jython data in your queues.

Really.

{ The only "queue" you should use that contains arbitrary code should be a ThreadPoolExecutor or similar java service. Those don't have bugs that could leave gobs of memory stranded. }

1 Like

How would you do this then? Save a function name as a string and then call it with some type of reflection? Or abandon the queue in this case in favor for the ThreadPoolExecutor?

Or is the thought, that each potential function that could be called, you would save a dictionary or other data structure of the arguments passed, and then that data is always handled by the same function? In other words you have multiple queues one for each function?

This, if you must. Create your own persistent thread pool instead of the queue. Give it the desired number of threads.

But I prefer to simply make a separate queue for each kind of data, and drain it with a separate timer event that does the right thing. Simplest. Easiest to maintain.

(If you want to do the named function thing, consider my indirectAction() scripting utility in my Integration Toolkit.)

2 Likes

"Two steps forward, one step back" haha

Ok. Well I would prefer to keep it as simple as possible for others to use, so I think i'll go down the ThreadPoolExecutor path.

Code to follow to check I haven't opened another memory wormhole.

(I replaced partial with a lambda)

LIBRARY = 'shared.be.tags.changeEvents'
LOGGER = system.util.getLogger('SAGE-Scripting-{}'.format(LIBRARY))

# This library adds functions to add function calls into a Thread pool object to then be processed outside of the calling function.
# This is acutely important when executing functions within tag change events, as these need to execute in single-digit ms.

from java.util.concurrent import Executors
from java.util.concurrent import ThreadPoolExecutor
from java.lang import Exception as JLE
import traceback

max_threads = 5 # max number of threads to create. Tasks submitted beyond this will be queued in order for execution
system.util.getGlobals().setdefault('threadPool', Executors.newFixedThreadPool(max_threads))


def addFunctionToThreadPool(function, **kwargs):
	f = 'addFunctionToThreadPool(...)'
	threadPool = system.util.getGlobals()['threadPool']
	threadPool.submit(lambda: function(**kwargs))


def printThreads():
	f = 'printThreads()'
	threadPool = system.util.getGlobals()['threadPool']
	count = threadPool.getActiveCount()
	try:
		system.perspective.print('{}: Threads active: {}'.format(f, count))
	except:
		print '{}: Threads active: {}'.format(f, count)
	
	return count

This:

and this:

are contradictory.

Code that does a simple, well-defined task will be the easiest to maintain and for your future self to understand.

In a tag event, push a tuple onto a queue. In a timer event, drain the tuples, and unpack each one. Do work. Simple.

2 Likes
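A minimal sketch of that flow, using Python's `queue.Queue` as a stand-in for the Java `LinkedBlockingQueue` (all names are illustrative, not from anyone's actual project; in Jython you would call `offer()` and `poll()` on the Java class):

```python
from queue import Queue, Full

# one bounded queue per payload shape; stand-in for LinkedBlockingQueue(500)
interlockQueue = Queue(maxsize=500)

def onInterlockChange(tagPath, previousValue, currentValue):
	# Tag event: offer only plain data (a tuple), never a function object
	try:
		interlockQueue.put_nowait((tagPath, previousValue, currentValue))
		return True
	except Full:
		return False  # detect and warn about the drop instead of blocking

def drainInterlockQueue(handle):
	# Timer event: drain whatever is queued, unpack each tuple, and hand it
	# to the one well-known handler function for this payload shape
	count = 0
	while not interlockQueue.empty():
		tagPath, prev, curr = interlockQueue.get_nowait()
		handle(tagPath, prev, curr)
		count += 1
	return count
```

Because only plain tuples cross the queue, nothing drags a scripting context along with it, and each queue/timer pair stays independent of the others.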

Maybe I'm missing something, but isn't that what this is?
I have a set of arbitrary functions defined in script libraries that I need to call from a tag change event handler, but I can't run them directly since they execute for longer than 10ms. So I add the functions into a ThreadPoolExecutor and it executes them for me. All I have to do is add the functions to the thread pool, that's it. I don't need to create any Timer events.

This requires creating a timer event, or adding lines to a single timer event, for each new type of function you're calling, though. Isn't that more work, more difficult to understand, and easier to miss for someone coming onto the project?

(I'm definitely green to this stuff and trying to understand it!)

But you will now have the same phenomenon repeated in your own architecture: a small number of invocations of one long-running function can block many other operations. Use a thread pool to accelerate the processing of a single function when one thread (a timer thread) isn't enough.

One tag event payload shape => one queue => one timer event => one thread => one function => can't disturb other queues.

Anyone can follow that flow and make a new one. And the new one won't break everything else if that "anyone" screws up.

3 Likes

Ok, that makes sense. The phenomenon will still be repeated with the timer event though, won't it, if the function calls execute for sufficiently long and/or so many events are added that the queue can never catch up on processing its contents?

See second sentence of first paragraph of my prior comment.

Or make two (or more) timer events that drain from the same queue.

Also, thread pool executors don't give you much control over memory consumption in your backlog like a BlockingQueue does. And when you do bog a queue down, you haven't broken any other queues, nor have you broken the Ignition tag system.

1 Like

Yes, so for example if I have 1000 tags all with the same change event script which all add a tuple to a single queue object. One timer event is added to drain the queue, unpack the tuples, execute the function needed with the arguments supplied from the tuple (side note: can I supply a dict instead?). Done. Timer event is set to dedicated thread, execute on delay of 1s.
Extreme example: say the function takes on average 1s to execute and each tag adds a tuple to the queue every second as well, the timer event will never be able to process all items and keep up.... (just read your update)

Ok, I think this answers the above :slight_smile:

So if a thread pool bogs down (tasks take a long time to execute and many are queued), does this affect other threads as well?