I have a device that I need to connect to over a TCP socket, register which types of messages I want to receive (by sending a registration packet over TCP), and then store the messages received from it in a DB. I have a main() that tries to connect, register, and read from the TCP socket connection in a loop. main() is not called directly; it is started with system.util.invokeAsynchronous() by ThreadManage(). The ThreadManage() script runs on the Gateway Startup Event. Previously started threads are stored in system.util.getGlobals() (I have also tried pturmel’s LifeCycle module). When ThreadManage() is called, it interrupts and joins the previous thread before calling invokeAsynchronous(main). The thread returned by invokeAsynchronous is stored in getGlobals() for the next time.
The issue I’m running into is that every time the gateway scripts restart due to a save/edit, there is an open file/socket leak (we are running on Linux, so open files and sockets are one and the same). I can’t figure out what I am doing wrong in how I close the previous threads or sockets. So I’m looking for insight into how my closing of the threads/socket still leaves a file leak.
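For anyone trying to reproduce this, the descriptor count can be checked from a script. This is a minimal sketch, assuming a Linux `/proc` filesystem that the process can read; `count_open_fds` and `count_socket_fds` are hypothetical helper names, not part of Ignition.

```python
import os
import socket


def count_open_fds(fd_dir='/proc/self/fd'):
    """Return how many file descriptors this process currently has open.

    Assumes a Linux /proc filesystem. The listing itself briefly opens one
    descriptor, so compare counts between calls rather than reading the
    absolute value.
    """
    return len(os.listdir(fd_dir))


def count_socket_fds(fd_dir='/proc/self/fd'):
    """Return how many of this process's open descriptors are sockets."""
    count = 0
    for name in os.listdir(fd_dir):
        try:
            target = os.readlink(os.path.join(fd_dir, name))
        except OSError:
            # The descriptor was closed between listdir() and readlink().
            continue
        # On Linux, socket descriptors link to a target like "socket:[12345]".
        if target.startswith('socket:'):
            count += 1
    return count
```

Calling `count_socket_fds()` before and after a script restart should show whether the number keeps growing. Inside the gateway, the count covers the whole JVM process, not just this script.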
def main(host_ip, host_port):
    """
    Loop forever. Try to connect, subscribe, and read.
    If something fails, wait a short time and try again.
    :param host_ip: The IP address of the host to connect to
    :type host_ip: str
    :param host_port: The port to connect to on the host
    :type host_port: int
    """
    sock = None
    import java.lang.Thread as thr
    import java.lang.Thread.State as ts
    while True:
        try:
            #g = system.util.persistent('threading') # grab globally persistent dictionary
            g = system.util.getGlobals()
            if 'sock' in g:
                try:
                    g['sock'].shutdown(socket.SHUT_RDWR)
                    g['sock'].close()
                except:
                    pass
            g['sock'] = None
            #g = system.util.persistent('threading')
            g = system.util.getGlobals()
            if g['thread'].getState() == ts.TERMINATED or g['thread'].interrupted():
                print 'here'
                return
            if g['thread'].id != thr.currentThread().id:
                return
            g['sock'] = try_to_connect(host_ip, host_port)
            print g['sock']
            if g['sock']:
                subscribed_ok = subscribe()
                if subscribed_ok:
                    read_until_failure()
                g['sock'].close()
            else:
                print 'sleeping'
                time.sleep(10.0)
        except Exception as e:
            print 'exception thrown'
            print e.message
        finally:
            #g = system.util.persistent('threading') # grab globally persistent dictionary
            g = system.util.getGlobals()
            if g['sock']:
                g['sock'].shutdown(socket.SHUT_RDWR)
                g['sock'].close()
            g['sock'] = None # to ensure garbage clean up
def ThreadManage():
    """
    Create a socket thread, interrupting and joining the previous thread before starting the new one.
    """
    # imports
    import time
    import system
    import java.lang.Thread as thr
    import java.lang.Thread.State as ts
    from java.util.concurrent import Semaphore
    from java.util.concurrent import TimeUnit
    system.tag.writeBlocking(['[default]Test/NML/timerScriptTime'], [system.date.now()])
    #g = system.util.persistent('threading') # grab globally persistent dictionary
    g = system.util.getGlobals()
    if 'lock' not in g:
        g['lock'] = Semaphore(1) # create Semaphore lock if one does not already exist
    #lock = g['lock']
    acquired = g['lock'].tryAcquire(5, TimeUnit.SECONDS) # try to acquire the lock for 5 seconds
    if not acquired:
        return # end function if lock was not acquired
    try:
        if 'thread' in g: # if thread exists, interrupt and join
            g['thread'].interrupt()
            g['thread'].join()
            system.tag.writeBlocking(['[default]Test/NML/globalsIsNone'], [False])
        else:
            system.tag.writeBlocking(['[default]Test/NML/globalsIsNone'], [True])
        g['thread'] = system.util.invokeAsynchronous(main, [HOST_IP, HOST_PORT]) # spin up new socket thread and store thread in global dict
        return
    except:
        pass
    finally:
        g['lock'].release() # always release the lock
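For comparison, here is a minimal standalone sketch of the connect/read/close pattern with the socket held in a local variable and closed in a `finally` block. It uses only the standard `socket` module. `read_until_closed` and `on_data` are hypothetical names, it does not reproduce the `try_to_connect`, `subscribe`, or `read_until_failure` helpers above, and it is not a claim about what causes the leak.

```python
import socket


def read_until_closed(host, port, on_data, timeout=5.0):
    """Connect, hand each received chunk to on_data, and always close the socket.

    Returns the total number of bytes received. A timeout or connection
    error propagates to the caller, but only after the socket is closed.
    """
    total = 0
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Bound every blocking call so the thread cannot hang forever in recv().
        sock.settimeout(timeout)
        sock.connect((host, port))
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                # An empty read means the peer closed the connection.
                break
            total += len(chunk)
            on_data(chunk)
    finally:
        # Runs on normal exit, on timeout, and on any other exception.
        sock.close()
    return total
```

Because the socket never leaves the function, the only code that can close it is the `finally` block, which makes it easier to see that every opened socket has exactly one matching close.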