Gateway Timer Scripts troubleshooting

Built a project with some time-based scripts that run on the gateway to check the status of a camera and then update a state machine based on that status. This is Script A.

Also built a gateway timer script that looks for devices by UDT type and, when it finds them, updates an IP address tag. This is Script B.

Imported my project at a site that has multiple projects. They have some inheritance configured.

Both scripts are configured in my project. Script B runs fine every 60 seconds. Script A never runs.

I’m so confused.

Script A
def handleTimerEvent():
    try:
        from shared.pyrometer_comm import execute_all_pyrometers
        execute_all_pyrometers()
    except Exception, e:
        exc_type, exc_value, exc_tb = sys.exc_info()
        logger.error("Timer error: {} - {}".format(exc_type, exc_value))

ScriptB (works great)
def handleTimerEvent():

# GATEWAY TIMER SCRIPT - Discover by UDT Type
# Finds all UDT instances of type "EnduranceCamera"
# Run every 300 seconds

def updateEnduranceCamerasByType():
    """
    Find all UDT instances of type EnduranceCamera and update their IPs
    """
    import system

    def findEnduranceCameraUDTsByType():
        """
        Find all UDT instances that are of type EnduranceCamera
        """
        found_udts = []
    
        try:
            # Browse all tags in default provider recursively
            results = system.tag.browse("[default]CoorsTek/Hillsboro/Vacuum Furnaces/Fluke Pyrometers/Furnace_969", {"recursive": True})
        
            # Handle different result types
            items = []
        
            if hasattr(results, 'getResults'):
                # Results collection
                for result in results.getResults():
                    items.append(result)
            elif hasattr(results, '__iter__'):
                # Iterable results
                for result in results:
                    items.append(result)
            else:
                logger.warn("Unknown results type: {}".format(type(results)))
                return []
        
            logger.info("Checking {} items for EnduranceCamera UDT type...".format(len(items)))
        
            for item in items:
                try:
                    # Get item properties
                    item_path = item.get('fullPath', '') or item.get('path', '')
                
                    # Convert to string if it's a TagPath object
                    if item_path:
                        item_path = str(item_path)
                
                    type_id = item.get('typeId', '')
                    data_type = item.get('dataType', '')
                    tag_type = item.get('tagType', '')
                
                    # Check if this is an EnduranceCamera UDT instance
                    is_endurance_camera = (
                        'EnduranceCamera' in str(type_id) or
                        'EnduranceCamera' in str(data_type) or
                        (tag_type == 'UdtInstance' and 'EnduranceCamera' in str(type_id))
                    )
                
                    if is_endurance_camera and item_path:
                        device_name = mapUDTToDevice(item_path)
                    
                        found_udts.append({
                            'path': item_path,
                            'device': device_name,
                            'type_id': type_id,
                            'data_type': data_type,
                            'tag_type': tag_type
                        })
                    
                        logger.info("✅ Found EnduranceCamera UDT: {}".format(item_path))
                        logger.debug("   Type ID: {}".format(type_id))
                        logger.debug("   Data Type: {}".format(data_type))
                        logger.debug("   Tag Type: {}".format(tag_type))
                        logger.debug("   Device: {}".format(device_name))
                    
                except Exception as e:
                    logger.debug("Error processing item: {}".format(str(e)))
                    continue
        
            return found_udts
        
        except Exception as e:
            logger.error("Error in findEnduranceCameraUDTsByType: {}".format(str(e)))
            return fallbackTypeDiscovery()

    def fallbackTypeDiscovery():
        """
        Fallback method using tag configuration to check UDT types
        """
        logger.info("Using fallback type discovery...")
        found_udts = []
    
        try:
            # Browse non-recursively and check each potential UDT
            results = system.tag.browse("[default]", {"recursive": False})
        
            items = []
            if hasattr(results, 'getResults'):
                items = results.getResults()
            elif hasattr(results, '__iter__'):
                items = list(results)
        
            for item in items:
                try:
                    item_path = item.get('fullPath', '') or item.get('path', '')
                
                    # Convert to string if it's a TagPath object
                    if item_path:
                        item_path = str(item_path)
                
                    if item_path and not item_path.endswith(']'):  # Skip provider roots
                        # Check if this is a UDT instance by getting its configuration
                        config = system.tag.getConfiguration(item_path, False)[0]
                    
                        if hasattr(config, 'getTypeId'):
                            type_id = str(config.getTypeId())
                        
                            if 'EnduranceCamera' in type_id:
                                device_name = mapUDTToDevice(item_path)
                            
                                found_udts.append({
                                    'path': item_path,
                                    'device': device_name,
                                    'type_id': type_id
                                })
                            
                                logger.info("✅ Fallback found EnduranceCamera UDT: {}".format(item_path))
                                logger.debug("   Type ID: {}".format(type_id))
                            
                except Exception as e:
                    logger.debug("Error checking config for {}: {}".format(item_path, str(e)))
                    continue
        
            return found_udts
        
        except Exception as e:
            logger.error("Error in fallback discovery: {}".format(str(e)))
            return []

    def mapUDTToDevice(udt_path):
        """Map UDT to device name - each UDT should map to its own device"""
        try:
            # Convert TagPath object to string if needed
            udt_path_str = str(udt_path)
        
            # Get TCP devices
            dlist = system.device.listDevices()
            tcp_devices = []
        
            for row in range(dlist.getRowCount()):
                device_name = dlist.getValueAt(row, 0)
                driver_type = dlist.getValueAt(row, 3)
                if driver_type == "TCPDriver":
                    tcp_devices.append(device_name)
        
            logger.debug("TCP devices available: {}".format(tcp_devices))
            logger.debug("Mapping UDT: {}".format(udt_path_str))
        
            # FIRST PRIORITY: Try to read Device parameter from UDT instance
            try:
                config = system.tag.getConfiguration(udt_path_str, False)[0]
                if hasattr(config, 'getParameters'):
                    parameters = config.getParameters()
                    for param in parameters:
                        if param.getName() == "Device":
                            param_value = param.getValue()
                            if param_value and str(param_value) in tcp_devices:
                                logger.info("Found Device parameter: {} -> {}".format(udt_path_str, param_value))
                                return str(param_value)
                            elif param_value:
                                logger.warn("Device parameter '{}' not found in TCP devices for {}".format(param_value, udt_path_str))
            except Exception as e:
                logger.debug("Could not read Device parameter from {}: {}".format(udt_path_str, str(e)))
        
            # Extract UDT instance name (e.g., "Furnace101", "Furnace102")
            udt_name = udt_path_str.split("/")[-1].replace("]", "")
            logger.debug("UDT instance name: {}".format(udt_name))
        
            # SECOND PRIORITY: Try exact name matching
            # Look for TCP device with same name as UDT instance
            for device_name in tcp_devices:
                if device_name.lower() == udt_name.lower():
                    logger.info("Exact name match: {} -> {}".format(udt_name, device_name))
                    return device_name
        
            # THIRD PRIORITY: Try pattern matching based on UDT name
            # For Furnace101 -> look for Camera101, Furnace101, etc.
            # For Furnace102 -> look for Camera102, Furnace102, etc.
        
            # Extract number from UDT name if present
            import re
            udt_number_match = re.search(r'(\d+)', udt_name)
            udt_number = udt_number_match.group(1) if udt_number_match else None
        
            if udt_number:
                logger.debug("UDT number: {}".format(udt_number))
            
                # Look for devices with the same number
                for device_name in tcp_devices:
                    device_number_match = re.search(r'(\d+)', device_name)
                    device_number = device_number_match.group(1) if device_number_match else None
                
                    if device_number == udt_number:
                        logger.info("Number match: {} (number {}) -> {} (number {})".format(
                            udt_name, udt_number, device_name, device_number))
                        return device_name
        
            # FOURTH PRIORITY: Try partial name matching
            for device_name in tcp_devices:
                # UDT name contains device name (e.g., Furnace101 contains Camera)
                if device_name.lower() in udt_name.lower():
                    logger.info("Partial match (device in UDT): {} contains {} -> {}".format(
                        udt_name, device_name, device_name))
                    return device_name
            
                # Device name contains UDT name (e.g., Camera101 contains Furnace101)
                if udt_name.lower() in device_name.lower():
                    logger.info("Partial match (UDT in device): {} contains {} -> {}".format(
                        device_name, udt_name, device_name))
                    return device_name
        
            # LAST RESORT: Warn about multiple devices and require manual mapping
            if len(tcp_devices) > 1:
                logger.warn("Multiple TCP devices found but no clear mapping for {}".format(udt_path_str))
                logger.warn("Available devices: {}".format(tcp_devices))
                logger.warn("Please set the 'Device' parameter in the UDT instance to specify which device to use")
                return None
            elif len(tcp_devices) == 1:
                # Only one device - use it but warn
                device = tcp_devices[0]
                logger.warn("Only one TCP device found, defaulting {} -> {}".format(udt_path_str, device))
                return device
            else:
                logger.error("No TCP devices found for mapping UDT: {}".format(udt_path_str))
                return None
        
        except Exception as e:
            logger.error("Error mapping UDT to device: {}".format(str(e)))
            logger.error("UDT path type: {}".format(type(udt_path)))
            logger.error("UDT path value: {}".format(repr(udt_path)))
            return None

    # Main logic
    try:
        logger = system.util.getLogger("EnduranceCameraUpdater")
        logger.info("=== ENDURANCE CAMERA TYPE-BASED UPDATE ===")
    
        # Step 1: Get TCP device IPs
        tcp_devices = {}
        dlist = system.device.listDevices()
    
        for row in range(dlist.getRowCount()):
            device_name = dlist.getValueAt(row, 0)
            driver_type = dlist.getValueAt(row, 3)
        
            if driver_type == "TCPDriver":
                try:
                    hostname = system.device.getDeviceHostname(device_name)
                    tcp_devices[device_name] = hostname
                    logger.info("TCP Device: {} -> {}".format(device_name, hostname))
                except Exception as e:
                    logger.warn("Could not get IP for {}: {}".format(device_name, str(e)))
    
        if not tcp_devices:
            logger.error("No TCP devices found!")
            return
    
        # Step 2: Find EnduranceCamera UDT instances by type
        udts = findEnduranceCameraUDTsByType()
    
        if not udts:
            logger.warn("No EnduranceCamera UDT instances found")
            return
    
        logger.info("Found {} EnduranceCamera UDT instance(s)".format(len(udts)))
    
        # Step 3: Update each UDT instance
        updated_count = 0
    
        for udt_info in udts:
            udt_path = udt_info['path']
            device_name = udt_info['device']
        
            # Convert TagPath to string
            udt_path_str = str(udt_path)
        
            logger.info("Processing: {} (device: {})".format(udt_path_str, device_name))
        
            if device_name and device_name in tcp_devices:
                device_ip = tcp_devices[device_name]
                ip_tag_path = udt_path_str + "/DeviceIPAddress"
            
                try:
                    # Read current value
                    current_result = system.tag.readBlocking([ip_tag_path])[0]
                    current_ip = current_result.value
                    current_quality = current_result.quality
                
                    # Determine if update needed
                    should_write = False
                
                    if not current_quality.isGood():
                        should_write = True
                    elif current_ip is None:
                        should_write = True
                    elif str(current_ip).strip() in ["0", ""]:
                        should_write = True
                    elif str(current_ip).strip() != str(device_ip).strip():
                        should_write = True
                
                    if should_write:
                        write_result = system.tag.writeBlocking([ip_tag_path], [device_ip])
                    
                        if write_result[0].isGood():
                            logger.info("✅ Updated {} DeviceIPAddress: {} -> {}".format(udt_path_str, current_ip, device_ip))
                            updated_count += 1
                        else:
                            logger.error("❌ Failed to write IP: {}".format(write_result[0]))
                    else:
                        logger.debug("No update needed for {}".format(udt_path_str))
                    
                except Exception as e:
                    logger.error("Error updating {}: {}".format(udt_path_str, str(e)))
            else:
                logger.warn("Device '{}' not found for {}".format(device_name, udt_path_str))
    
        if updated_count > 0:
            logger.info("✅ Update complete - {} UDT(s) updated".format(updated_count))
        else:
            logger.info("✅ Update complete - no changes needed")
    
    except Exception as e:
        logger = system.util.getLogger("EnduranceCameraUpdater")
        logger.error("Error in updateEnduranceCamerasByType: {}".format(str(e)))

# ===== EXECUTION =====

# Discover by UDT type definition
updateEnduranceCamerasByType()

I don’t understand.

I didn’t scan through everything, but a couple of things:

  1. Hopefully all of this is contained in a project library script and you call the main function using a one-liner in the gateway timer script? If not, you should definitely do this.
  2. Do not import system; there’s no need to if you do things right.

Please see Wiki - how to post code on this forum. Can you edit your original post to fix the code blocks? You've lost all the indentation and any lines starting with # have been converted to headings by the Markdown converter. (You should have noticed this in the post preview.) Thanks.

I would take out the try/except and let whatever error happens show in the gateway logs. At least until you can get to your root cause.
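
As posted, Script A's except branch refers to `sys` and `logger`, and neither is defined in the snippet, so unless they are set up elsewhere the handler would itself fail with a NameError instead of logging anything useful. If you would rather keep a wrapper than remove it outright, a minimal sketch is to log the full traceback and re-raise. Python's standard `logging` module stands in for the gateway logger here so the snippet runs anywhere, and `run_logged` and `failing_job` are hypothetical names, not part of the original scripts.

```python
import logging
import traceback

# Stand-in logger (assumption); in Ignition you would use system.util.getLogger
log = logging.getLogger("TimerDebug")

def run_logged(job):
    """Run job(); on failure log the full traceback, then re-raise."""
    try:
        return job()
    except Exception:
        # format_exc() includes the exception type, message and failing line
        log.error("Timer job failed:\n%s", traceback.format_exc())
        raise

def failing_job():
    # Hypothetical job that fails the way a missing name would
    return undefined_name  # raises NameError when called
```

Because the exception is re-raised, it still surfaces wherever unhandled errors are normally reported, and the log entry carries the line that actually failed.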

Do not define functions in gateway event scripts. There is a legacy scoping issue that screws it up.

Make the event script a one-line call to a project library script, and do your function definitions there. (And keep all your imports outside any function definition.)
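
A rough sketch of that layout, assuming a project library script named `camera_timers` (a made-up name) with a placeholder status check. The standard `logging` module is used only so the sketch runs outside a gateway; in Ignition you would normally get the logger from `system.util.getLogger`.

```python
# Contents of a project library script, e.g. "camera_timers" (hypothetical name)
import logging  # imports sit at module level, outside every function

LOGGER = logging.getLogger("CameraTimers")  # stand-in for the gateway logger

def _check_camera(name):
    """Placeholder for the real status check; always reports 'ok' here."""
    return "ok"

def run_camera_checks(camera_names):
    """Entry point intended to be called from the gateway timer event."""
    statuses = {}
    for name in camera_names:
        statuses[name] = _check_camera(name)
    LOGGER.info("Checked %d camera(s)", len(statuses))
    return statuses

# The gateway timer event script then holds only a single call such as:
#     camera_timers.run_camera_checks(["Camera101", "Camera102"])
```

With this split the event script contains no function definitions at all, which sidesteps the scoping issue described above.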

I don’t want to come across as if I’m on a high horse, but I do want to share some thoughts.

To me, this looks like AI code. I mean, I’ve never seen anyone add a checkmark emoji to a logger info message, or any debug message for that matter. I hope you won’t depend on using AI to do your job too much.

Let’s assume I’m wrong. If anyone came to me with a similar issue, I would ask them for their design, plan, or any type of thought process. I suggest you do the same.

Please, take a step back. Think of a plan. Make a flowchart. Write some pseudocode. Start with small pieces of code. Maybe test the functions separately in the Script Console, with some prints, then un-clutter your code.
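
As one illustration of a "small piece", the exact-name matching step from Script B can be pulled out into a function that needs no gateway at all and checked on its own. `match_device_by_name` is a hypothetical name, and the device names below are sample values, not taken from a real gateway.

```python
def match_device_by_name(udt_name, device_names):
    """Return the device whose name equals udt_name (case-insensitive), else None."""
    wanted = udt_name.lower()
    for device_name in device_names:
        if device_name.lower() == wanted:
            return device_name
    return None

# Quick checks to paste into the Script Console
print(match_device_by_name("furnace101", ["Camera101", "Furnace101"]))  # Furnace101
print(match_device_by_name("Furnace999", ["Camera101", "Furnace101"]))  # None
```

Once a piece like this behaves as expected, the next piece can be added and tested the same way.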

I completely trust and will follow your advice but curiosity makes me ask why? If the imported library is only used inside the function, won’t it go out of scope and be closed/destroyed after the function call?

The import statement takes an interpreter lock, every time, before checking if the import is already satisfied. It is a performance bottleneck for busy events/functions.
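
Applied to the `import re` that sits inside `mapUDTToDevice` in Script B, hoisting could look like the sketch below. `first_number` and `_FIRST_NUMBER` are names made up for illustration, and the snippet only shows the structure; it does not measure the locking cost.

```python
import re  # resolved once, when the library module is first loaded

# Compiled once at load time instead of on every call
_FIRST_NUMBER = re.compile(r"(\d+)")

def first_number(name):
    """Return the first run of digits in name as a string, or None."""
    match = _FIRST_NUMBER.search(name)
    return match.group(1) if match else None
```

Every later call to `first_number` then runs without executing an import statement.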

True - LLMs don’t understand the filtering capabilities of system.tag.browse, and the code is doing unnecessary extra work to find the tags you’re interested in.
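
A hedged sketch of what letting the browse filter do the work could look like. The function takes the browse call as a parameter so it can be exercised outside a gateway; `find_udt_instances` is a hypothetical helper, and whether `typeId` needs the bare definition name or its full folder path on your system is an assumption to verify.

```python
def find_udt_instances(browse, base_path, type_id):
    """Return full tag paths of UDT instances of type_id under base_path.

    browse is expected to behave like system.tag.browse(path, filter).
    """
    browse_filter = {
        "tagType": "UdtInstance",  # only UDT instances, no folders or atomic tags
        "typeId": type_id,         # assumed to match the UDT definition path
        "recursive": True,         # descend into sub-folders
    }
    results = browse(base_path, browse_filter)
    return [str(result["fullPath"]) for result in results.getResults()]

# In Ignition (assumed usage):
#     paths = find_udt_instances(system.tag.browse, "[default]", "EnduranceCamera")
```

This would replace the per-item `typeId`/`dataType` string checks in `findEnduranceCameraUDTsByType`, since only matching instances come back from the browse.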
