How to get the stored data on a custom variable when using Custom Aggregate function

How to I get access to the cnt value used in this blockContext when the query result is returned.

def myCount(qval, interpolated, finished, blockContext, queryContext):
	cnt = blockContext.getOrDefault('cnt',0)
	if qval.quality.isGood():
		blockContext['cnt']=int(cnt)+1
	 
	if finished:
		return blockContext.getOrDefault('cnt', 0)

data=system.tag.queryTagHistory(
	paths=paths,
	aggregationModes =['shared.myCount'],
	startDate=start,
	endDate=end,
	returnSize=1000
	)

The reason I am asking is because I have a Totalizer which I want to use Range Aggregation to get the day total. The problem is , there are zeros in the data which cause the return value to be incorrect.

I want to create a custom aggregation function to remove 0's from the return.

I don't think you can pass your own functions to aggregationMode.
It might be simpler to post-process the un-aggregated results.

1 Like

You can. There are some restrictions though.

https://docs.inductiveautomation.com/display/DOC81/Custom+Tag+History+Aggregates

You would just return the value once finished is true.

If you chose to use a custom aggregate then you have to provide the logic to process the values. You can't combine that with a pre-built aggregate.

def myCount(qval, interpolated, finished, blockContext, queryContext):
	cnt = blockContext.getOrDefault('cnt',0)
	if qval.quality.isGood():
		cnt += 1

	blockContext['cnt'] = cnt
	 
	if finished:
		return cnt

Here is an example of of an aggregate that I use to calculate the duration based on particular values of an integer tag.

def customStatusDuration(qval,interpolated,finished,blockContext,queryContext):
	lastValue = queryContext.get('lastValue',False)
	startTime = blockContext.get('startTime',None)
	duration = blockContext.get('duration',0)
	
	if all([qval.value == 0,not lastValue,qval.quality.good]):
		startTime = qval.timestamp
	elif all([startTime,qval.value,qval.quality.good,not finished]):
		duration += system.date.secondsBetween(startTime,qval.timestamp)
		#clear the start time so that we don't add time in between stops to the
		#duration.
		startTime = None
		
	queryContext['lastValue'] = qval.value == 0
	
	if finished:
		if startTime is not None:
			duration += system.date.secondsBetween(startTime,system.date.fromMillis(blockContext.blockEnd))
		return duration 
		
	blockContext['duration'] = duration
	blockContext['startTime'] = startTime
2 Likes

Then shouldn't the count be stored in the queryContext instead of the blockContext ?

That depends on if the value needs to be persisted across multiple blocks.

For instance, in my example I persist the lastValue across the query, so that the next block knows what the last value of the previous block was, but I don't for the duration or startTime.

Is it possible to remove/skip a block from being return?

Lets say I'm screening for blocks where the value is zero and do not want to have it returned in the final dataset, how would I skip it?

You can't not return from a python function - even if you don't return explicitly, it returns None.
Well, you could not return, by raising an error, but I doubt that helps here.

What exactly do you want your aggregation function to do ?
Or maybe a better question would be: What do you want queryHistory to return ?

I want the aggregation function to remove zero values from the return dataset.

Lets say the tag history is as follows,
For simplicity sake here is an example of a return from a normal queryTagHistory

[
   ["01-01-2023":1],
   ["02-01-2023":2],
   ["03-01-2023":3],
   ["04-01-2023":0],
   ["04-01-2023":4],
   ["05-01-2023":5]
]

This is where I want the aggregation function to remove the zero values from the dataset and return this instead.

[
   ["01-01-2023":1],
   ["02-01-2023":2],
   ["03-01-2023":3],
   ["04-01-2023":4],
   ["05-01-2023":5]
]

But what are the other values ? The exact data points ? You were talking about range and a totalizer, so there must be more going on than just removing zeros ?

Accidently post the reply before I was finished typing.

These would be flow totals, I want to calculate the Flow Total for a specified time range.
Where the Flow total is Latest Flow Total - Earliest Flow Total=Total Flow for that period of time.

The issue is , with large amounts of data I would have to run another loop after retrieving the data from the tag history query to remove the 0's from the dataset. I am hoping to use the custom aggregation function to do this when the data is being retrieved.

Frankly, just filter the returned dataset. This is simpler, clearer, and easier to do.
IF it's not fast enough, THEN turn to custom aggregation.

ds = system.tag.queryHistory(...)

ds = system.dataset.toDataSet(
	list(ds.columnNames),
	filter(lambda x: x['value_column'] != 0, system.dataset.toPyDataSet(ds))
)

I can't see a way to skip a block in the aggregation function, except maybe if you were storing valid block data in the queryContext and returning that at the end, but that feels a bit... weird, and the return of queryHistory would have to be re-processed to get what you really need.

The period of time can be as long as a year , meaning if I can avoid looping through the data multiple times if would be nice.

The quick fix I have implemented, is to check if the value at index -1 and 0 ,if the value at the end of the array is equal to 0, I walk backwards (forwards for index 0) till I find a non-zero value .

What ? Where are you doing this ?
I fail to see how that solves anything...

Here is an example of why I am doing this.

data=[
   ["01-01-2023":0],
   ["02-01-2023":2],
   ["03-01-2023":3],
   ["04-01-2023":0],
   ["04-01-2023":4],
   ["05-01-2023":5]
]

To calculate the Flow Total I would do this

 Total=data[-1][1] - data[0][1] 
 Total = 5 - 1
 Total = 4

Which is wrong.

Using this method I described, this is what I am doing.

data=[
   ["01-01-2023":0],<--check if data[0][1] is 0 , if so move to the next index
   ["02-01-2023":2],
   ["03-01-2023":3],
   ["04-01-2023":0],
   ["04-01-2023":4],
   ["05-01-2023":5] <--check if data[-1][1] is 0 ,if not use this as end value
]
data=[
   ["01-01-2023":0],
   ["02-01-2023":2],<--check if data[0][1] is 0 , if not use this as end value
   ["03-01-2023":3],
   ["04-01-2023":0],
   ["04-01-2023":4],
   ["05-01-2023":5] <--check if data[-1][1] is 0 ,if not use this as end value
]
 Total=endValue - startValue 
 Total = 5 - 2
 Total = 3

Which is correct.

So, what you actually want, is not to remove the 0, but calculate a value that uses the first non-zero value and the last non-zero value ?

This seems like a totally different thing to do in an aggregation function !

Store the first and last non-zero values in the query context, then when you reach the end of the query, return the difference between the 2.

If the data was ""cleaned" and I expect no 0 values at the start or end of the array (assume there are totals for the start and end range) , I don't have to perform an extra step to do this check.

Unfortunately for some reason the totalizer tag has store random 0's in the database.

How do you know if your last value in the query when using custom aggregation?
Is there a array length value and index position provided in the queryContext?

I guess you can use the blockEnd key of the block context and compare it to the queryEnd key of the query context.

Or keep doing what you're doing now, which is honestly not a bad solution.

Sounds to me like you shouldn't be using the tag historian, but transaction groups. Then use SQL lead() or lag() outside a subquery that discards the zeros.