Issues when using system.util.invokeAsynchronous() I get a 'NoneType' object is not callable error

It's a bit awkward, but here's a proof-of-concept of using Java's CompletableFuture API to automatically run multiple operations on an async thread pool (managed by Java in this case), wait for all those operations to complete, and then do some work with all their results on another async thread:

from java.util.concurrent import CompletableFuture
from functools import partial

from java.util.function import Consumer, Supplier

# Adapts a Python function to act as a Java "Consumer" functional interface
# Works when called directly or as a decorator with @ syntax
class java_consumer(Consumer):
	def __init__(self, func):
		self._func = func
	def accept(self, t):
		self._func(t)

# Adapts a Python function to act as a Java "Supplier" functional interface
class java_supplier(Supplier):
	def __init__(self, func):
		self._func = func
	def get(self):
		return self._func()

# Note: For efficiency, the above classes should live in your project library

# Mock update function that just blocks for a fixed delay
# Note: In real code, never import inside a function definition
def updateSize(row):
	from time import sleep
	sleep(2)
	return {"SIZE": row}

# Modified applyUpdates function will be guaranteed to get all updates in order, after they have all been completed
def applyUpdates(futures):
	for i, future in enumerate(futures):
		print i, future.get()

def runUpdates():
	# Just some mock data to stick to the general structure of the existing code
	tagData = system.dataset.toDataSet(["a", "b"], [[1, 2], [2, 3], [3, 4]])
	pyTagData = system.dataset.toPyDataSet(tagData)
	
	futures = []
	for row in pyTagData:
		# Some antics here to adapt types and wrap up default arguments
		supplier = java_supplier(partial(updateSize, row))
		# supplyAsync is a static method that will use Java's built in ForkJoinPool thread pool
		futures.append(CompletableFuture.supplyAsync(supplier))

	# Again, some coercion is required to get types happy
	callback = java_consumer(lambda r: applyUpdates(start, futures))

	# allOf returns a new future that's a "composite" of the inputs - so when all of the input futures are done
	# the callback will be invoked, itself on an async thread
	CompletableFuture.allOf(futures).thenAcceptAsync(callback)

runUpdates()
1 Like