A little nerd sniped here by the prospect of letting an LLM help with this...
As a starting point, this would be a standalone script (e.g. read_status_record.py, totally external to and independent of Ignition) that needs to be run with Python 2.7, not Python 3.x.
#!/usr/bin/env python
"""
M307 Status Record Reader
Connects to an M307 device via TCP and reads the current status record.
Outputs the parsed data as JSON.
Usage:
python read_status_record.py <hostname_or_ip> [port]
Example:
python read_status_record.py 192.168.1.100
python read_status_record.py 192.168.1.100 10001
"""
import sys
import json
import socket
# --- Connection / framing constants ---
DEFAULT_PORT = 10001   # TCP port used when none is given on the command line
SOCKET_TIMEOUT = 5     # seconds
RECORD_SIZE = 60       # length in bytes of the status record

# Status record read command: four command bytes followed by zero padding,
# giving a request that is RECORD_SIZE bytes long.
STATUS_READ_CMD = [0x3F, 0xCD, 0xDC, 0x00] + [0x00] * (RECORD_SIZE - 4)

# --- Sentinel readings: temperature ---
TEMP_NO_SENSOR = 1000
TEMP_OPEN_CIRCUIT = 999
TEMP_SHORT_CIRCUIT = -999

# --- Sentinel readings: humidity ---
HUMIDITY_FAILED = 999
def two_bytes_to_signed_int(msb, lsb):
    """
    Converts two bytes (MSB, LSB) to a 16-bit signed integer.

    Based on the Java example in section 1.3 of the documentation.

    Args:
        msb: Most significant byte
        lsb: Least significant byte

    Returns:
        16-bit signed integer (-32768..32767)
    """
    # Mask with 0xFF so that negative (signed) byte values are treated as
    # their unsigned 0..255 equivalent before being combined.
    value = ((msb & 0xFF) << 8) | (lsb & 0xFF)
    # Bit 15 set means the two's-complement value is negative.
    if value >= 0x8000:
        value -= 0x10000
    return value
def two_bytes_to_unsigned_int(msb, lsb):
    """
    Converts two bytes (MSB, LSB) to a 16-bit unsigned integer.

    Args:
        msb: Most significant byte
        lsb: Least significant byte

    Returns:
        16-bit unsigned integer (0..65535)
    """
    # Mask with 0xFF so that negative (signed) byte values are treated as
    # their unsigned 0..255 equivalent before being combined.
    return ((msb & 0xFF) << 8) | (lsb & 0xFF)
def interpret_temperature(raw_value, resolution_divisor, temp_unit):
    """
    Interprets a raw temperature value according to the documentation.

    The raw value is first compared against the special sentinel readings
    (TEMP_NO_SENSOR, TEMP_OPEN_CIRCUIT, TEMP_SHORT_CIRCUIT); only when it
    matches none of them is it scaled into a temperature.

    Args:
        raw_value: Raw temperature value as signed integer
        resolution_divisor: 10.0 if 0.1 degree resolution, 1.0 otherwise
        temp_unit: Temperature unit ('C' or 'F')

    Returns:
        Dictionary with keys 'value' (float, or None when a sentinel was
        read), 'unit' (temp_unit, passed through) and 'status' ('ok',
        'no_sensor', 'open_circuit' or 'short_circuit')
    """
    result = {
        'value': None,
        'unit': temp_unit,
        'status': 'ok',
    }
    if raw_value == TEMP_NO_SENSOR:
        result['status'] = 'no_sensor'
    elif raw_value == TEMP_OPEN_CIRCUIT:
        result['status'] = 'open_circuit'
    elif raw_value == TEMP_SHORT_CIRCUIT:
        result['status'] = 'short_circuit'
    else:
        # float() keeps this a true division under Python 2 even if a
        # caller passes an integer divisor (10 instead of 10.0).
        result['value'] = raw_value / float(resolution_divisor)
    return result
def interpret_humidity(raw_value):
    """
    Interprets a raw humidity value according to the documentation.

    Humidity is always returned with 0.1%RH resolution, so the raw value is
    divided by 10 unless it equals the HUMIDITY_FAILED sentinel.

    Args:
        raw_value: Raw humidity value as signed integer

    Returns:
        Dictionary with keys 'value' (float, or None when the sentinel was
        read), 'unit' (always '%RH') and 'status' ('ok' or 'sensor_failed')
    """
    result = {
        'value': None,
        'unit': '%RH',
        'status': 'ok',
    }
    if raw_value == HUMIDITY_FAILED:
        result['status'] = 'sensor_failed'
    else:
        result['value'] = raw_value / 10.0
    return result
def _channel_entry(octets, base, lead_key, lead_value):
    """
    Builds the result dictionary for one 5-byte channel slot.

    Every channel slot parsed here ends the same way: a 16-bit unsigned
    "time out of limits" counter at base+2/base+3 and a one-byte
    out-of-limits flag at base+4 (1 = out of limits).
    """
    return {
        lead_key: lead_value,
        'time_out_of_limits_minutes': two_bytes_to_unsigned_int(
            octets[base + 2], octets[base + 3]),
        'out_of_limits_state': octets[base + 4] == 1,
    }


def _temperature_entry(octets, base, resolution_divisor, temp_unit):
    """Parses a temperature channel whose signed reading is at base/base+1."""
    raw = two_bytes_to_signed_int(octets[base], octets[base + 1])
    reading = interpret_temperature(raw, resolution_divisor, temp_unit)
    return _channel_entry(octets, base, 'reading', reading)


def _door_entry(octets, base):
    """Parses a door channel; the state byte is at base+1 (1 = closed)."""
    state = 'closed' if octets[base + 1] == 1 else 'open'
    return _channel_entry(octets, base, 'state', state)


def parse_status_record(data):
    """
    Parses the 60-byte status record according to Table 1 in the documentation.

    Args:
        data: The 60 bytes received from the M307, as a bytearray, a byte
            string, or a list of integers. Integer elements are masked with
            0xFF, so signed byte values (-128..127) are accepted as well.

    Returns:
        Dictionary containing all parsed fields

    Raises:
        ValueError: If the record is not RECORD_SIZE bytes long or does not
            start with the command bytes 0x3F 0xCD 0xDC 0x00
    """
    # Verify the record length before touching any index
    if len(data) != RECORD_SIZE:
        raise ValueError("Invalid record size: expected %d bytes, got %d" % (RECORD_SIZE, len(data)))

    # Normalise once to a list of unsigned byte values so that every check
    # below (including the header check) sees the same representation.
    if isinstance(data, (bytes, bytearray)):
        # On Python 2 'bytes' is 'str'; bytearray() turns it into integers.
        octets = list(bytearray(data))
    else:
        octets = [b & 0xFF for b in data]

    # Verify the command bytes match the expected response
    if octets[:4] != [0x3F, 0xCD, 0xDC, 0x00]:
        raise ValueError("Invalid command bytes in response")

    # Temperature resolution (byte 59, index 58):
    # 10 means 0.1 degree resolution, anything else 1 degree resolution
    resolution_divisor = 10.0 if octets[58] == 10 else 1.0

    # Temperature unit (byte 60, index 59): 0x43 = 'C', 0x46 = 'F'
    temp_unit = {0x43: 'C', 0x46: 'F'}.get(octets[59], 'unknown')

    # Internal humidity sensor (bytes 20-24, indices 19-23);
    # humidity always has 0.1%RH resolution
    humidity_raw = two_bytes_to_signed_int(octets[19], octets[20])

    # Battery voltage (bytes 36-37, indices 35-36), returned * 100
    # (e.g., 241 = 2.41v)
    battery_voltage = two_bytes_to_unsigned_int(octets[35], octets[36]) / 100.0

    return {
        # Temperature sensor 1 (bytes 5-9, indices 4-8)
        'temperature_sensor_1': _temperature_entry(
            octets, 4, resolution_divisor, temp_unit),
        # Temperature sensor 2 (bytes 10-14, indices 9-13)
        'temperature_sensor_2': _temperature_entry(
            octets, 9, resolution_divisor, temp_unit),
        # Internal temperature sensor (bytes 15-19, indices 14-18)
        'internal_temperature_sensor': _temperature_entry(
            octets, 14, resolution_divisor, temp_unit),
        'internal_humidity_sensor': _channel_entry(
            octets, 19, 'reading', interpret_humidity(humidity_raw)),
        # Door 1 (bytes 25-29, indices 24-28)
        'door_1': _door_entry(octets, 24),
        # Door 2 (bytes 30-34, indices 29-33)
        'door_2': _door_entry(octets, 29),
        # Main power (byte 35, index 34): 4 = on, anything else reported as off
        'main_power_state': 'on' if octets[34] == 4 else 'off',
        'battery_voltage': battery_voltage,
        'temperature_resolution': '0.1_degree' if resolution_divisor == 10.0 else '1_degree',
        'temperature_unit': temp_unit,
    }
def read_status_record(host, port=None):
    """
    Connects to an M307 device and reads the status record.

    Sends STATUS_READ_CMD over a TCP connection, reads RECORD_SIZE bytes
    back and hands them to parse_status_record(). The socket is always
    closed before this function returns or raises.

    Args:
        host: IP address or hostname of the M307 device
        port: TCP port number (DEFAULT_PORT is used when omitted or None)

    Returns:
        Dictionary containing the parsed status record

    Raises:
        IOError: If the connection or read times out, a socket error
            occurs, or the peer closes the connection before a complete
            record has been received
        ValueError: If parse_status_record() rejects the response
    """
    if port is None:
        port = DEFAULT_PORT

    response = bytearray()
    sock = None
    try:
        # Create TCP socket; the timeout applies to connect, send and recv
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(SOCKET_TIMEOUT)
        sock.connect((host, port))

        # bytes(bytearray(...)) yields the raw byte string on Python 2
        # (where bytes is str) and a bytes object on Python 3.
        sock.sendall(bytes(bytearray(STATUS_READ_CMD)))

        # recv() may return fewer bytes than requested, so keep reading
        # until the whole record has arrived or the peer closes.
        while len(response) < RECORD_SIZE:
            chunk = sock.recv(RECORD_SIZE - len(response))
            if not chunk:
                break
            response.extend(bytearray(chunk))
    except socket.timeout as e:
        raise IOError("Connection timeout: " + str(e))
    except socket.error as e:
        raise IOError("Communication error: " + str(e))
    finally:
        if sock is not None:
            try:
                sock.close()
            except socket.error:
                pass  # Ignore errors during cleanup

    # Raised outside the try block so the handlers above cannot re-wrap it
    # on interpreters where socket.error is an alias of IOError.
    if len(response) < RECORD_SIZE:
        raise IOError("Connection closed before receiving complete response")

    return parse_status_record(response)
def main(argv=None):
    """
    Main entry point for the script.

    Reads the host and optional port from the argument list, fetches the
    status record and prints it to stdout as indented, key-sorted JSON.
    Every failure is reported on stderr and ends the process with exit
    status 1.

    Args:
        argv: Argument list in sys.argv layout (program name first);
            sys.argv is used when omitted or None
    """
    if argv is None:
        argv = sys.argv

    # Parse command line arguments
    if len(argv) < 2:
        sys.stderr.write("Usage: %s <hostname_or_ip> [port]\n" % argv[0])
        sys.stderr.write("Example: %s 192.168.1.100\n" % argv[0])
        sys.stderr.write("Example: %s 192.168.1.100 10001\n" % argv[0])
        sys.exit(1)

    host = argv[1]
    port = DEFAULT_PORT
    if len(argv) >= 3:
        try:
            port = int(argv[2])
            if port < 1 or port > 65535:
                raise ValueError("Port must be between 1 and 65535")
        except ValueError as e:
            # Covers both a non-numeric argument and an out-of-range port
            sys.stderr.write("Error: Invalid port number - %s\n" % str(e))
            sys.exit(1)

    # Read the status record and output it as pretty-printed JSON
    try:
        result = read_status_record(host, port)
        sys.stdout.write(json.dumps(result, indent=2, sort_keys=True) + "\n")
    except IOError as e:
        sys.stderr.write("Error: %s\n" % str(e))
        sys.exit(1)
    except ValueError as e:
        sys.stderr.write("Error parsing response: %s\n" % str(e))
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report anything unexpected instead of a traceback
        sys.stderr.write("Unexpected error: %s\n" % str(e))
        sys.exit(1)


if __name__ == '__main__':
    main()