Integrating Temperature Guard M307 over TCP/IP

Hello,

Forewarning, I am a complete novice to a lot of this. I do not have much coding background, though I am willing to learn as I go.

I am trying to find out how to read the message (or message bytes?) I am seeing in the OPC Quick Client for my Temp Guard M307 test unit.

Right now, the only information I am seeing is “Good”, despite knowing the Temperature Guard is sending a 60-byte message. The documentation breaks down what the different bytes mean, but I am uncertain how to tap into that to get the actual usable data.

I assume the message needs to be decoded, but I am uncertain how best to do that. I’ve seen a few posts about using Java.

Is there a driver for these units I am simply missing? Or do I have to write some sort of script to decipher this near real time?

Thanks in advance!

It sounds like maybe you're using the generic TCP driver to connect to this?

Do you have any documentation for this device about how its TCP function behaves and what the data format is?

In any case you're probably going to have to write code to handle decoding whatever message this thing sends, but at this point it's not even clear if the TCP driver is suitable or if you'll have to make a connection from scripting instead.
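To give a feel for what that kind of decoding code looks like, here is a minimal sketch using Python's struct module. The 4-byte layout is invented purely for illustration (a signed 16-bit reading in tenths followed by an unsigned 16-bit counter, both big-endian), and decode_example is a made-up name; the real layout has to come from the device documentation.

```python
import struct


def decode_example(payload):
    """Decode a hypothetical 4-byte payload (NOT the real M307 layout).

    Assumed layout, big-endian:
      bytes 0-1: signed 16-bit reading, in tenths of a unit
      bytes 2-3: unsigned 16-bit counter
    """
    # '>' = big-endian, 'h' = signed 16-bit, 'H' = unsigned 16-bit
    reading_raw, counter = struct.unpack('>hH', payload)
    return {'reading': reading_raw / 10.0, 'counter': counter}


# 0xFF9C is -100 as a signed 16-bit value, 0x002A is 42
sample = b'\xff\x9c\x00\x2a'
decoded_sample = decode_example(sample)
print(decoded_sample['reading'], decoded_sample['counter'])
```

The same idea scales to a longer message: each field is a fixed offset plus a format, and the documentation supplies both.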

I am using the generic TCP Driver, that is correct.

I do have the document. It is a full SDK integration guide.

SDK-M307-M308-M309 1-7.pdf (216.1 KB)

I don't think the TCP driver is a good fit for this.

Scripting, or a custom Ignition module, would be better.

Even scripting is going to be a moderate effort to get working if you're not already adept at Jython/Python. The custom Ignition module, while a better fit, is a big reach.

The plus side is that you seem to have adequate documentation...

A little nerd sniped here by the prospect of letting an LLM help with this...

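As a starting point, this would be a standalone script (e.g. read_status_record.py), completely external to and independent of Ignition. It needs to be run with Python 2.7, not Python 3.x, because it uses Python 2-only syntax (the print statement and the "except ValueError, e" form).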

#!/usr/bin/env python
"""
M307 Status Record Reader

Connects to an M307 device via TCP and reads the current status record.
Outputs the parsed data as JSON.

Usage:
    python read_status_record.py <hostname_or_ip> [port]

Example:
    python read_status_record.py 192.168.1.100
    python read_status_record.py 192.168.1.100 10001
"""

import sys
import json
import socket

# Constants
DEFAULT_PORT = 10001
SOCKET_TIMEOUT = 5  # 5 seconds
RECORD_SIZE = 60

# Status record read command
STATUS_READ_CMD = [0x3f, 0xcd, 0xdc, 0x00] + [0x00] * 56

# Special temperature values
TEMP_NO_SENSOR = 1000
TEMP_OPEN_CIRCUIT = 999
TEMP_SHORT_CIRCUIT = -999

# Special humidity values
HUMIDITY_FAILED = 999


def two_bytes_to_signed_int(msb, lsb):
    """
    Converts two bytes (MSB, LSB) to a 16-bit signed integer.
    Based on the Java example in section 1.3 of the documentation.

    Args:
        msb: Most significant byte
        lsb: Least significant byte

    Returns:
        16-bit signed integer
    """
    # Mask with 0xFF to ensure unsigned byte values
    high = msb & 0xFF
    low = lsb & 0xFF

    # Combine bytes: shift MSB left 8 bits and OR with LSB
    value = (high << 8) | low

    # Convert to signed 16-bit integer
    # If the most significant bit is set, the number is negative
    if value >= 0x8000:
        value = value - 0x10000

    return value


def two_bytes_to_unsigned_int(msb, lsb):
    """
    Converts two bytes (MSB, LSB) to a 16-bit unsigned integer.

    Args:
        msb: Most significant byte
        lsb: Least significant byte

    Returns:
        16-bit unsigned integer
    """
    high = msb & 0xFF
    low = lsb & 0xFF
    return (high << 8) | low


def interpret_temperature(raw_value, resolution_divisor, temp_unit):
    """
    Interprets a raw temperature value according to the documentation.

    Args:
        raw_value: Raw temperature value as signed integer
        resolution_divisor: 10.0 if 0.1 degree resolution, 1.0 otherwise
        temp_unit: Temperature unit ('C' or 'F')

    Returns:
        Dictionary with temperature reading and status
    """
    result = {
        'value': None,
        'unit': temp_unit,
        'status': 'ok'
    }

    if raw_value == TEMP_NO_SENSOR:
        result['status'] = 'no_sensor'
    elif raw_value == TEMP_OPEN_CIRCUIT:
        result['status'] = 'open_circuit'
    elif raw_value == TEMP_SHORT_CIRCUIT:
        result['status'] = 'short_circuit'
    else:
        result['value'] = raw_value / resolution_divisor

    return result


def interpret_humidity(raw_value):
    """
    Interprets a raw humidity value according to the documentation.
    Humidity is always returned with 0.1%RH resolution.

    Args:
        raw_value: Raw humidity value as signed integer

    Returns:
        Dictionary with humidity reading and status
    """
    result = {
        'value': None,
        'unit': '%RH',
        'status': 'ok'
    }

    if raw_value == HUMIDITY_FAILED:
        result['status'] = 'sensor_failed'
    else:
        result['value'] = raw_value / 10.0

    return result


def parse_status_record(data):
    """
    Parses the 60-byte status record according to Table 1 in the documentation.

    Args:
        data: List/array of 60 bytes received from the M307

    Returns:
        Dictionary containing all parsed fields
    """
    # Verify the record length, then that the command bytes match the expected response
    if len(data) != RECORD_SIZE:
        raise ValueError("Invalid record size: expected %d bytes, got %d" % (RECORD_SIZE, len(data)))

    if data[0] != 0x3F or data[1] != 0xCD or data[2] != 0xDC or data[3] != 0x00:
        raise ValueError("Invalid command bytes in response")

    # Determine temperature resolution (byte 59, index 58)
    # If byte 59 == 10, then 0.1 degree resolution, else 1 degree resolution
    resolution_divisor = 10.0 if (data[58] & 0xFF) == 10 else 1.0

    # Determine temperature unit (byte 60, index 59)
    # 0x43 = 'C', 0x46 = 'F'
    temp_unit_byte = data[59] & 0xFF
    if temp_unit_byte == 0x43:
        temp_unit = 'C'
    elif temp_unit_byte == 0x46:
        temp_unit = 'F'
    else:
        temp_unit = 'unknown'

    # Parse Temperature Sensor 1 (bytes 5-9, indices 4-8)
    temp1_raw = two_bytes_to_signed_int(data[4], data[5])
    temp1_reading = interpret_temperature(temp1_raw, resolution_divisor, temp_unit)
    temp1_time_out = two_bytes_to_unsigned_int(data[6], data[7])
    temp1_out_of_limits = (data[8] & 0xFF) == 1

    # Parse Temperature Sensor 2 (bytes 10-14, indices 9-13)
    temp2_raw = two_bytes_to_signed_int(data[9], data[10])
    temp2_reading = interpret_temperature(temp2_raw, resolution_divisor, temp_unit)
    temp2_time_out = two_bytes_to_unsigned_int(data[11], data[12])
    temp2_out_of_limits = (data[13] & 0xFF) == 1

    # Parse Internal Temperature Sensor (bytes 15-19, indices 14-18)
    internal_temp_raw = two_bytes_to_signed_int(data[14], data[15])
    internal_temp_reading = interpret_temperature(internal_temp_raw, resolution_divisor, temp_unit)
    internal_temp_time_out = two_bytes_to_unsigned_int(data[16], data[17])
    internal_temp_out_of_limits = (data[18] & 0xFF) == 1

    # Parse Internal Humidity Sensor (bytes 20-24, indices 19-23)
    # Humidity always has 0.1%RH resolution
    humidity_raw = two_bytes_to_signed_int(data[19], data[20])
    humidity_reading = interpret_humidity(humidity_raw)
    humidity_time_out = two_bytes_to_unsigned_int(data[21], data[22])
    humidity_out_of_limits = (data[23] & 0xFF) == 1

    # Parse Door 1 (bytes 25-29, indices 24-28)
    # Byte 26 (index 25): 1 = closed, 0 = open
    door1_state = 'closed' if (data[25] & 0xFF) == 1 else 'open'
    door1_time_out = two_bytes_to_unsigned_int(data[26], data[27])
    door1_out_of_limits = (data[28] & 0xFF) == 1

    # Parse Door 2 (bytes 30-34, indices 29-33)
    # Byte 31 (index 30): 1 = closed, 0 = open
    door2_state = 'closed' if (data[30] & 0xFF) == 1 else 'open'
    door2_time_out = two_bytes_to_unsigned_int(data[31], data[32])
    door2_out_of_limits = (data[33] & 0xFF) == 1

    # Parse Main Power (byte 35, index 34)
    # 4 = on, 0 = off
    main_power = 'on' if (data[34] & 0xFF) == 4 else 'off'

    # Parse Battery Voltage (bytes 36-37, indices 35-36)
    # Returned * 100 (e.g., 241 = 2.41v)
    battery_voltage_raw = two_bytes_to_unsigned_int(data[35], data[36])
    battery_voltage = battery_voltage_raw / 100.0

    # Build the result dictionary
    result = {
        'temperature_sensor_1': {
            'reading': temp1_reading,
            'time_out_of_limits_minutes': temp1_time_out,
            'out_of_limits_state': temp1_out_of_limits
        },
        'temperature_sensor_2': {
            'reading': temp2_reading,
            'time_out_of_limits_minutes': temp2_time_out,
            'out_of_limits_state': temp2_out_of_limits
        },
        'internal_temperature_sensor': {
            'reading': internal_temp_reading,
            'time_out_of_limits_minutes': internal_temp_time_out,
            'out_of_limits_state': internal_temp_out_of_limits
        },
        'internal_humidity_sensor': {
            'reading': humidity_reading,
            'time_out_of_limits_minutes': humidity_time_out,
            'out_of_limits_state': humidity_out_of_limits
        },
        'door_1': {
            'state': door1_state,
            'time_out_of_limits_minutes': door1_time_out,
            'out_of_limits_state': door1_out_of_limits
        },
        'door_2': {
            'state': door2_state,
            'time_out_of_limits_minutes': door2_time_out,
            'out_of_limits_state': door2_out_of_limits
        },
        'main_power_state': main_power,
        'battery_voltage': battery_voltage,
        'temperature_resolution': '0.1_degree' if resolution_divisor == 10.0 else '1_degree',
        'temperature_unit': temp_unit
    }

    return result


def read_status_record(host, port):
    """
    Connects to an M307 device and reads the status record.

    Args:
        host: IP address or hostname of the M307 device
        port: TCP port number (default 10001)

    Returns:
        Dictionary containing the parsed status record

    Raises:
        socket.error: If connection or communication fails
        socket.timeout: If connection or read times out
    """
    sock = None
    try:
        # Create TCP socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(SOCKET_TIMEOUT)

        # Connect to the M307 device
        sock.connect((host, port))

        # Convert command to byte array
        # In Python 2.7, we need to convert integers to bytes
        cmd_bytes = bytearray(STATUS_READ_CMD)

        # Send the status record read command
        sock.sendall(str(cmd_bytes))

        # Read the 60-byte response
        response = bytearray()
        total_read = 0

        while total_read < RECORD_SIZE:
            chunk = sock.recv(RECORD_SIZE - total_read)
            if not chunk:
                raise IOError("Connection closed before receiving complete response")
            response.extend(bytearray(chunk))
            total_read += len(chunk)

        # Parse the response
        result = parse_status_record(response)

        return result

    except socket.timeout, e:
        raise IOError("Connection timeout: " + str(e))
    except socket.error, e:
        raise IOError("Communication error: " + str(e))
    finally:
        if sock is not None:
            try:
                sock.close()
            except socket.error:
                pass  # Ignore errors during cleanup


def main():
    """Main entry point for the script."""
    # Parse command line arguments
    if len(sys.argv) < 2:
        print >> sys.stderr, "Usage: %s <hostname_or_ip> [port]" % sys.argv[0]
        print >> sys.stderr, "Example: %s 192.168.1.100" % sys.argv[0]
        print >> sys.stderr, "Example: %s 192.168.1.100 10001" % sys.argv[0]
        sys.exit(1)

    host = sys.argv[1]
    port = DEFAULT_PORT

    if len(sys.argv) >= 3:
        try:
            port = int(sys.argv[2])
            if port < 1 or port > 65535:
                raise ValueError("Port must be between 1 and 65535")
        except ValueError, e:
            print >> sys.stderr, "Error: Invalid port number - %s" % str(e)
            sys.exit(1)

    # Read the status record
    try:
        result = read_status_record(host, port)

        # Output as JSON
        # Use indent for pretty printing
        print json.dumps(result, indent=2, sort_keys=True)

    except IOError, e:
        print >> sys.stderr, "Error: %s" % str(e)
        sys.exit(1)
    except ValueError, e:
        print >> sys.stderr, "Error parsing response: %s" % str(e)
        sys.exit(1)
    except Exception, e:
        print >> sys.stderr, "Unexpected error: %s" % str(e)
        sys.exit(1)


if __name__ == '__main__':
    main()

Ok this may have gotten a little out of hand.

Full disclaimer: I have no idea if any of this works, if for no other reason than that I don't even have one of these things to test against.
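Without a device on hand, one way to at least exercise the decoding is to feed it a hand-built 60-byte record. The sketch below is self-contained and only covers a few fields. The offsets are copied from the script above, so they are just as unverified against the SDK guide, and the helper names (put_u16, get_s16, build_fake_record, decode_subset) are made up for illustration.

```python
RECORD_SIZE = 60


def put_u16(buf, index, value):
    """Store a 16-bit value MSB first; negatives wrap to two's complement."""
    value &= 0xFFFF
    buf[index] = value >> 8
    buf[index + 1] = value & 0xFF


def get_s16(buf, index):
    """Read a signed 16-bit value stored MSB first."""
    value = (buf[index] << 8) | buf[index + 1]
    return value - 0x10000 if value >= 0x8000 else value


def build_fake_record(temp1_raw, humidity_raw, battery_raw):
    """Build a synthetic status record using the offsets assumed by the script."""
    record = bytearray(RECORD_SIZE)
    record[0:4] = bytearray([0x3F, 0xCD, 0xDC, 0x00])  # command bytes
    put_u16(record, 4, temp1_raw)      # temperature sensor 1
    put_u16(record, 19, humidity_raw)  # internal humidity
    record[34] = 4                     # main power on
    put_u16(record, 35, battery_raw)   # battery voltage * 100
    record[58] = 10                    # 0.1 degree resolution
    record[59] = ord('C')              # temperature unit
    return record


def decode_subset(record):
    """Decode a handful of fields the same way the script does."""
    divisor = 10.0 if record[58] == 10 else 1.0
    return {
        'temp1': get_s16(record, 4) / divisor,
        'humidity': get_s16(record, 19) / 10.0,
        'main_power': 'on' if record[34] == 4 else 'off',
        'battery_voltage': ((record[35] << 8) | record[36]) / 100.0,
        'unit': chr(record[59]),
    }


fake = build_fake_record(temp1_raw=-215, humidity_raw=455, battery_raw=241)
decoded = decode_subset(fake)
print(decoded['temp1'], decoded['humidity'], decoded['battery_voltage'])
```

If the script is saved as read_status_record.py, the same fake record can also be passed straight to its parse_status_record function (importing the module does not open a connection, since main() only runs when the file is executed directly). That only proves the code is self-consistent; whether the offsets match the real device still needs checking against the SDK guide.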

I appreciate you! I will take a look when I have time to dive into this. May be a bit! Thank you!