Java concurrent queue for tag change scripts

Usually not, if the time is spent waiting on a database or a web API or some other I/O-ish activity. If you are burning CPU with some super-complex in-memory algorithm, then total gateway CPU usage will throttle everyone.


I'm not sure what you meant by the first comment then? If a thread pool doesn't (ordinarily) affect other thread pools, then bogging down either a queue or a thread pool has no effect on the rest of the system (aside from indirectly, through the resources they consume).

Edit: also not sure if you saw this question here:

(Sorry to be annoying!

philBrain = ConcurrentLinkedQueue()
while not philBrain.isEmpty():
  philBrain.poll()

)


Strictly speaking, you'd use .poll() on the queue in your loop, exiting the event when it returns None, and otherwise processing the entry immediately. This preserves ordering when multiple threads are draining cooperatively.

Choose an event delay time that represents a compromise between latency limitation and deliberate "cool-off" between batches. Also consider limiting the number of entries processed per timer call (100-ish) to ensure some "cool-off" occurs.

Sure. Just don't put code in them, either.


Except that a simple queue cannot be abused by injecting other workloads, where any executor you create will happily accept any Runnable. (Which is what functions and lambdas are coerced to, under the jython hood.)


I don't recommend infinite loops.... :crazy_face:


If two timer scripts are both draining the same queue and only processing the drained items afterwards, won't this mean that the item processing order isn't necessarily preserved?

Item1-10 are queued and two timer scripts (T1, T2) run simultaneously:
T1 drains the queue and collects Item1,2,4,7,9,10
T2 drains the queue and collects Item3,5,6,8

T1 processes its collected items in order 1,2,4,7,9,10
T2 processes its collected items in order 3,5,6,8

e.g.
T1 processes Item1
T2 processes Item3
T1 processes Item2
T1 processes Item4
T2 processes Item5
... etc.

However, if each item is processed as it is drained (.poll()'d), then order should be preserved?
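
The difference between the two approaches can be illustrated with a small deterministic simulation. This is only a sketch: it uses a plain collections.deque instead of a Java queue, the worker names and the schedule are made up, and it treats "poll and process" as one indivisible step, which real threads do not guarantee.

```python
from collections import deque

def batch_then_process(items, schedule):
    """Each worker collects its share first, then processes it later."""
    q = deque(items)
    collected = {"T1": [], "T2": []}
    # Drain phase: the two workers alternate polls until the queue is empty.
    turn = 0
    while q:
        collected[("T1", "T2")[turn % 2]].append(q.popleft())
        turn += 1
    # Process phase: `schedule` says which worker gets to run next.
    order = []
    for worker in schedule:
        if collected[worker]:
            order.append(collected[worker].pop(0))
    return order

def poll_and_process(items, schedule):
    """Each worker polls one item and processes it in the same step."""
    q = deque(items)
    order = []
    for worker in schedule:
        if q:
            order.append(q.popleft())
    return order

items = list(range(1, 11))
# Hypothetical schedule: T1 happens to run twice as often as T2.
schedule = ["T1", "T1", "T2"] * 5

batched = batch_then_process(items, schedule)
polled = poll_and_process(items, schedule)
```

With this schedule the batched variant processes items out of queue order, while the poll-per-item variant keeps queue order regardless of which worker runs.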


Right, the timer event should call a function drain() that looks like this:

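from java.lang import Throwable
from java.util.concurrent import LinkedBlockingQueue

tqmvm = system.util.globalVarMap('tagQueueManagement')
_queue = tqmvm.setdefault('thisQueue', LinkedBlockingQueue(100))

def someProcess(entry):
	pass

def drain():
	processed = 0
	entry = _queue.poll()
	# poll() returns None when the queue is empty. Compare against None so
	# that falsy entries (0, '', {}) don't end the loop early.
	while entry is not None:
		try:
			someProcess(entry)
		except Throwable, t:
			pass  # handle or log the Java error here
		except Exception, e:
			pass  # handle or log the Python error here
		processed += 1
		# Cap the work done per timer call so that some "cool-off" occurs.
		if processed > 100:
			break
		entry = _queue.poll()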

Multiple timer events can all call drain() safely, and someProcess() will be very close to ordered. (Modulo thread-switching jitter.)


Playing devil's advocate here perhaps... but, for my own understanding, what if I were to create a ThreadPoolExecutor in getGlobals() for each tag change script "shape" and only add tasks to each one from the matching tag change scripts? That way each "shape" would have its own thread pool and hence wouldn't impact other "shapes".

Then this:

is still maintained, right? More like this, though:
One tag event payload shape => one thread pool => one function => can't disturb other things

You'll be relying on your future self and future colleagues to not pollute that executor. There's no enforcement of the "one function" stage in that flow. Also, a thread pool executor is built on a BlockingQueue, and has a bunch more overhead, and takes a bunch more time to submit. And you are queuing code instead of data.
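
To make the "queuing code instead of data" point concrete, here is a rough sketch. It uses CPython's concurrent.futures.ThreadPoolExecutor and collections.deque purely as stand-ins for the Java executor and queue (an assumption, so that it runs outside a gateway), and recordEvent and unrelatedJob are hypothetical names.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

handled = []

def recordEvent(payload):
    handled.append(("recordEvent", payload))

def unrelatedJob():
    handled.append(("unrelatedJob", None))

# An executor queues *code*: it runs whatever callable it is handed.
pool = ThreadPoolExecutor(max_workers=1)
pool.submit(recordEvent, {"id": 1})
pool.submit(unrelatedJob)      # nothing enforces the "one function" rule
pool.shutdown(wait=True)

# A plain queue holds *data*: only the drain loop decides what runs.
events = deque()
events.append({"id": 2})
while events:
    recordEvent(events.popleft())
```

The executor happily ran the unrelated job, whereas anything placed on the plain queue can only ever be passed to recordEvent.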

Just use a queue and timer. Really. KISS.


I got busy with other things but back on this now. My end user unfortunately doesn't want to use 3rd party modules, so I'm using system.util.getGlobals instead of your function.

import traceback
from java.lang import Exception as JLE  # assumption: JLE aliases a Java exception class
from java.util.concurrent import LinkedBlockingQueue

# LOGGER is assumed to be defined elsewhere in this script module.

system.util.getGlobals().setdefault('tagQueueManagement', {})
system.util.getGlobals()['tagQueueManagement'].setdefault('downtimeEvents_AddEvent', LinkedBlockingQueue(100))
system.util.getGlobals()['tagQueueManagement'].setdefault('downtimeEvents_UpdateEvent', LinkedBlockingQueue(100))

def addItemToQueue(queueName, **kwargs):
	f = 'addItemToQueue(...)'
	queue = system.util.getGlobals()['tagQueueManagement'][queueName]
	# offer() returns False instead of blocking when the queue is full
	if not queue.offer(kwargs):
		LOGGER.error('{}: Attempted to add a new function args dict to queue but it was dropped due to insufficient queue capacity!'.format(f))

def processQueue(queueName):
	f = 'processQueue(...)'
	queue = system.util.getGlobals()['tagQueueManagement'][queueName]
	processedCount = 0
	fnKwargs = queue.poll()
	# compare against None so that an empty kwargs dict doesn't end the loop early
	while fnKwargs is not None:
		try:
			# call the function associated with the queueName with the kwargs from the queue item
			if queueName == 'downtimeEvents_AddEvent':
				shared.be.alarms.recordDowntimeEvent(**fnKwargs)
			elif queueName == 'downtimeEvents_UpdateEvent':
				shared.be.alarms.updateDowntimeEvent(**fnKwargs)
		except JLE as e:
			LOGGER.error('{}: {}, Cause: {}'.format(f, traceback.format_exc(), e.cause))
		except:
			LOGGER.error('{}: Error: {}'.format(f, traceback.format_exc()))
		
		processedCount += 1
		if processedCount > 100:
			break
		fnKwargs = queue.poll()

I've configured two timer events, each calling the processQueue function for a different queue.

Any issues with this? :grimacing:

Is there any reason you aren't assigning the queues to top-level variables in your script? (The return value from .setdefault(key, value) is like using .get(key, value).) That would be much more performant than hitting system.util.getGlobals() all the time.

(BTW, you should pass the top level Throwable to loggers, not .cause, as you are clipping part of the traceback, possibly all of the traceback. You should also not use jython's string formatting--use the .errorf(), .warnf(), .infof(), .debugf() or .tracef() methods with native formatting. Much less overhead, especially when log levels are off.)


Nope, no reason, but I'll do that instead!

Noted re: the loggers, I'll change it. I add the cause because I've never seen a Throwable thrown that wasn't a SQL query error, and the cause is the most useful part, but I don't remember seeing it reported in the traceback.

They are. The whole chain of them, if multiple.


I don't much like string comparisons in the fast path. Consider abstracting the looping and error handling into a decorator, such that a decorated "drain" stub function just calls the right function directly, and is automatically wrapped with the dequeuing loop. (You pass the queue and a log prefix to the decorator, not to the stub.) I would prefer that such a decorator be agnostic to the type of entry in the queue, so that it doesn't care if a particular queue uses a dictionary or a tuple or something else.

Edit: Hmm. Perhaps I should publish a reference implementation...
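
For readers who want a feel for the decorator idea, here is a minimal sketch. It is not the reference implementation and not the queueUtil.py draft posted later in the thread. It assumes Python's standard queue and logging modules as stand-ins for a Java queue and a gateway logger, and drains, addQueue, and drainAddEvents are hypothetical names.

```python
import logging
from functools import wraps
try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2 / Jython

def drains(q, logPrefix, limit=100):
    """Wrap a one-entry handler in a bounded dequeue-and-handle loop."""
    log = logging.getLogger(logPrefix)
    def decorator(handler):
        @wraps(handler)
        def drain():
            processed = 0
            while processed < limit:
                try:
                    entry = q.get_nowait()
                except queue.Empty:
                    break
                try:
                    handler(entry)    # the entry's type is the handler's business
                except Exception:
                    # Log with the full traceback and keep draining.
                    log.exception("failed to handle queue entry")
                processed += 1
            return processed
        return drain
    return decorator

addQueue = queue.Queue(100)
recorded = []

# The queue and log prefix go to the decorator; the stub only sees one entry.
@drains(addQueue, 'downtimeEvents.add')
def drainAddEvents(entry):
    recorded.append(entry)

addQueue.put_nowait({'line': 3, 'reason': 'jam'})
addQueue.put_nowait(('line', 4))
count = drainAddEvents()
```

A timer event would then just call drainAddEvents() with no arguments. There is no string comparison on the fast path, and the loop works the same whether the queue carries dictionaries, tuples, or anything else.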


I'm all over this thread, full of ideas about a first party implementation of message queueing/looping/whatever.

Though, actually, some of what's being talked about here might be possible to do in a more visible way via the new event streams feature. Any scripting source can push events into a stream, and then you can run whatever scripted handling you want. I'm not sure how the threading and the like works yet, though; I'll have to dig into that more.


Yep, it's been very interesting and educational.

This is a problem I’ve had to solve in the past, too. Having a first party solution would be fantastic.

Okay. Draft for your edification: queueUtil.py

Includes Google docstrings.

Utterly untested. :grimacing: Maybe later.

Edit: docstring typos fixed and ambiguities addressed.

Heh: if you delete all comments, docstrings, and blank lines, it is 29 lines of code.


Is this going to outperform pushing a known shape object to a LinkedBlockingQueue and pulling to execute on a dedicated function? (I doubt it, but wouldn't mind being surprised.)


I would probably still expect a hand rolled scripting implementation to win on performance - less overhead. I could definitely be wrong, though - I haven't looked into the backend code for event streams at all yet.
