I have a gateway script that loops through the machines in a specific department and updates each machine's schedule.
Calling the machine update functions using system.util.invokeAsynchronous is much faster than calling the machine functions one at a time and waiting for each to finish.
Two issues arise as a consequence of this change:
1. I was writing to a tag at the beginning of the first machine update and at the end of the last machine update, to indicate on the Schedule display that a department is in the process of being updated. (This tag would also be useful for preventing an update to a department that is currently being updated.)
2. If there are several dozen machines in a department, I am concerned about having too many database queries active or waiting across the threads.
So I am looking for a way to track each thread in my machine loop and be notified when ALL the threads have completed, (1) so that I can set my "updating" tag to False, and (2) perhaps to see how many open DB queries those threads hold, so that these functions can queue when there are too many active connections and I am not blocking other queries in my project.
Additionally, it would be ideal not to need any modules for this, so that the solution carries over easily when the Ignition version is updated.
Consider running it all in one thread, and using a tag to indicate how far it is through the list of queries. The asynchronous thread can write to that tag as it goes.
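A minimal sketch of that single-thread idea, written in plain Python so it can run outside Ignition. The names refresh_department, fake_refresh and fake_write_progress are hypothetical stand-ins: in a gateway script the progress callback would presumably wrap a tag write, and the function would be handed to system.util.invokeAsynchronous instead of threading.Thread.

```python
import threading

def refresh_department(machines, refresh_machine, write_progress):
    """Refresh every machine in order on one background thread, reporting progress."""
    total = len(machines)
    write_progress(0, total)              # 0 of N done: the department is busy
    for done, machine in enumerate(machines, 1):
        refresh_machine(machine)          # all queries for this machine run here
        write_progress(done, total)       # done == total means the department is finished

# Usage with stand-ins for the real refresh function and the tag write.
progress_log = []
refreshed = []

def fake_refresh(machine):
    refreshed.append(machine)

def fake_write_progress(done, total):
    progress_log.append((done, total))

worker = threading.Thread(
    target=refresh_department,
    args=(["M1", "M2", "M3"], fake_refresh, fake_write_progress),
)
worker.start()
worker.join()  # joined here only so the example can inspect the result
```

Because only one thread ever writes the progress value, there is nothing to race on, and at most one machine's queries are active at a time.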
def refreshMachine(machine, lastUpdate):
    # Placeholders for the queries that run for each machine
    query1
    query2
    query3
    query4
    query5

dept = myDept
# runPrepQuery expects the arguments as a list; the selected column must match the name read below
machinesDs = system.db.runPrepQuery('Select MachineName from table where department = ?', [dept])
for i in range(machinesDs.getRowCount()):
    machine = machinesDs.getValueAt(i, 'MachineName')
    # These calls do not need to wait on any of the other calls
    # But I would like to know if any of these are running
    system.util.invokeAsynchronous(refreshMachine, [machine, lastUpdate])
    if i == 0:
        # write to memory tag that ANY machine in this dept is being calculated
        pass
    if i == (machinesDs.getRowCount() - 1):
        # After the last running thread is completed, write that department is done being calculated
        pass
Obviously this doesn't work: the "done" tag write happens almost immediately, because it still runs on the main thread while the functions have all been pushed off to their own threads.
But if I put those tag writes in the async function I can't guarantee that the last called finishes last.
What I am thinking about your response...
Inside the function I could read an int tag and add 1 at the beginning, then subtract 1 when the function completes. That way, at the end of the function I could check the tag, and if it's 0 then I am done? But I don't know if this would get me into a race condition with the reads and writes from the other threads. How could I use a global variable to hold that value instead of a tag?
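One way the counter idea could be made safe is to keep the count in a shared object guarded by a lock, and to set it to the full number of machines before any thread starts, so it cannot touch zero early. This is only a sketch in plain Python: PendingCounter, run_all and the fake callbacks are hypothetical names, and in Ignition the threads would more likely be started with system.util.invokeAsynchronous.

```python
import threading

class PendingCounter(object):
    """Counts outstanding jobs and calls on_all_done once, when the count reaches zero."""

    def __init__(self, total, on_all_done):
        self._lock = threading.Lock()
        self._remaining = total
        self._on_all_done = on_all_done

    def job_finished(self):
        with self._lock:                  # read-modify-write must be atomic
            self._remaining -= 1
            finished = (self._remaining == 0)
        if finished:                      # only the last job to finish sees zero
            self._on_all_done()

def run_all(machines, refresh_machine, on_all_done):
    if not machines:                      # nothing to wait for
        on_all_done()
        return []
    counter = PendingCounter(len(machines), on_all_done)

    def worker(machine):
        try:
            refresh_machine(machine)
        finally:
            counter.job_finished()        # count down even if the refresh raised

    threads = [threading.Thread(target=worker, args=(m,)) for m in machines]
    for t in threads:
        t.start()
    return threads

# Usage with stand-ins for the real refresh function and the "done" tag write.
done_events = []
refreshed = []
refreshed_lock = threading.Lock()

def fake_refresh(machine):
    with refreshed_lock:
        refreshed.append(machine)

threads = run_all(["M1", "M2", "M3", "M4"], fake_refresh,
                  lambda: done_events.append("done"))
for t in threads:
    t.join()
```

The lock is what removes the race: without it, two threads could read the same value, each subtract 1, and write back the same result.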
Disclaimer: I'm used to Java land, so these snippets may not work exactly right first try.
If you explicitly want the fan-out (without controllability for queue size), you're probably going to have to move beyond system.util.invokeAsynchronous and get to a lower-level primitive.
Phil may have other advice, but I'd probably get or instantiate a fixed size thread pool in system.util.globals.
Once you've got a fixed size pool (specifically an ExecutorService) you can dispatch a bunch of work at once, get a bunch of 'futures' back, then block on all those futures.
Totally untested snippet (see disclaimer at the top) that puts it all together below. I would put the pool and CallableWrapper into the project library as a dedicated util, separate from the actual 'do the work' piece.
from java.util.concurrent import Callable, Executors

pool_key = "db-pool"
# Create the pool only once; setdefault() would construct a new pool on every call,
# even when one is already stored
pool = system.util.globals.get(pool_key)
if pool is None:
    pool = Executors.newFixedThreadPool(4)
    system.util.globals[pool_key] = pool

class CallableWrapper(Callable):
    def __init__(self, function, *args, **kwargs):
        self.function = function
        self.args = args
        self.kwargs = kwargs

    def call(self):
        # return the result so it is available from the future
        return self.function(*self.args, **self.kwargs)

futures = [
    pool.submit(
        CallableWrapper(
            refreshMachine,
            machinesDs.getValueAt(i, 'MachineName'),
            lastUpdate
        )
    )
    for i in xrange(machinesDs.rowCount)
]
# at this point, all the work has been dispatched to the pool, but the code will keep running
# so write your 'in progress' tag
# now, if you want to wait until everything is complete, call get() on each future to block for its result
# (submit() returns plain Futures, which CompletableFuture.allOf does not accept)
# you probably want to do this in an async thread as well
for future in futures:
    future.get()
This is the flaw. You are trying to write to one tag from multiple sources. There's no way to make that non-racy. The simplest solution is one status tag per background thread. Then one expression tag that becomes true when all the other tags show finished.
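A rough sketch of the one-status-per-thread layout, in plain Python. The status dictionary stands in for the individual status tags and department_busy for the expression tag; start_refresh and the other names are hypothetical. Every flag is set before any thread starts, so the combined value cannot read "finished" before the work has begun.

```python
import threading

def start_refresh(machines, refresh_machine, status):
    """Start one thread per machine; each thread clears only its own status flag."""
    for machine in machines:
        status[machine] = True            # mark all busy BEFORE starting any thread

    def worker(machine):
        try:
            refresh_machine(machine)
        finally:
            status[machine] = False       # single writer per flag, so no race

    threads = [threading.Thread(target=worker, args=(m,)) for m in machines]
    for t in threads:
        t.start()
    return threads

def department_busy(status):
    """Stand-in for the expression tag: busy while any machine flag is still set."""
    return any(status.values())

# Usage: hold the fake refreshes open so the busy state can be observed.
release = threading.Event()
status = {}
threads = start_refresh(["M1", "M2"], lambda machine: release.wait(5), status)
busy_during = department_busy(status)
release.set()
for t in threads:
    t.join()
busy_after = department_busy(status)
```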
This is exactly what I was thinking too, but every department has a different number of machines, and I don't want hundreds of tags to manage every time I add or move a machine. Currently I have a dataset for all departments with a boolean 'busy' column that I read and write for each department. Perhaps I should add a machine column; then, to check whether a department is busy, I can OR the busy values of all the machines in that department. But I can see this failing too: before any thread has started, nothing is marked busy, so I could get a false 'department done'.
So maybe I could keep a blank machine row for each department that I use to turn on when I run my first thread and turn off with the last thread and use that in the OR expression as well to make sure I don't prematurely have the 'done' status. I'm going to see how this approach works out.
Well, I ended up looking at a UDT I had for the Ignition machines and added a boolean "updating" tag to it.
I ended up doing what Phil suggested and the results look pretty good.
I think I will create a gateway tag to "count" my active queries but I didn't have any memory or performance issues with 35 concurrent machine threads so that probably isn't necessary.
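If a cap on concurrent queries ever does turn out to be needed, one possible approach is a semaphore that every query passes through, which also gives a running count of active queries. This is a sketch in plain Python, not something from the thread: QueryGate, fake_query and the limit of 3 are assumptions, and in Ignition the wrapped function would presumably be one that calls system.db.runPrepQuery.

```python
import threading
import time

class QueryGate(object):
    """Lets at most max_active wrapped calls run at once and tracks how many are active."""

    def __init__(self, max_active):
        self._slots = threading.BoundedSemaphore(max_active)
        self._lock = threading.Lock()
        self.active = 0                   # queries running right now
        self.peak = 0                     # highest value 'active' has reached

    def run(self, query_fn, *args):
        self._slots.acquire()             # waits here while max_active queries are running
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        try:
            return query_fn(*args)
        finally:
            with self._lock:
                self.active -= 1
            self._slots.release()

# Usage: ten threads share a gate that allows three queries at a time.
gate = QueryGate(3)
results = []
results_lock = threading.Lock()

def fake_query(machine):
    time.sleep(0.01)                      # stand-in for the time a real query takes
    return machine

def worker(machine):
    value = gate.run(fake_query, machine)
    with results_lock:
        results.append(value)

threads = [threading.Thread(target=worker, args=("M%d" % i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The active count could be written to a gateway tag for display, with the caveat from earlier in the thread that only one place should ever write that tag.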