I've created the attached script that can be used in Ignition that will read the Productivity extended CSV file and create a formatted JSON file that can be imported into Ignition as a UDT with all the tags that have modbus addresses.
The script will work out of the box to read the .csv and create a json file but for it to be useful you will need to modify it giving the (structure_name) a name useful to you and the (dev_name) will need to match you device name in Ignition's Device Connections
It does not have all the arrays or data types mapped and will not import anything that's not mapped but it's easy enough to add them if you need to.
It will not import tags that have invalid characters in Ignition.
Tags with spaces in the name will have them replaced with "-"
Any structures or tags with "_" will be placed in folders with the root name.
It will ask you for the source .csv file and where you'd like to save the json.
Once the json file is created you can import it as a UDT in ignition then create tags from the UTD instance.
Obviously it can be modified to suit your needs and formatting preferences.
Updated Better Version:
import csv
import json
import codecs
import time
from collections import defaultdict
# Start timer
start = time.time()
# Get structure and device name
structure_name = system.gui.inputBox("Enter Structure Name:")
if not structure_name:
system.gui.messageBox("No Structure Name.")
quit()
dev_name = system.gui.inputBox("Enter Device ID:")
if not dev_name:
system.gui.messageBox("No Device ID.")
quit()
# Select CSV source file
source_file = system.file.openFile('csv')
if source_file is None:
system.gui.messageBox("No file selected.")
quit()
# Select destination JSON file
jsonpath = system.file.saveFile("json")
if jsonpath is None:
system.gui.messageBox("No save location selected.")
quit()
jsonpath += ".json"
# Data type mappings
data_types = {
"SBR": "Boolean", "C": "Boolean", "MST": "Boolean",
"US8": "Int2", "US16": "Int4", "S16": "Int4", "S32": "Int4",
"AIS32": "Int4", "AOS32": "Int4", "F32": "Float4", "STR": "String",
"AR1S32": "Int4", "DO": "Boolean", "DI": "Boolean"
}
# Modbus register mapping
reg_data_types = {
"SBR": "C", "C": "C", "MST": "C",
"US8": "HR", "US16": "HR", "S16": "HR", "S32": "HRI",
"AIS32": "IRI", "AOS32": "HRI", "F32": "HRF", "STR": "HRS",
"AR1S32": "HRI", "DO": "C", "DI": "DI"
}
# Tag containers
folder_map = defaultdict(list)
root_tags = []
try:
with open(source_file, mode='r') as csv_file:
csv_reader = csv.DictReader(csv_file)
headers = csv_reader.fieldnames
print("CSV headers:", headers)
# Build lowercase header map once for fast column lookup
header_map = {col.strip().lower(): col for col in headers}
def get_column_value(row, target_column):
col = header_map.get(target_column.strip().lower())
return row.get(col, "") if col else ""
for row_index, row in enumerate(csv_reader, start=1):
try:
modbus_address = row.get("MODBUS Start Address", "").strip()
if not modbus_address:
continue
try:
start_addr = int(modbus_address) %10000
except ValueError:
print("⚠️ Invalid MODBUS Start Address at row {}: '{}'".format(row_index, modbus_address))
continue
tag_name_raw = row.get("Tag Name", "").strip()
if not tag_name_raw:
continue
tag_name = tag_name_raw.replace(" ", "-").replace(".", "_").replace("AIS32-0","Analog In").replace("AOS32-0","Analog Out")
data_type_key = row.get("Data Type", "").strip()
if data_type_key not in data_types:
print("❌ Unknown Data Type '{}' at row {}".format(data_type_key, row_index))
continue
data_type = data_types[data_type_key]
reg_type = reg_data_types[data_type_key]
opc_path = "["+ dev_name +"]" + reg_type + str(start_addr)
# String tag handling
if data_type == "String":
raw_length = get_column_value(row, "Number Of Characters")
try:
num_char = int(raw_length.strip())
opc_path += ":" + str(num_char)
except:
print("⚠️ Invalid or missing 'Number Of Characters' for tag '{}'. Raw: '{}'".format(tag_name, raw_length))
# Build tag definition
tag = {
"valueSource": "opc",
"opcItemPath": opc_path,
"dataType": data_type,
"name": tag_name,
"tagType": "AtomicTag",
"opcServer": "Ignition OPC-UA Server"
}
# Add to folder if applicable
if "_" in tag_name:
folder, subname = tag_name.split("_", 1)
tag["name"] = subname
folder_map[folder].append(tag)
else:
root_tags.append(tag)
# Progress feedback every 500 rows
if row_index % 500 == 0:
print("Processed {} rows...".format(row_index))
except Exception as e:
print("❌ Error processing row {}: {}".format(row_index, e))
# Append folders to root tag structure
for folder, children in folder_map.items():
folder_tag = {
"name": folder,
"tagType": "Folder",
"tags": children
}
root_tags.append(folder_tag)
# Build final JSON UDT structure
final_structure = {
"name": structure_name,
"typeId": "",
"tagType": "UdtType",
"tags": root_tags
}
with codecs.open(jsonpath, mode='w', encoding='utf-8') as jsonfile:
json.dump(final_structure, jsonfile, indent=4)
print("✅ Final JSON structure created successfully.")
print("Elapsed time: {:.2f} seconds".format(time.time() - start))
system.gui.messageBox("Conversion complete.")
except Exception as e:
print("❌ Error during JSON creation: {}".format(e))
system.gui.messageBox("Error: {}".format(e))