In our kafka tag history sink we send tag history records to kafka like this:
@Override
public void storeData(HistoricalData data) throws IOException {
int pathIndex = 0;
for (HistoricalData row : BasicDataTransaction.class.cast(data).getData()) {
BasicScanclassHistorySet scanset = BasicScanclassHistorySet.class.cast(row); // line 40
if (scanset.size() == 0) continue;
String gatewayName = this.hostName;
String provider = scanset.getProviderName();
pathIndex = provider.length() + 2;
for (HistoricalTagValue tagValue : scanset) {
try{
String json = new JSONObject()
.put("gatewayName", gatewayName)
.put("provider", provider)
.put("tagPath", tagValue.getSource().toString().replace("["+provider+"]", ""))
.put("type", tagValue.getTypeClass())
.put("quality", tagValue.getQuality())
.put("value", String.valueOf(tagValue.getValue()))
.put("epochms", tagValue.getTimestamp().getTime())
.toString();
SinkData value = new SinkData(topic, json, this.getPipelineName());
this.sendDataWithProducer(value);
} catch (JSONException e) {
logger.error("Error sending tag: " + e.toString());
}
}
}
if (pathIndex > 0) {
setLastMessageTime(this.name);
}
}
In that tag history stream though, we sometimes see the following:
com.inductiveautomation.ignition.gateway.history.TagRetirementList
These result in errors and store and forward quarantined records:
I am thinking to add an if statement or try/catch that checks if the record is a tag retirement and if so don't send it to the sink, do nothing in stead (maybe just send a message to the log).
What would be the best way to check if the row is a tag retirement?
Thanks,
Nick