Publishing to MQTT Brokers via script - Issues with TCP Connections Increasing

So I am using multiple gateway timer scripts, each executing every 60s.

The issue I am running into is that after a few days, the Windows server Ignition runs on locks up network-wise, due to TCP port exhaustion I believe. When troubleshooting I can consistently watch the TCP connections increase. I have added debugging to the script, and it shows the client disconnecting, but the TCP connections just keep increasing.

Looking for some advice on how to manage this.

import json
import time
import ssl
import paho.mqtt.client as mqtt
from collections import OrderedDict

# Define the MQTT broker details
broker_address = "a24qeh3kq5ixg0-ats.iot.us-east-1.amazonaws.com"
port = 8883
certs_path = r"C:\Users\Public\Documents\client_mqtt_certificates\Divconbroker1"

keepalive = 45

# Define the base tag path
base_tag_path = "[default]MQTT Tags/Databank_Phase1/ATL75_All-Rooms"

# Define the path for the connection status tag
connection_status_tag_path = "[default]NO_BROKER_CONNECTION"

# Function to check if certificate files exist
def check_cert_files(certs_path, os_module):
    ca_cert_file = os_module.path.join(certs_path, "AWSRootCA1.pem")
    cert_file = os_module.path.join(certs_path, "certificate.pem.crt")
    key_file = os_module.path.join(certs_path, "private.pem.key")

    for file_path in [ca_cert_file, cert_file, key_file]:
        if not os_module.path.isfile(file_path):
            raise Exception("File not found: {}".format(file_path))
    
    return ca_cert_file, cert_file, key_file

# Check certificate files
import os  # Importing os outside functions and passing it explicitly
ca_cert_file, cert_file, key_file = check_cert_files(certs_path, os)

# Function to initialize MQTT client
def initialize_mqtt_client(broker_address, port, ca_cert_file, cert_file, key_file, mqtt_module, ssl_module, keepalive):
    client = mqtt_module.Client()
    client.tls_set(ca_certs=ca_cert_file,
                   certfile=cert_file,
                   keyfile=key_file,
                   tls_version=ssl_module.PROTOCOL_TLSv1_2)
    
    client.connected_flag = False  # Create flag in client object

    def on_publish(client, userdata, mid):
        pass  # No-op; hook here to log message IDs when debugging publishes

    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            client.connected_flag = True  # Set flag when connected

    def on_log(client, userdata, level, buf):
        pass  # No-op; hook here for paho's internal debug logging

    client.on_publish = on_publish
    client.on_connect = on_connect
    client.on_log = on_log

    try:
        client.connect(broker_address, port, keepalive)
        client.loop_start()
    except Exception:
        # Connect errors are swallowed here; failure surfaces later when
        # check_mqtt_connection() times out waiting on connected_flag.
        pass

    return client

# Function to check MQTT connection
def check_mqtt_connection(client, timeout=10, time_module=None):
    if time_module is None:
        import time as time_module  # Ensure time is imported within the function scope
    for _ in range(timeout):
        if client.connected_flag:
            return True
        time_module.sleep(1)
    return False

# Function to process a single device
def process_device(device, client, time_module, system, OrderedDict, json_module):
    try:
        device_name = device['name']
        if device['hasChildren']:
            tags = system.tag.browse(device['fullPath'])
            tag_paths = [str(tag['fullPath']) for tag in tags.getResults()]

            if tag_paths:
                tag_values = system.tag.readBlocking(tag_paths)
                tag_data = OrderedDict()
                unit_paths = [tag_path + ".EngUnit" for tag_path in tag_paths]
                units = system.tag.readBlocking(unit_paths)

                for tag, tag_value, unit in zip(tags.getResults(), tag_values, units):
                    tag_quality = tag_value.quality
                    value = "Bad Quality" if not tag_quality.isGood() else tag_value.value
                    tag_timestamp = tag_value.timestamp if tag_quality.isGood() else None

                    if value != "Bad Quality":
                        tag_name = tag['name']
                        tag_data[tag_name] = OrderedDict([
                            ("timestamp", int(tag_timestamp.getTime() / 1000)),
                            ("value", value),
                            ("unit", unit.value)
                        ])

                if tag_data:
                    device_path_str = str(device['fullPath'])
                    device_path_parts = device_path_str.split("/")
                    device_type = device_path_parts[-2]
                    location_name = device_path_parts[-3]

                    base_payload = OrderedDict([
                        ("deviceType", device_type),
                        ("deviceName", device_name),
                        ("location", location_name),
                        ("timestamp", int(time_module.time())),
                    ])

                    tag_items = list(tag_data.items())
                    sequence_number = 0
                    max_chunk_size = 128 * 1024

                    while tag_items:
                        current_chunk = OrderedDict()
                        current_size = 0
                        while tag_items and current_size < max_chunk_size:
                            tag_item = tag_items.pop(0)
                            tag_item_json = json_module.dumps({tag_item[0]: tag_item[1]})
                            if current_size + len(tag_item_json) > max_chunk_size:
                                tag_items.insert(0, tag_item)
                                break
                            current_chunk[tag_item[0]] = tag_item[1]
                            current_size += len(tag_item_json)

                        payload = base_payload.copy()
                        payload["tags"] = current_chunk
                        topic_suffix = "" if sequence_number == 0 else "/" + str(sequence_number)
                        full_topic = "stevesite/ATL75/Phase1/All-Rooms/" + device_type + "/" + device_name + topic_suffix

                        payload_json = json_module.dumps(payload)

                        result = client.publish(full_topic, payload_json, qos=1)
                        result.wait_for_publish()

                        sequence_number += 1
    except Exception:
        # Swallow per-device errors so one bad device doesn't abort the sweep
        pass

client = None  # Ensure the name exists for the finally block even if init fails

try:
    # Initialize the client
    client = initialize_mqtt_client(broker_address, port, ca_cert_file, cert_file, key_file, mqtt, ssl, keepalive)
    
    connection_successful = check_mqtt_connection(client, time_module=time)

    if not connection_successful:
        system.tag.writeBlocking([connection_status_tag_path], [True])
        raise Exception("Failed to connect to the broker.")
    else:
        system.tag.writeBlocking([connection_status_tag_path], [False])

    tag_structure = system.tag.browse(base_tag_path)

    for location in tag_structure.getResults():
        if location['hasChildren']:
            devices = system.tag.browse(location['fullPath'])
            for device in devices.getResults():
                process_device(device, client, time_module=time, system=system, OrderedDict=OrderedDict, json_module=json)

except Exception:
    system.tag.writeBlocking([connection_status_tag_path], [True])
finally:
    if client is not None:
        client.loop_stop()
        client.disconnect()


Do the ports never get released, or do they get released but it takes longer than 60 seconds?

Windows keeps closed sockets in the TIME_WAIT state for 4 minutes by default, I think... not ideal if you're opening multiple every 60 seconds. I think you can change some registry settings to decrease this, or if the paho library allows access to socket options you can try SO_LINGER and/or SO_REUSEADDR...
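
If your paho version exposes the raw socket after connect(), something like this might work for SO_LINGER (an untested sketch; client.socket() and the zero-linger trick are things to verify against your paho version):

import socket
import struct

# Untested sketch: SO_LINGER with a zero timeout makes close() send an RST
# instead of a FIN, so the socket skips TIME_WAIT entirely.
client.connect(broker_address, port, keepalive)
sock = client.socket()  # assumes paho exposes the underlying socket
if sock is not None:
    # l_onoff=1, l_linger=0
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))

Note the trade-off: an RST close means the broker sees an unclean disconnect, so only do this if reusing a single connection isn't an option.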

Are you using CPorts? If not, it might help; you can filter by process and monitor connection states, etc. with it.

OK, so I have adjusted the scripts so they use the same connection pool, and adjusted the TIME_WAIT setting. Major improvement: only a few connections come up on port 8883 now, and when a connection has to re-establish, the old ones are closed and removed efficiently.
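
For reference, the reuse pattern looks roughly like this (a sketch; it parks one paho client in system.util.getGlobals() so each timer execution reuses it instead of opening a fresh TLS connection, and assumes the initialize_mqtt_client() function from the script above):

# Sketch: share one client across gateway timer executions
g = system.util.getGlobals()
client = g.get("mqtt_client")

# connected_flag is only a rough health check here; the flag from the
# script above is never reset on disconnect, so treat this as a sketch
if client is None or not getattr(client, "connected_flag", False):
    # No client yet, or the old one dropped: tear it down and rebuild
    if client is not None:
        client.loop_stop()
        client.disconnect()
    client = initialize_mqtt_client(broker_address, port, ca_cert_file,
                                    cert_file, key_file, mqtt, ssl, keepalive)
    g["mqtt_client"] = client

# ... publish as before, but do NOT loop_stop()/disconnect() in finally,
# otherwise the next execution has to reconnect and you're back to churn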

Do I need to worry about these loopbacks? There seem to be quite a few.

Sorry, I don't know what those are from or whether they need to be worried about.

Cross linking, in case an answer turns up at SO:

Also, a lot of the MQTT problems have been resolved over on Cirrus Link's forum.

Oh yes, I am aware of the Cirrus Link module and we typically use it. However, this customer does not like the Sparkplug format and has a very specific JSON payload format.

I have been able to get the TCP connections managed with respect to MQTT port 8883, but the internal Ignition loopback connections just keep increasing. Not sure how to handle this yet.

Usually the problem with those not liking the Sparkplug format is that they have some system that can't ingest the data properly because it's not just plain-text JSON payloads. In my personal opinion, standard JSON payloads aren't the right format for industrial data, especially when it needs to be deterministic. Maybe you've implemented proper LWTs on your own, but Sparkplug handles that for you. In the industrial world, if you're not using Sparkplug B, you're not doing it right (in my opinion).
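
(For what it's worth, a minimal LWT in paho is just a will_set() before connect(); the status topic below is made up for the sketch:)

# Sketch: register a Last Will so the broker announces unexpected drops.
# "stevesite/ATL75/status" is an example topic, not from the script above.
client = mqtt.Client()
client.will_set("stevesite/ATL75/status",
                payload=json.dumps({"online": False}),
                qos=1, retain=True)
client.connect(broker_address, port, keepalive)
# Once connected, overwrite the retained status with "online"
client.publish("stevesite/ATL75/status", json.dumps({"online": True}),
               qos=1, retain=True)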

For the loopback connections, if you temporarily turn off your MQTT scripts so they're not functioning, do they still get created?
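
A quick way to count them from a script while you experiment (a sketch; it shells out to netstat, so Windows gateway only):

import subprocess

# Sketch: count loopback TCP connections on the gateway host
out = subprocess.check_output(["netstat", "-an"])
loopback = [line for line in out.splitlines()
            if "TCP" in line and "127.0.0.1" in line]
print("loopback TCP connections: %d" % len(loopback))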

I agree with the Sparkplug comments 100%.

No, they do not get created constantly when the scripts don't run; they appear in parallel with script execution.

The Cirrus Link MQTT Engine module has a publish function that you can use to publish in any format you like.
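
If memory serves, the scripting publish looks something like this (a rough sketch from memory; the function name, argument order, and the server name "Chariot SCADA" are all assumptions to verify against the Cirrus Link docs before relying on it):

# Rough sketch -- verify the exact function and signature in the
# Cirrus Link docs; "Chariot SCADA" is just a placeholder server name.
payload = system.util.jsonEncode({"deviceName": "AHU-1", "value": 72.4})
system.cirrus.transmission.publish(
    "Chariot SCADA",            # MQTT server name configured in the module
    "stevesite/ATL75/custom",   # any topic you like
    payload,                    # payload bytes (a str works in Jython)
    1,                          # QoS
    False)                      # retained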

While I believe you can customize some items, defining a custom JSON payload structure doesn't seem to be an option.

The other caveat, beyond custom payload styling, is that we need all tags published every minute, plus alarm tags on change of value (COV). That's why we have tried to approach this with scripts outside the Cirrus Link offering.

Looks like you're in my neck of the woods (I live in Carrollton, work in Plano). I think what @cos may be saying is that using the Cirrus Link module may be a better alternative, as it may already do things like pool connections when used from scripting. What I'm not sure about is whether you'd need the Engine module, or if the Transmission module will work from scripting alone without an actual transmitter configured (I believe the Engine module can publish custom topics without anything else, just not sure on Transmission).

We indeed are close.

Also @cos, I did not know this existed; I was even on a call with Cirrus Link weeks back explaining a need for a custom payload.

No need for Paho MQTT if I can publish my custom payloads via this method.