TagRetirementList vs. BasicScanclassHistorySet

In our Kafka tag history sink, we send tag history records to Kafka like this:

    @Override
    public void storeData(HistoricalData data) throws IOException {
        int pathIndex = 0;

        for (HistoricalData row : BasicDataTransaction.class.cast(data).getData()) {
            BasicScanclassHistorySet scanset = BasicScanclassHistorySet.class.cast(row);
            if (scanset.size() == 0) continue;

            String gatewayName = this.hostName;
            String provider = scanset.getProviderName();
            pathIndex = provider.length() + 2;

            for (HistoricalTagValue tagValue : scanset) {
                try {
                    String json = new JSONObject()
                            .put("gatewayName", gatewayName)
                            .put("provider", provider)
                            .put("tagPath", tagValue.getSource().toString().replace("["+provider+"]", ""))
                            .put("type", tagValue.getTypeClass())
                            .put("quality", tagValue.getQuality())
                            .put("value", String.valueOf(tagValue.getValue()))
                            .put("epochms", tagValue.getTimestamp().getTime())
                            .toString();

                    SinkData value = new SinkData(topic, json, this.getPipelineName());
                    this.sendDataWithProducer(value);

                } catch (JSONException e) {
                    logger.error("Error sending tag: " + e.toString());
                }
            }
        }
        if (pathIndex > 0) {
            setLastMessageTime(this.name);
        }
    }
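
The `tagPath` value strips the `[provider]` prefix that Ignition puts on the source path, and `pathIndex = provider.length() + 2` is just that prefix's length (the two extra characters being the brackets). A minimal, self-contained sketch of that transformation, using a made-up provider name and path:

```java
public class TagPathStrip {
    // Remove the "[provider]" prefix from a tag source path, mirroring the
    // replace() call in storeData() above. Provider and path are examples.
    static String stripProvider(String source, String provider) {
        return source.replace("[" + provider + "]", "");
    }

    public static void main(String[] args) {
        String provider = "default";              // hypothetical provider name
        String source = "[default]Folder/MyTag";  // hypothetical source path
        System.out.println(stripProvider(source, provider));
        // the prefix length matches pathIndex = provider.length() + 2
        System.out.println(("[" + provider + "]").length() == provider.length() + 2);
    }
}
```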

In that tag history stream though, we sometimes see the following:

com.inductiveautomation.ignition.gateway.history.TagRetirementList

These result in errors and in records being quarantined in store and forward:

I am thinking of adding an if statement or try/catch that checks whether the record is a tag retirement and, if so, doesn't send it to the sink and does nothing instead (maybe just writes a message to the log).

What would be the best way to check if the row is a tag retirement?

Thanks,

Nick

instanceof? I don't think there's any real API for it. You'll just have to check the type at runtime before making assertions with a cast.
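
The check-then-cast pattern can be sketched with stand-in types. The class names below mirror the Ignition ones but are hypothetical local stand-ins so the example is self-contained; the real `HistoricalData`, `BasicScanclassHistorySet`, and `TagRetirementList` come from the gateway API:

```java
import java.util.List;

public class InstanceofFilterDemo {
    // Stand-ins for the Ignition history types (hypothetical, for illustration only).
    interface HistoricalData {}
    static class BasicScanclassHistorySet implements HistoricalData {}
    static class TagRetirementList implements HistoricalData {}

    // Returns how many rows would actually be stored.
    static int storeAll(List<HistoricalData> rows) {
        int stored = 0;
        for (HistoricalData row : rows) {
            // Check the runtime type before casting; skip anything that is not a scan set.
            if (row instanceof BasicScanclassHistorySet) {
                BasicScanclassHistorySet scanset = (BasicScanclassHistorySet) row;
                stored++; // real code would serialize and send the scan set here
            } else {
                System.out.println("Skipping row of type " + row.getClass().getSimpleName());
            }
        }
        return stored;
    }

    public static void main(String[] args) {
        List<HistoricalData> rows =
                List.of(new BasicScanclassHistorySet(), new TagRetirementList());
        System.out.println(storeAll(rows));
    }
}
```

Only the scan set makes it through; the retirement row is logged and dropped, which is exactly the behavior wanted for the sink.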

10-4, I'll give it a go, and if I get something that works I'll post it back here.

I doubt many people are looking at this topic but you never know who might benefit later on....

Thanks,

Nick


I am interested in building a module to add a new kind of history. All posts around this topic are very valuable to me!
Thanks a lot.

Here is how our Kafka module works (at least the tag history portion; there are also alarm and audit portions).

To test whether the change I made today works, I just make a new tag and set its storage provider. Then any tag changes are sent to Kafka:

Then if I go look in the Kafka broker, I can see that the data is there. So with the new class filter applied, the records that should be sent down the pipeline are still able to make it through. That is good.

Now comes the new part. I expect that when I delete that tag, a tag retirement message attempts to be sent but is blocked, and a message shows up in the logs.

Before that, I check that at this point the S&F quarantine for the tag history sink is empty, which it is:

After deleting the tag, a message does show up in the logs. I suppose it could be useful to see more details, so I may add additional log information, but this is sufficient to show that the instanceof check prevents unwanted records from filling up the S&F quarantine:

Here is the modified code:

@Override
public void storeData(HistoricalData data) throws IOException {
    int pathIndex = 0;

    for (HistoricalData row : BasicDataTransaction.class.cast(data).getData()) {
        // prevent TagRetirementList from going to store and forward quarantine
        if (row instanceof BasicScanclassHistorySet) {
            BasicScanclassHistorySet scanset = BasicScanclassHistorySet.class.cast(row);
            if (scanset.size() == 0) continue;

            String gatewayName = this.hostName;
            String provider = scanset.getProviderName();
            pathIndex = provider.length() + 2;

            for (HistoricalTagValue tagValue : scanset) {
                try {
                    String json = new JSONObject()
                            .put("gatewayName", gatewayName)
                            .put("provider", provider)
                            .put("tagPath", tagValue.getSource().toString().replace("["+provider+"]", ""))
                            .put("type", tagValue.getTypeClass())
                            .put("quality", tagValue.getQuality())
                            .put("value", String.valueOf(tagValue.getValue()))
                            .put("epochms", tagValue.getTimestamp().getTime())
                            .toString();

                    SinkData value = new SinkData(topic, json, this.getPipelineName());
                    this.sendDataWithProducer(value);

                } catch (JSONException e) {
                    logger.error("Error sending tag: " + e.toString());
                }
            }
        } else {
            logger.info("Not storing data with the following class: " + row.getClass().toString());
        }
    }
    if (pathIndex > 0) {
        setLastMessageTime(this.name);
    }
}
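
On the "additional log information" idea mentioned above: one way to add detail without relying on any retirement-specific API is to report a record count when the skipped row happens to implement `java.util.Collection`. This is an assumption about the skipped row's type, not documented Ignition behavior; if the row isn't a collection, the helper just logs the class name:

```java
import java.util.Collection;

public class SkippedRowLogger {
    // Build a more descriptive log line for skipped rows.
    // Assumption: some skipped rows may implement Collection (e.g. a list of
    // retired tag records); if not, only the class name is reported.
    static String describeSkippedRow(Object row) {
        String desc = "Not storing data with class: " + row.getClass().getName();
        if (row instanceof Collection) {
            desc += " (" + ((Collection<?>) row).size() + " records)";
        }
        return desc;
    }

    public static void main(String[] args) {
        System.out.println(describeSkippedRow(java.util.List.of("a", "b")));
    }
}
```

In the sink above, the else branch would then call `logger.info(describeSkippedRow(row))` instead of building the string inline.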

@PGriffith thanks for your help as always.

Rgds,

Nick