I recently rewrote my tag compare script which was a bit dopey, to use a Python library littletree
I found via StackOverflow. Most of the kudos goes to its developer who answered my SO question with the majority of the code, but I modified it to include more detail and spit out a CSV.
This will only run in CPython, I use PyCharm as my IDE. It requires a min of Python 3.10 (dependency of the littletree
library).
The only library dependency not included in the standard libraries is littletree
(pip install littletree
)
from littletree import Node # littletree requires minimum Python 3.10
import json
def compare_tag_json(json_orig_filepath, json_new_filepath, exclude_tags_in_removed_folders=True, exclude_tags_in_added_udt_instances=True):
"""
This is a wrapper for function compare_tag_obj.
Takes two tag JSON exports to files and produces a report of changes between the tag structures, including:
- additions/deletions of tags
- additions/modifications/removal of tag props
Args:
json_orig_filepath:
The filepath to the original tag json file for the comparison.
json_new_filepath:
The filepath to the new tag json file for the comparison.
exclude_tags_in_removed_folders:
For folders removed in the new file, this option will exclude the tag changes (i.e. deletions) to the tags
contained within the folder.
Default: True
exclude_tags_in_added_udt_instances:
For UDT Instances added in the new file, this option will exclude the tag changes (i.e. additions) to the
tags contained within the UDT Instance.
Default: True
Returns:
The table of changes in CSV format with headers: tagpath, tagtype, change, from, to
Also prints the results as a CSV to the console.
"""
with open(json_orig_filepath, 'r') as f:
json_orig = f.read()
with open(json_new_filepath, 'r') as f:
json_new = f.read()
original = json.loads(json_orig)
modified = json.loads(json_new)
return compare_tag_obj(original, modified, exclude_tags_in_removed_folders, exclude_tags_in_added_udt_instances)
def compare_tag_obj(tags_original, tags_modified, exclude_tags_in_removed_folders=True, exclude_tags_in_added_udt_instances=True):
"""
Takes two tag JSON as Py lists/dicts and produces a report of changes between the tag structures, including:
- additions/deletions of tags
- additions/modifications/removal of tag props
Args:
tags_original:
The list/dict structure of the original tag json file for the comparison.
tags_modified:
The list/dict structure of the new tag json file for the comparison.
exclude_tags_in_removed_folders:
For folders removed in the new file, this option will exclude the tag changes (i.e. deletions) to the tags
contained within the folder.
Default: True
exclude_tags_in_added_udt_instances:
For UDT Instances added in the new file, this option will exclude the tag changes (i.e. additions) to the
tags contained within the UDT Instance.
Default: True
Returns:
The table of changes in CSV format with headers: tagpath, tagtype, change, from, to
Also prints the results as a CSV to the console.
"""
original_tree = Node.from_dict(tags_original, identifier_name='name', children_name='tags')
modified_tree = Node.from_dict(tags_modified, identifier_name='name', children_name='tags')
# Collect changes in a list
changes = []
for diff_node in original_tree.compare(modified_tree).iter_tree():
diff_data = diff_node.data
tagpath = str(diff_node.path)[1:]
if not diff_data:
continue # Data was the same
if 'self' not in diff_data:
if not exclude_tags_in_added_udt_instances or not any(change['tagpath'] in tagpath and change['change'] == 'Tag added' and change['tagtype'] == 'UdtInstance' for change in changes):
changes.append({'tagpath': tagpath,
'tagtype': diff_data['other']['tagType'],
'change': 'Tag added',
'from': None,
'to': None})
elif 'other' not in diff_data:
if not exclude_tags_in_removed_folders or not any(change['tagpath'] in tagpath and change['change'] == 'Tag removed' for change in changes):
changes.append({'tagpath': tagpath,
'tagtype': diff_data['self']['tagType'],
'change': 'Tag removed',
'from': None,
'to': None})
else:
original_data, modified_data = diff_data['self'], diff_data['other']
all_keys = set(list(original_data.keys()) + list(modified_data.keys()))
for key in all_keys:
if key in original_data and key in modified_data:
if original_data[key] != modified_data[key]:
changes.append({'tagpath': f'{tagpath}.{key}',
'tagtype': original_data['tagType'],
'change': 'Modified property',
'from': original_data[key],
'to': modified_data[key]
})
elif key in original_data:
changes.append({'tagpath': f'{tagpath}.{key}',
'tagtype': original_data['tagType'],
'change': 'Removed property',
'from': original_data[key],
'to': None
})
elif key in modified_data:
changes.append({'tagpath': f'{tagpath}.{key}',
'tagtype': modified_data['tagType'],
'change': 'Added property',
'from': None,
'to': modified_data[key]})
changes_csv = '"' + '","'.join(changes[0].keys()) + '"\r\n'
changes_csv += '\r\n'.join('"' + '","'.join(map(str, change.values())) + '"' for change in changes)
print(changes_csv)
return changes_csv
The result is a table like this:
tagpath |
tagtype |
change |
from |
to |
A/New Tag 1.value |
AtomicTag |
Added property |
None |
1 |
A/New Instance |
UdtInstance |
Tag removed |
None |
None |
A/New Tag 2 |
AtomicTag |
Tag removed |
None |
None |
A/New Tag.value |
AtomicTag |
Modified property |
2 |
1 |
A/Folder B |
Folder |
Tag added |
None |
None |
A/New Instance 3 |
UdtInstance |
Tag added |
None |
None |
or this:
tagpath |
tagtype |
change |
from |
to |
Analogue Input 01.parameters |
UdtType |
Modified property |
{'Alarm_ParentDevice': {'dataType': 'String'}, 'Unit': {'dataType': 'String', 'value': ''}, 'Alarm_Area': {'dataType': 'String'}, 'Format': {'dataType': 'String', 'value': '#,##0.0'}, 'PLCName': {'dataType': 'String', 'value': {'bindType': 'parameter', 'binding': '{PLCName}'}}, 'Description': {'dataType': 'String', 'value': ''}, 'DeviceName': {'dataType': 'String', 'value': ''}, 'Global.': {'dataType': 'String', 'value': ''}} |
{'Alarm_ParentDevice': {'dataType': 'String'}, 'Unit': {'dataType': 'String'}, 'Alarm_Area': {'dataType': 'String'}, 'Format': {'dataType': 'String', 'value': '#,##0.0'}, 'PLCName': {'dataType': 'String', 'value': {'bindType': 'parameter', 'binding': '{PLCName}'}}, 'Description': {'dataType': 'String', 'value': ''}, 'DeviceName': {'dataType': 'String', 'value': ''}, 'Global.': {'dataType': 'String', 'value': ''}} |
Analogue Input 01/Low Alarm SP |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/Alarms/High |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/Alarms/_AnyActive.executionMode |
AtomicTag |
Removed property |
EventDriven |
None |
Analogue Input 01/Alarms/_AnyActive.expression |
AtomicTag |
Removed property |
{[.]High High} |
|
Analogue Input 01/Alarms/_AnyActive.opcItemPath |
AtomicTag |
Modified property |
{'bindType': 'parameter', 'binding': 'ns=1;s=[{PLCName}]{Global.}{DeviceName}.Alarms'} |
{'bindType': 'parameter', 'binding': 'ns=1;s=[{PLCName}]{Global.}{DeviceName}.AnyAlarm'} |
Analogue Input 01/Alarms/Low Low |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/Alarms/Low |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/Alarms/Comms Fault |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/Alarms/Signal Fault |
AtomicTag |
Tag added |
None |
None |
Analogue Input 01/Alarms/Over or Under Range |
AtomicTag |
Tag added |
None |
None |
Analogue Input 01/Simulation |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/High Alarm SP |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/Low Low Alarm SP |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/Low Alarm Time |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/High High Alarm Time |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/High Alarm Time |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/Low Low Alarm Time |
AtomicTag |
Tag removed |
None |
None |
Analogue Input 01/High High Alarm SP |
AtomicTag |
Tag removed |
None |
None |
Test Code
original = {
'name': 'A',
'tagType': 'Folder',
'tags': [
{
'valueSource': 'memory',
'name': 'New Tag 1',
'tagType': 'AtomicTag'
},
{
'name': 'New Instance',
'typeId': 'Alarm Summary',
'parameters': {
'MainTagPath': {
'dataType': 'String',
'value': 'Bob'
},
'Description': {
'dataType': 'String',
'value': 'Descrp'
}
},
'tagType': 'UdtInstance',
'tags': [
{
'name': 'Area PLCs for Alarm Reset',
'tagType': 'AtomicTag'
},
{
'name': 'Active Ack',
'tagType': 'AtomicTag'
},
{
'name': 'Active Unack',
'tagType': 'AtomicTag'
},
{
'name': 'Clear Unack',
'tagType': 'AtomicTag'
}
]
},
{
'valueSource': 'memory',
'name': 'New Tag 2',
'tagType': 'AtomicTag'
},
{
'name': 'New Instance 1',
'typeId': 'Alarm Summary',
'parameters': {
'MainTagPath': {
'dataType': 'String',
'value': 'Bob'
},
'Description': {
'dataType': 'String',
'value': 'Descrp'
}
},
'tagType': 'UdtInstance',
'tags': [
{
'name': 'Active Ack',
'tagType': 'AtomicTag'
},
{
'name': 'Area PLCs for Alarm Reset',
'tagType': 'AtomicTag'
},
{
'name': 'Clear Unack',
'tagType': 'AtomicTag'
},
{
'name': 'Active Unack',
'tagType': 'AtomicTag'
}
]
},
{
'valueSource': 'memory',
'name': 'New Tag',
'value': 2,
'tagType': 'AtomicTag'
}
]
}
modified = {
'name': 'A',
'tagType': 'Folder',
'tags': [
{'name': 'Folder B',
'tagType': 'Folder'
},
{
'valueSource': 'memory',
'name': 'New Tag',
'value': 1,
'tagType': 'AtomicTag'
},
{
'valueSource': 'memory',
'name': 'New Tag 1',
'value': 1,
'tagType': 'AtomicTag'
},
{
'name': 'New Instance 3',
'typeId': 'Alarm Summary',
'parameters': {
'MainTagPath': {
'dataType': 'String',
'value': 'Bob'
},
'Description': {
'dataType': 'String',
'value': 'Descrp'
}
},
'tagType': 'UdtInstance',
'tags': [
{
'name': 'Area PLCs for Alarm Reset',
'tagType': 'AtomicTag'
},
{
'name': 'Clear Unack',
'tagType': 'AtomicTag'
},
{
'name': 'Active Unack',
'tagType': 'AtomicTag'
},
{
'name': 'Active Ack',
'tagType': 'AtomicTag'
}
]
}
]
}
compare_tag_obj(original, modified, exclude_tags_in_removed_folders=True, exclude_tags_in_added_udt_instances=True)