Issues when using system.util.invokeAsynchronous() I get a 'NoneType' object is not callable error

It's a bit awkward, but here's a proof-of-concept of using Java's CompletableFuture API to automatically run multiple operations on an async thread pool (managed by Java in this case), wait for all those operations to complete, and then do some work with all their results on another async thread:

from java.util.concurrent import CompletableFuture
from java.util.function import Consumer, Supplier
from functools import partial
from time import sleep

# Adapts a Python function to act as a Java "Consumer" functional interface
# Works when called directly or as a decorator with @ syntax
def java_consumer(func):
	class PythonConsumer(Consumer):
		def accept(self, t):
			func(t)
	return PythonConsumer()

# Adapts a Python function to act as a Java "Supplier" functional interface
def java_supplier(func):
	class PythonSupplier(Supplier):
		def get(self):
			return func()
	return PythonSupplier()

# Mock update function that just blocks for a fixed delay
def updateSize(row):
	sleep(2)
	return {"SIZE": row}

# Modified applyUpdates function will be guaranteed to get all updates in order, after they have all been completed
def applyUpdates(futures):
	for i, future in enumerate(futures):
		print i, future.get()

def runUpdates():
	# Just some mock data to stick to the general structure of the existing code
	tagData = system.dataset.toDataSet(["a", "b"], [[1, 2], [2, 3], [3, 4]])
	pyTagData = system.dataset.toPyDataSet(tagData)
	
	futures = []
	for row in pyTagData:
		# Some antics here to adapt types and wrap up default arguments
		supplier = java_supplier(partial(updateSize, row))
		# supplyAsync is a static method that will use Java's built in ForkJoinPool thread pool
		futures.append(CompletableFuture.supplyAsync(supplier))

	# Again, some coercion is required to get types happy
	callback = java_consumer(lambda r: applyUpdates(start, futures))

	# allOf returns a new future that's a "composite" of the inputs - so when all of the input futures are done
	# the callback will be invoked, itself on an async thread
	CompletableFuture.allOf(futures).thenAcceptAsync(callback)

runUpdates()
1 Like