I am using several gateway timer scripts, each executing every 60 seconds.
The issue I am running into is that, after a few days, the Windows server that Ignition runs on locks up network-wise, which I believe is due to TCP port exhaustion. When troubleshooting, I can consistently watch the number of TCP connections increase. I have added debugging to the script, and it shows that the client is disconnecting from the broker, but the TCP connections just keep increasing.
Looking for some advice on how to manage this.
import json
import time
import ssl
import paho.mqtt.client as mqtt
from collections import OrderedDict
# MQTT broker connection details (AWS IoT endpoint; the client below is configured for TLS).
broker_address = "a24qeh3kq5ixg0-ats.iot.us-east-1.amazonaws.com"
port = 8883
# Directory that must contain AWSRootCA1.pem, certificate.pem.crt and private.pem.key.
certs_path = r"C:\Users\Public\Documents\client_mqtt_certificates\Divconbroker1"
# Keepalive passed to client.connect(); presumably seconds - confirm against the paho docs.
keepalive = 45
# Root tag folder; its children are browsed as locations, and their children as devices.
base_tag_path = "[default]MQTT Tags/Databank_Phase1/ATL75_All-Rooms"
# Tag written True when the broker connection (or the run) fails, False once connected.
connection_status_tag_path = "[default]NO_BROKER_CONNECTION"
# Function to check if certificate files exist
def check_cert_files(certs_path, os_module):
    """Locate the three TLS credential files under ``certs_path``.

    Args:
        certs_path: Directory expected to hold the credential files.
        os_module: The ``os`` module (passed in explicitly by the caller).

    Returns:
        A ``(ca_cert_file, cert_file, key_file)`` tuple of joined paths.

    Raises:
        Exception: If any of the three files does not exist; the message
            names the first missing path.
    """
    expected_names = ("AWSRootCA1.pem", "certificate.pem.crt", "private.pem.key")
    resolved_paths = tuple(
        os_module.path.join(certs_path, file_name) for file_name in expected_names
    )
    # Files are checked in CA / certificate / key order, so the first missing
    # one is the one reported.
    for candidate in resolved_paths:
        if os_module.path.isfile(candidate):
            continue
        raise Exception("File not found: {}".format(candidate))
    return resolved_paths
# Resolve the certificate paths once at script start; check_cert_files raises
# if any of the three files is missing, which aborts the script before any
# MQTT client is created.
import os # Importing os outside functions and passing it explicitly
ca_cert_file, cert_file, key_file = check_cert_files(certs_path, os)
# Function to initialize MQTT client
def initialize_mqtt_client(broker_address, port, ca_cert_file, cert_file, key_file, mqtt_module, ssl_module, keepalive):
    """Create a TLS-configured MQTT client and start connecting to the broker.

    Args:
        broker_address: Broker host name.
        port: Broker port.
        ca_cert_file: Path to the CA certificate file.
        cert_file: Path to the client certificate file.
        key_file: Path to the client private key file.
        mqtt_module: Module providing ``Client`` (``paho.mqtt.client``).
        ssl_module: Module providing ``PROTOCOL_TLSv1_2`` (``ssl``).
        keepalive: Keepalive value forwarded to ``client.connect``.

    Returns:
        The client object. Two extra attributes are set on it:
        ``connected_flag`` (False until ``on_connect`` reports ``rc == 0``)
        and ``connect_error`` (None, or the exception raised while
        connecting / starting the network loop).

    The connection attempt stays best-effort: a failure is not raised, so
    callers should poll ``connected_flag`` and may inspect ``connect_error``.
    """
    client = mqtt_module.Client()
    client.tls_set(
        ca_certs=ca_cert_file,
        certfile=cert_file,
        keyfile=key_file,
        tls_version=ssl_module.PROTOCOL_TLSv1_2,
    )
    client.connected_flag = False  # Flipped to True by on_connect below
    client.connect_error = None  # Filled in if connect()/loop_start() raises

    def on_publish(client, userdata, mid):
        pass

    def on_connect(client, userdata, flags, rc):
        # Only rc == 0 is treated as a successful connection.
        if rc == 0:
            client.connected_flag = True

    def on_log(client, userdata, level, buf):
        pass

    client.on_publish = on_publish
    client.on_connect = on_connect
    client.on_log = on_log

    try:
        client.connect(broker_address, port, keepalive)
        client.loop_start()
    except Exception as error:
        # Keep the original "never raise" contract, but remember why the
        # attempt failed instead of discarding the exception.
        client.connect_error = error
    return client
# Function to check MQTT connection
def check_mqtt_connection(client, timeout=10, time_module=None):
    """Wait for ``client.connected_flag`` to become true.

    Args:
        client: Object exposing a ``connected_flag`` attribute.
        timeout: Maximum number of one-second waits before giving up.
        time_module: Module providing ``sleep``; the standard ``time`` module
            is imported when omitted.

    Returns:
        True if the flag was observed set within the allowed waits,
        otherwise False.
    """
    if time_module is None:
        import time as time_module  # Ensure time is available within the function scope

    for _ in range(timeout):
        if client.connected_flag:
            return True
        time_module.sleep(1)

    # Re-check after the last sleep: without this, a connection completed
    # during the final wait (or an already-connected client with timeout=0)
    # would be reported as a failure.
    return bool(client.connected_flag)
# Function to process a single device
def _iter_tag_chunks(tag_data, max_chunk_size, OrderedDict, json_module):
    """Yield ordered chunks of ``tag_data`` that fit within ``max_chunk_size``.

    A chunk's size is the sum of ``len(json.dumps({name: entry}))`` over its
    entries. An entry that is larger than the limit on its own is still
    yielded, alone in its chunk, so the iteration always makes progress.
    """
    chunk = OrderedDict()
    chunk_size = 0
    for tag_name, entry in tag_data.items():
        entry_size = len(json_module.dumps({tag_name: entry}))
        if chunk and chunk_size + entry_size > max_chunk_size:
            yield chunk
            chunk = OrderedDict()
            chunk_size = 0
        chunk[tag_name] = entry
        chunk_size += entry_size
    if chunk:
        yield chunk


def process_device(device, client, time_module, system, OrderedDict, json_module,
                   max_chunk_size=128 * 1024,
                   topic_prefix="stevesite/ATL75/Phase1/All-Rooms/"):
    """Read one device's tags and publish them to MQTT in size-limited chunks.

    Args:
        device: Browse result for the device; ``'name'``, ``'hasChildren'``
            and ``'fullPath'`` are read from it.
        client: MQTT client; ``publish(topic, payload, qos=1)`` is called and
            ``wait_for_publish()`` is called on its result.
        time_module: Module providing ``time()`` for the payload timestamp.
        system: Ignition ``system`` object (``system.tag.browse`` and
            ``system.tag.readBlocking`` are used).
        OrderedDict: The ``collections.OrderedDict`` class.
        json_module: Module providing ``dumps`` (``json``).
        max_chunk_size: Upper bound on the summed per-tag JSON size of one
            message. The base payload fields are not counted.
            NOTE(review): the 128 KiB default presumably mirrors the broker's
            message size limit - confirm, and consider leaving headroom.
        topic_prefix: Topic prefix placed before ``deviceType/deviceName``.

    Returns:
        True if the device was processed without error (including when there
        was nothing to publish), False if any step raised. Errors are not
        propagated, so one failing device does not stop the caller's loop.

    Only tags with good quality are published. The first message goes to
    ``<prefix><deviceType>/<deviceName>``; follow-up chunks append
    ``/1``, ``/2``, ...
    """
    try:
        device_name = device['name']
        if not device['hasChildren']:
            return True

        tag_results = system.tag.browse(device['fullPath']).getResults()
        tag_paths = [str(tag['fullPath']) for tag in tag_results]
        if not tag_paths:
            return True

        tag_values = system.tag.readBlocking(tag_paths)
        units = system.tag.readBlocking(
            [tag_path + ".EngUnit" for tag_path in tag_paths]
        )

        tag_data = OrderedDict()
        for tag, tag_value, unit in zip(tag_results, tag_values, units):
            # Test the quality directly instead of routing through a
            # "Bad Quality" sentinel string, which would also drop a good tag
            # whose real value happens to equal that string.
            if not tag_value.quality.isGood():
                continue
            tag_data[tag['name']] = OrderedDict([
                # getTime() is divided by 1000 and truncated
                # (presumably milliseconds -> seconds).
                ("timestamp", int(tag_value.timestamp.getTime() / 1000)),
                ("value", tag_value.value),
                ("unit", unit.value),
            ])
        if not tag_data:
            return True

        # The last three path segments are read as location/deviceType/device.
        device_path_parts = str(device['fullPath']).split("/")
        device_type = device_path_parts[-2]
        location_name = device_path_parts[-3]
        base_payload = OrderedDict([
            ("deviceType", device_type),
            ("deviceName", device_name),
            ("location", location_name),
            ("timestamp", int(time_module.time())),
        ])
        base_topic = topic_prefix + device_type + "/" + device_name

        chunks = _iter_tag_chunks(tag_data, max_chunk_size, OrderedDict, json_module)
        for sequence_number, chunk in enumerate(chunks):
            payload = base_payload.copy()
            payload["tags"] = chunk
            topic_suffix = "" if sequence_number == 0 else "/" + str(sequence_number)
            result = client.publish(
                base_topic + topic_suffix, json_module.dumps(payload), qos=1
            )
            # NOTE(review): called without a timeout; if the connection drops
            # this may block indefinitely - confirm against the installed
            # paho version and add a timeout if it supports one.
            result.wait_for_publish()
        return True
    except Exception:
        # Deliberate best-effort: report the failure to the caller through
        # the return value rather than aborting the remaining devices.
        return False
# Timer-script body: connect, publish every device found under base_tag_path,
# then always tear the client down again.
client = None  # Bound before the try so the finally clause never hits a NameError
try:
    # Initialize the client
    client = initialize_mqtt_client(broker_address, port, ca_cert_file, cert_file, key_file, mqtt, ssl, keepalive)
    if not check_mqtt_connection(client, time_module=time):
        # The except clause below sets the connection status tag to True.
        raise Exception("Failed to connect to the broker.")
    system.tag.writeBlocking([connection_status_tag_path], [False])

    tag_structure = system.tag.browse(base_tag_path)
    for location in tag_structure.getResults():
        if location['hasChildren']:
            devices = system.tag.browse(location['fullPath'])
            for device in devices.getResults():
                process_device(device, client, time_module=time, system=system, OrderedDict=OrderedDict, json_module=json)
except Exception:
    system.tag.writeBlocking([connection_status_tag_path], [True])
finally:
    if client is not None:
        # Disconnect while the network loop is still running so it can send
        # the DISCONNECT and close the broker socket, then stop the loop.
        # Each step is guarded so a failure in one cannot skip the next.
        cleanup_steps = [client.disconnect, client.loop_stop]
        # NOTE(review): paho's loop_start() is believed to open an internal
        # localhost socket pair that loop_stop()/disconnect() do not close;
        # with a new client every 60 s these would accumulate and match the
        # reported TCP growth. _reset_sockets is a private paho method, so it
        # is looked up defensively - confirm against the installed paho
        # version, or reuse one long-lived client instead of creating a new
        # one on every run.
        release_sockets = getattr(client, "_reset_sockets", None)
        if callable(release_sockets):
            cleanup_steps.append(release_sockets)
        for cleanup_step in cleanup_steps:
            try:
                cleanup_step()
            except Exception:
                # Best-effort teardown: keep going with the remaining steps.
                pass