Issues when using system.util.invokeAsynchronous(): I get a 'NoneType' object is not callable error

When using system.util.invokeAsynchronous() I keep receiving this error:

Message:

Error running function from fpmi.system.invokeAsynchronous

Details:

TypeError: 'NoneType' object is not callable

at org.python.core.Py.TypeError(Py.java:236)
at org.python.core.PyObject.call(PyObject.java:396)
at com.inductiveautomation.ignition.common.script.ScriptManager.runFunction(ScriptManager.java:847)
at com.inductiveautomation.ignition.client.script.DesignerSystemUtilities.lambda$_invokeAsyncImpl$1(DesignerSystemUtilities.java:140)
at java.base/java.lang.Thread.run(Unknown Source)

Ignition v8.1.32 (b2023091211)
Java: Azul Systems, Inc. 11.0.18

After running this code:

path = "Select Query"
tagPaths = "[Queue]Queue"
sequenced = system.db.runNamedQuery(path)
updateBlockQueue = system.dataset.toDataSet(sequenced)
system.tag.writeBlocking(tagPaths, updateQueue)

tagData = system.tag.readBlocking(tagPaths)[0].value
pyTagData = system.dataset.toPyDataSet(tagData)

updates = []

def updateSize(index, row, updates):
	config = system.db.runNamedQuery("Select Config",{"WO": row[1]})
	engineSize = system.db.runNamedQuery("Select Size", {"configCode": config[0][0]})
	updates.append((index, {"SIZE":Size[0][0]}))
	
def applyUpdates(tagPaths, tagData, update):
	for index, update in updates:
		tagData = system.dataset.updateRow(tagData, index, update)
	system.tag.writeBlocking([tagPaths], [tagData])

def runUpdates():
	a = system.date.now()
	for index, row in enumerate(pyTagData):
		system.util.invokeAsynchronous(updateSize(index, row, updates))

	system.util.invokeAsynchronous(applyUpdates(tagPaths, tagData, update))
	b = system.date.now()
	total = system.date.secondsBetween(a, b)
	print(total)

runUpdates()

This will execute with the result I need, and all of the data is correct, but the error mentioned is still being posted. I do not know why.

I am running this in the Script Console for testing purposes.

All of your threads will run concurrently. system.util.invokeAsynchronous() doesn't wait for that task to complete. (That's the whole point of it.)

That any updates are completed by updateSize() before applyUpdates() accesses the list is pure chance.
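To make the race concrete, here is a minimal plain-Python sketch. It does not call Ignition at all: invoke_async is a hypothetical stand-in built on threading.Thread that mimics the fire-and-forget behavior described above, and update_size is a mock that blocks until it is released. It only shows that the caller moves on before the worker has produced anything.

```python
import threading

def invoke_async(func, args=()):
    # Hypothetical stand-in for system.util.invokeAsynchronous: start the
    # function on a new thread and return immediately without waiting.
    thread = threading.Thread(target=func, args=tuple(args))
    thread.start()
    return thread

updates = []
release = threading.Event()

def update_size(index, updates):
    # Mock of a slow query: blocks until the main thread lets it finish.
    release.wait()
    updates.append((index, {"SIZE": 42}))

worker = invoke_async(update_size, [0, updates])

# The call above has already returned, but the worker has produced nothing yet.
seen_before = list(updates)

release.set()   # let the mock "query" finish
worker.join()   # explicitly wait for the background work
seen_after = list(updates)

print(seen_before)
print(seen_after)
```

Anything that reads the list without an explicit wait (the join here) is in the position of seen_before: it may or may not find the data.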

Your non-UI operations, including loops through the data, should be in a single function for asynchronous execution.

(You should move this whole task to a project library script.)
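A rough sketch of the single-function shape, in plain Python so it can be run anywhere. The names lookup_size and refresh_queue are hypothetical, and lookup_size is only a mock for the named queries in the question. In Ignition, the thread would presumably be started with system.util.invokeAsynchronous(refresh_queue, [rows, results]) rather than threading.Thread.

```python
import threading

def lookup_size(work_order):
    # Hypothetical mock for the "Select Config" / "Select Size" lookups.
    return len(work_order)

def refresh_queue(rows, results):
    # Every lookup runs sequentially on ONE background thread ...
    updated = []
    for row in rows:
        updated.append({"WO": row, "SIZE": lookup_size(row)})
    # ... so this final step only happens after every row has been processed.
    # (In the question, this is where the tag write would go.)
    results.extend(updated)

rows = ["WO-1", "WO-22", "WO-333"]
results = []

worker = threading.Thread(target=refresh_queue, args=(rows, results))
worker.start()
worker.join()  # only needed so this demo can print the finished result

print(results)
```

Because the loop and the final step live in the same function, there is no ordering between threads left to chance.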

Consider also not using system.dataset.updateRow() if you are completely replacing the dataset. Just build a fresh dataset from scratch--it's faster. If you need control over column types, use the DatasetBuilder class instead of system.dataset.toDataSet().
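As a sketch of the "build it fresh" idea: collect the headers and the finished rows as plain Python lists, then convert once at the end. The column names, the sample values, and the build_rows helper are assumptions made up for illustration. Only the list-building part is shown as runnable code; in Ignition the last step would be a single system.dataset.toDataSet(headers, rows) call.

```python
def build_rows(headers, source_rows, sizes):
    # Copy each source row and fill in its SIZE column, without touching
    # the originals and without rebuilding a dataset once per row.
    size_col = headers.index("SIZE")
    rows = []
    for i, source in enumerate(source_rows):
        row = list(source)
        row[size_col] = sizes[i]
        rows.append(row)
    return rows

# Hypothetical column layout and values.
headers = ["ID", "WO", "SIZE"]
source_rows = [[1, "WO-1", None], [2, "WO-2", None]]
sizes = [5.7, 6.2]

rows = build_rows(headers, source_rows, sizes)
print(rows)

# In Ignition, convert once at the end:
# dataset = system.dataset.toDataSet(headers, rows)
```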

While everything Phil said is true, you're also just calling invokeAsynchronous fundamentally wrong.

You need to pass a callable reference to your function. What you're currently doing is running your function, blocking the invoking thread, and then returning the result (None) to invokeAsynchronous, which is why it's failing.

You need to use a syntax like this:

system.util.invokeAsynchronous(updateSize, args = [index, row, updates])
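The difference between the two forms can be seen in plain Python, with no Ignition calls involved. In this sketch update_size is a mock with no return statement, like the functions in the question.

```python
def update_size(index):
    # Does its work but has no return statement, so it returns None.
    pass

# invokeAsynchronous(updateSize(...)) evaluates the call first and passes this:
result_of_call = update_size(0)

# invokeAsynchronous(updateSize, ...) passes the function object itself:
reference = update_size

print(callable(result_of_call))
print(callable(reference))

# Trying to call the None result produces the error from the question.
try:
    result_of_call()
except TypeError as exc:
    message = str(exc)
    print(message)
```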

This looks wrong to me. I believe it should just be the function name, without the call signature. If you need to pass parameters into the function, that can be done with default parameter values or with the args list. The NoneType error is probably coming from that.

That's aside from all of the other things that Phil mentioned about this.
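For the default-parameter approach, a small plain-Python sketch of why the defaults matter when tasks are created in a loop. The function names are made up for illustration; in Ignition each task would be handed to system.util.invokeAsynchronous(task) instead of being called directly.

```python
def make_tasks_late(rows):
    # Each task looks up index/row when it RUNS, so every task sees
    # whatever values the loop variables hold at that moment.
    tasks = []
    for index, row in enumerate(rows):
        def task():
            return (index, row)
        tasks.append(task)
    return tasks

def make_tasks_bound(rows):
    # Default parameter values are evaluated when the function is DEFINED,
    # so each task keeps its own index and row.
    tasks = []
    for index, row in enumerate(rows):
        def task(index=index, row=row):
            return (index, row)
        tasks.append(task)
    return tasks

late = [t() for t in make_tasks_late(["a", "b"])]
bound = [t() for t in make_tasks_bound(["a", "b"])]
print(late)
print(bound)
```

The args list avoids the issue in the same way, since the values are captured at the moment of the invokeAsynchronous call.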

EDIT: Ninjad by Paul!!


Hah! I was blinded by the other flaws.


I have done this.
The result is that updateSize() does not appear to run correctly, or at all.
I added print() statements to this function, and none of them produce any output.

I have no errors, yes, but none of the data is updating as it did before.

That doesn't really mean anything:

Post your updated code.


Here is the updated code. It does not raise errors, but it also does not update the dataset/memory tag.

def runUpdates():
	a = system.date.now()
	for index, row in enumerate(pyTagData):
		system.util.invokeAsynchronous(updateSize, args =[index, row, updates])

	system.util.invokeAsynchronous(applyUpdates, args =[tagPaths, tagData, update])
	b = system.date.now()
	total = system.date.secondsBetween(a, b)
	print(total)

From what I am hearing it seems like I may be using this incorrectly, so this is what I was aiming for and why I started using system.util.invokeAsynchronous().

My end goal is to pull data from different tables in a database and display that data in a table in a Perspective view. I originally consolidated all of this into one SQL query, but the joins required to obtain the data increase the execution time to an unreasonable length (upgrading the DB's resources is not an option). By executing parts of the query individually and using Ignition as a "middleman" with a form of multi-threading, I was able to reduce this time significantly (by more than half).

What is an "unreasonable length"?

I would be shocked if a Python script, asynchronous or otherwise, were more efficient at building a record set than a database engine. That is the express purpose that databases are made to handle.

Also, see Phil's original response. You are definitely not doing this correctly. There should be a single call to invokeAsynchronous. Right now, you are running threads in parallel that require context from other threads, and relying purely on chance that the context is valid.

Also, to make matters worse, you are using the update variable outside of the scope in which it is created. Confusingly, you are sending it to the function in which it is created? I think that the update parameter in the function def for applyUpdates should actually be updates. Either way, there are problems in your script.
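A plain-Python sketch of the naming problem, using made-up data. In the first function the update parameter is never read: the loop variable of the same name overwrites it, and updates is silently picked up from the enclosing (module) scope. The second function takes the list it iterates over as its parameter.

```python
updates = [(0, {"SIZE": 1})]   # module-level list, as in the question

def apply_updates_original(update):
    # `update` (the parameter) is overwritten by the loop variable, and
    # `updates` comes from module scope, not from the caller.
    seen = []
    for index, update in updates:
        seen.append((index, update))
    return seen

def apply_updates_fixed(updates):
    # The parameter is the list being iterated; nothing depends on
    # a module-level variable.
    seen = []
    for index, update in updates:
        seen.append((index, update))
    return seen

print(apply_updates_original("this argument is ignored"))
print(apply_updates_fixed([(5, {"SIZE": 9})]))
```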

More than 4 minutes. This data changes every 73 seconds or so. There are 4 tables with large amounts of data to filter (10 million+ records). I agree with you that the database should not be slower; this is only what I have observed with what I have been allowed to use.

I see the flaw that Phil pointed out and I am going to correct it. I suspect that because I was using invokeAsynchronous incorrectly (the functions were actually being called directly, one after another), the end result showed the data as expected.

Regarding the update variable, passing update to applyUpdates() was a typo in an older iteration and has since been removed, still netting the same result where no data is being updated.

I will take all of this into consideration and work on revising my code. Thank you, everyone.

It's a bit awkward, but here's a proof-of-concept of using Java's CompletableFuture API to automatically run multiple operations on an async thread pool (managed by Java in this case), wait for all those operations to complete, and then do some work with all their results on another async thread:

from java.util.concurrent import CompletableFuture
from java.util.function import Consumer, Supplier
from functools import partial
from time import sleep

# Adapts a Python function to act as a Java "Consumer" functional interface
# Works when called directly or as a decorator with @ syntax
def java_consumer(func):
	class PythonConsumer(Consumer):
		def accept(self, t):
			func(t)
	return PythonConsumer()

# Adapts a Python function to act as a Java "Supplier" functional interface
def java_supplier(func):
	class PythonSupplier(Supplier):
		def get(self):
			return func()
	return PythonSupplier()

# Mock update function that just blocks for a fixed delay
def updateSize(row):
	sleep(2)
	return {"SIZE": row}

# Modified applyUpdates function will be guaranteed to get all updates in order, after they have all been completed
def applyUpdates(futures):
	for i, future in enumerate(futures):
		print i, future.get()

def runUpdates():
	# Just some mock data to stick to the general structure of the existing code
	tagData = system.dataset.toDataSet(["a", "b"], [[1, 2], [2, 3], [3, 4]])
	pyTagData = system.dataset.toPyDataSet(tagData)
	
	futures = []
	for row in pyTagData:
		# Some antics here to adapt types and wrap up default arguments
		supplier = java_supplier(partial(updateSize, row))
		# supplyAsync is a static method that will use Java's built in ForkJoinPool thread pool
		futures.append(CompletableFuture.supplyAsync(supplier))

	# Again, some coercion is required to get types happy
	callback = java_consumer(lambda r: applyUpdates(futures))

	# allOf returns a new future that's a "composite" of the inputs - so when all of the input futures are done
	# the callback will be invoked, itself on an async thread
	CompletableFuture.allOf(futures).thenAcceptAsync(callback)

runUpdates()