Trouble with httpClient .getAsync Callbacks

Hey guys,

I am trying to set up some asynchronous API calls on my gateway but I am having trouble getting my callback function to run properly.

I have tried a few different things, but have been coming up short.
I am trying to get each call of the callback function to write the resulting data into their respective tags that are living on my gateway. It feels as though the callback function isn't running at all, and it is hard to confirm that as I can't get loggers, prints, or anything else of the sort to run in the callback function, so I get no errors.

Within my callback function, I am also attempting to use payload data that exists in my for loop that is creating my asynchronous promise objects. I am not super familiar with python closures but as far as I know, this is how I should expect them to work.

Does anything in my code stick out to anyone that I am doing wrong? Or any advice on the correct way to debug callback functions like these in ignition?

def getCallback(response, error):
	rawData = {}
	for data in response.json['data']:
		for key in data:
			try:
				rawData[data[key]['name']] = data[key]['value']
			except:
				continue
	values = []
	paths = []
	for tag in system.tag.browse(payload['tagType']['fullPath'], filter = {'tagType':'AtomicTag'}).getResults():
		try:
			values.append(rawData[tag['name']])
		except:
			values.append(None)
		paths.append(str(tag['fullPath']))
		
	system.tag.writeBlocking(paths, values)
	return
	
def updateTags():
	basePath = '[default]TankLogix API Data/'
	token = system.tag.readBlocking(['[default]TankLogix API Data/Access Token'])
			
	headers = {
		'Authorization' : 'Bearer ' + token[0].value
	}
	client = system.net.httpClient()
	
	for Site in system.tag.browse(basePath, filter = {'tagType':'Folder'}).getResults():
		SiteID = system.tag.readBlocking([str(Site['fullPath']) + '/SiteID'])[0].value
		for TagType in system.tag.browse(Site['fullPath'], filter = {'tagType':'Folder'}).getResults():
			url = 'https://api.tanklogix.com/api/v2_1_0_0/SiteObjects/' + str(SiteID) + '/' + str(TagType['name'])
			p = client.getAsync(url = url, headers = headers)
			payload = {'tagType':TagType, 'siteID':SiteID}
			p.whenComplete(getCallback)

Are you 100% sure it's not running at all? I would add some loggers to start, at least one in the beginning of the function saying that the function is starting.

It is possible the values you are getting from the http are identical to the current values and then it wouldn't look like anything is happening right?

1 Like
def getCallback(response, error):
	logger = system.util.getLogger("callbackFuntion")
	logger.info("Callback starting")
	
	...
	
	logger.info("Callback ending")
	return
	
def updateTags():
	logger = system.util.getLogger("updateTagsFuntion")
	logger.info("updateTags function starting")
	
	...
			
	logger.info("updateTags function ending")

I'm also sure that the endpoint value is in fact changing because I can look at that data with a separate console as well.

Is this loop producing any rows of data? I would put one in here to see that something is going on. I would probably put one log right now at each level of your loops just to see / be sure.

1 Like

I made a mistake, I didn't wait sufficiently long enough for the .whenComplete() to trigger and I turned the live logs off. doh

I guess my main issue is I don't know how closures work...

Closures capture variables for a function's use at the execution of the def, not at the point of the call. So no, your loop variables won't be available within your callback as shown.

Consider using lambda's within the .whenComplete() method to assemble the parts you need.

1 Like

You have payload but this not defined anywhere before hand or fed as a parameter.

1 Like

main issue is I can't figure out how to feed it as a parameter into this callback function. the response and error parameters are passed by default with the whenComplete function, but not sure how to get the payload to go with it.

Maybe something like this:

def callback(response, error, payload):
    # ...

def update():
    # ...

    payload = {}

    def inner_cb(response, error):
        callback(response, error, payload)

    promise = ...
    promise.whenComplete(inner_cb)

You can probably get fancier with lambda but this demonstrates the basic idea...

5 Likes

Would this be the equivalent for a lambda function?
image

3 Likes

You could also use functools.partial:

from functools import partial

def callback(response, error, payload):
    # ...

def update():
    # ...

    payload = {}

    inner_callback = partial(callback, payload=payload)

    promise = ...
    promise.whenComplete(inner_callback)

Lots of ways to skin this cat.

1 Like

New issue here:

The callback function is only getting whatever the final payload is, instead of getting the correct payload for each call.

I'm assuming that the p.whenComplete(inner_cb) is pulling the parameters at the time it runs the callback instead of at the time it runs the whenComplete method.

Can you show the full script at this point? So we can make sure it's not just a scripting logic error.

1 Like

Logs (made some changes to what the payload carries):

Code:

def getCallback(response, error, tagTypePath):
	logger = system.util.getLogger("callbackFunction")
	logger.info(tagTypePath)
	rawData = {}
	for data in response.json['data']:
		for key in data:
			try:
				rawData[data[key]['name']] = data[key]['value']
			except:
				continue
	values = []
	paths = []
	for tag in system.tag.browse(tagTypePath, filter = {'tagType':'AtomicTag'}).getResults():
		try:
			values.append(rawData[tag['name']])
		except:
			values.append(None)
		paths.append(str(tag['fullPath']))
		
	system.tag.writeBlocking(paths, values)
	return
	
def updateTags():
	logger = system.util.getLogger("updateTagsFunction")
	
	basePath = '[default]TankLogix API Data/'
	token = system.tag.readBlocking(['[default]TankLogix API Data/Access Token'])
			
	headers = {
		'Authorization' : 'Bearer ' + token[0].value
	}
	client = system.net.httpClient()
	
	for Site in system.tag.browse(basePath, filter = {'tagType':'Folder'}).getResults():
		SiteID = system.tag.readBlocking([str(Site['fullPath']) + '/SiteID'])[0].value
		for TagType in system.tag.browse(Site['fullPath'], filter = {'tagType':'Folder'}).getResults():
			url = 'https://api.tanklogix.com/api/v2_1_0_0/SiteObjects/' + str(SiteID) + '/' + str(TagType['name'])
			p = client.getAsync(url = url, headers = headers)
			tagTypePath = str(TagType['fullPath'])
			logger.info(tagTypePath)
			inner_cb = lambda response, error: getCallback(response, error, tagTypePath)
			p.whenComplete(inner_cb)

You could also use functools.partial

This seems to be the only way for me to skin this cat so far...

I've successfully gotten the appropriate payload data for each call of the function with this method.


This is confirmed with live data now successfully being written to my tags.

I'm not sure what is up with using functions defined inside of the for loop, but as long as this works I'm willing to let that be someone else's mystery lol.

Now that I've successfully gotten my asynchronous calls to actually run, I now have a related issue that caused me to work on this in the first place (there is actually a support ticket for this problem if any of you IA folks are interested #96751). I am having an issue with bounded sockets being allocated with each creation of a promise object, and they are not being released once the .whenComplete() method is run. Below is a chart of the count of bounded TCP sockets from running netstat -aonq


This eventually causes our server to exhaust all of its available ephemeral ports and starts causing all sorts of hard-to-diagnose issues.

Is there some method I am supposed to call to explicitly close these sockets? Or is that behavior that should be expected of the java service? Unless I find a way to close these sockets without a complete restart of the service/server, I can not use these asynchronous methods for httpClient.

Would be super interested to hear if anyone has any insight on this.

Are you instantiating a new httpClient for each call? if so, don't. Treat a single instance as a session, storing it in project library top-level variable so you can re-use it.

1 Like

Not for each http call, but yes for each updateTags() call, which is about 1 per minute. I am seeing about 8-9 new sockets per minute which lines up much better with the number of .getAsync() calls I am making.

I will try to instantiate the httpClient globally and see if that helps.

It would also be worth updating to 8.1.33, if you're not already there. We moved the platform to Java 17; there's specifically bug fixes for the builtin Java HTTP client that backs system.net.httpClient.

And definitely avoid the creation of new system.net.httpClient instances as much as possible.

Unfortunately, I have been on 8.1.33 for the last week now. I guess the specific bug fixes didn't have to do with closing forgotten httpClient's.

Is there a method in ignition where I would even be able to explicitly destroy a httpClient object? Or is the expected behavior for the client objects to clean themselves up at the end of a function scope?

No. In 8.3.0 we're moving to Java 21, where it will be possible to explicitly close the underlying Java client we're creating for you, but not until then; it's a new feature of the Java client that there are apparently no plans to backport.

1 Like