Factory Wide Multi Barcode Scanner Setup - Discussion/Feedback

Most of what I'm finding online makes use of ExecutorService or ScheduledExecutorService to handle time limited threads. It seems like the 'ScheduledExecutorService` would be the way to go as it can be non blocking?

Does this look about right?

from java.lang import Throwable, Thread
from java.net import Socket, ServerSocket
from java.io import BufferedReader, InputStreamReader, IOException
from java.util.concurrent import ConcurrentLinkedQueue, Executors, TimeUnit

bvm = system.util.globalVarMap("PersistentTCPScannerListener")


SOCKET_RETRY_PERIOD = 30000
SCANNER_CONCURRENT_QUEUE = bvm.setdefault("tcpScannerQueue", ConcurrentLinkedQueue())
TCP_LISTEN_SERVER_PORT = magicPortNumber
TCP_LISTEN_SERVER_HOSTNAME = "serverHostName"
TCPSERVER_POOL_SIZE = 4


class TCPSocketServer(Thread):
	MAX_MESSAGE_TIME = 300000

	def __int__(self, hostName, port, concurrentQueue, poolSize):
		self.setDaemon(1)
		self.tcpReadExecutor = Executors.newScheduledThreadPool(poolSize)
		self.cq = concurrentQueue
		try:
			self.serverSocket = ServerSocket(port)
			logger.infof("Server listening on port %i", port)

		except IOException as ioe:
			logger.warn("TCP listener server failed to start", ioe)
			self.interrupt()
		return

	def run(self):
		while not self.interrupted():
			try:
				readerFuture = self.tcpReadExecutor.submit(TCPSocketServerReader(self.serverSocket.accept(), self.cq))
				self.tcpReadExecutor.schedule(readerFuture.cancel(True), self.MAX_MESSAGE_TIME, TimeUnit.MILLISECONDS)

			except Exception as e:
				logger.warn("Jython exception while attempting to monitor TCP socket", shared.later.PythonAsJavaException(e))

			except Throwable as t:
				logger.warn("Java exception while attempting to monitor TCP socket", t)

		self.tcpReadExecutor.shutdown()
		self.serverSocket.close()

		return

	def stop(self):
		self.interrupt()
		return


class TCPSocketServerReader(Thread):

	def __int__(self, socket, concurrentQueue):
		self.setDaemon(1)
		self.socket = socket
		self.cq = concurrentQueue

	def run(self):
		while not self.interrupted():
			try:
				reader = BufferedReader(InputStreamReader(self.socket.getInputStream))
				while reader:
					currentLine = reader.readLine()
					if currentLine:
						self.cq.offer(currentLine)

			except Exception as e:
				logger.warn("Jython error while attempting to read TCP Server socket data", shared.later.PythonAsJavaException(e))

			except Throwable as t:
				logger.warn("Java error while attempting to read TCP Server socket data", t)
			
			finally:
				self.socket.close()

		return

Maybe, but for this case I'd kick off a new thread. You can use a bound socket's own limits if you are afraid of too many threads. If you use a limited thread pool executor, a bound socket can make it stall with many accepts. If you make a thread pool unlimited, it is a lot simpler to just spawn a new thread.

After doing some testing I had to change my TCPClientListener to use a very tight sleep loop to check the BufferedReader.ready() value, and sleep for 15ms if it is not ready. readLine() seems to be fully blocking and won't respond to an interrupt. As additional protection I also set the socketSoTimeout to 30s on the off chance a read is started and then somehow never completes.

Updated TCPClientListener Class
class TCPClientListener(Thread):
	connectionRetryInterval = SOCKET_RETRY_PERIOD

	def __init__(self, deviceConfig, concurrentQueue):
		self.setDaemon(1)
		self.hostName = deviceConfig['hostname']
		self.port = deviceConfig['port']
		self.cq = concurrentQueue
		
		return

	def readTCPData(self):
		try:
			reader = BufferedReader(InputStreamReader(self.socket.inputStream))
			while reader:
				if not reader.ready():
					self.sleep(15)
					continue
				
				currentLine = reader.readLine()
				if currentLine:
					self.cq.offer(currentLine)
				
		except InterruptedException as ie:
			self.interrupt()

		except Throwable as t:
			logger.warn("Java exception while reading TCP socket data", t)
		
		except Exception as e:
			logger.warn("Jython exception while reading TCP socket data", shared.later.PythonAsJavaException(e))

		finally:
			self.socket.close()

		return

	def run(self):
		while not self.interrupted():
			try:
				self.socket = Socket(self.hostName, self.port)
				self.socket.setSoTimeout(30000)
				logger.infof("Successfully connected to endpoint %s:%s, waiting for incoming data stream", self.hostName, self.port)
				self.readTCPData()

			except IOException as ioe:
				logger.warnf("Unable to connect to endpoint, retrying in %d seconds", self.connectionRetryInterval/1000, ioe)
				self.sleep(self.connectionRetryInterval)

			except Throwable as t:
				logger.warn("Java exception while attempting to monitor TCP socket", t)
			
			except Exception as e:
				logger.warn("Jython exception while attempting to monitor TCP socket", shared.later.PythonAsJavaException(e))
		
		logger.infof("Successfully disconnected from endpoint")
		return

	def stop(self):
		self.interrupt()
		return

I'm having a similar blocking issue on my TCPSocketServer class, where the .accept() method will forever block, and does not appear to respond to an interrupt.

From my initial searches it appears that I may need to make use of java.nio.SocketServerChannel, which appears to have a non blocking mode for its accept() call.

I don't think I've ever mixed a buffered reader with a socket's inputstream when using the socket's timeout functionality. I would generally use the following pattern:

  • When waiting for the beginning of a message, set the socket timeout to ~1000 millis, then attempt to read one byte. Repeat until interrupted or got a byte.

  • If got one byte above, place the byte in the first subscript of a byte array, set the socket timeout to one millisecond, then attempt to read the balance of the byte array. (Use the largest possible message size). Note the quantity returned and attempt to interpret the bytes. Then go back to single-byte reads.

The above takes advantage of the typical serial-to-ethernet packet building behavior of most interfaces.

2 Likes

I was able to get this to work after some testing. I got stuck on my data coming from the scanner being in 3 or 4 chunks, requiring multiple reads to get all the data. That turned out to be a setting on my serial gateway, it was set to use inter-character time gap as the delimiter, I changed it to use 0x0d0a instead and now my class reads it all as 1 message.

I unfortunately have no idea what the maximum message size from the serial-ethernet gateway is, the only thing I can find in the docs is the max TCP buffer size, which is 16k, so I'm using that as my message size for the time being. All of the data I'm currently sending is way under that limit anyways, I'm currently in the ~ 36 bytes range for my lot IDs and ~50 bytes for my machine load point IDs.

Non BufferedReader TCP Listener Class
class TCPClientListener(Thread):
	connectionRetryInterval = SOCKET_RETRY_PERIOD

	def __init__(self, deviceConfig, concurrentQueue):
		self.setDaemon(1)
		self.hostName = deviceConfig['hostname']
		self.port = deviceConfig['port']
		self.cq = concurrentQueue
		self.byteArray = jarray.zeros(1024*16, 'b')

		return

	def readTCPData(self):
		try:
			sis = self.socket.inputStream

			# Check the interrupted flag without clearing it
			while not self.isInterrupted():
				try:
					newByte = sis.read()
					if newByte == -1:
						continue

					self.byteArray[0] = newByte
					self.socket.setSoTimeout(1)
					self.readByteCount = sis.read(self.byteArray, 1, len(self.byteArray)-1)

					message = String(self.byteArray, 0, self.readByteCount)
					if message:
						self.cq.offer(message)
					self.socket.setSoTimeout(1200)


				except IOException as ioe:
					self.socket.setSoTimeout(1200)
					continue

		except Throwable as t:
			logger.warn("Java exception while reading TCP socket data", t)

		except Exception as e:
			logger.warn("Jython exception while reading TCP socket data", shared.later.PythonAsJavaException(e))

		finally:
			self.socket.close()

		return

	def run(self):
		while not self.interrupted():
			try:
				self.socket = Socket(self.hostName, self.port)
				logger.infof("Connected to endpoint %s:%s, awaiting data stream", self.hostName, self.port)
				self.socket.setSoTimeout(1200)
				self.readTCPData()

			except InterruptedException as ie:
				self.interrupt()

			except IOException as ioe:
				logger.warnf("Unable to connect to endpoint, retrying in %d seconds", self.connectionRetryInterval/1000, ioe)
				self.sleep(self.connectionRetryInterval)

			except Throwable as t:
				logger.warn("Java exception while attempting to monitor TCP socket", t)

			except Exception as e:
				logger.warn("Jython exception while attempting to monitor TCP socket", shared.later.PythonAsJavaException(e))

		logger.info("Disconnected from endpoint")
		return

	def stop(self):
		self.interrupt()
		return
1 Like

Should this fast script be focused only on pairing data to the appropriate scanner queue/history and initial processing of a scanner's queue? (ie, append the barcode data to the appropriate scanner queue/row, then inspect the queue/row to determine if it should send data to a machine?)

My current testing has shown that the process to get data from the db and send it to a machine can take up to 300ms to complete (a few tag reads, db reads, tag writes, along with some checks to ensure data exists/is valid).

I would assume I should keep this part out of the fast timer event to be able to chew through the incoming TCP message queue fairly quickly, correct? In that case, would the best approach be to call a message handler from the fast script to perform the data transfer?

After some testing I came up with the following to work through chunks of the linked queue:


def updateScannersFIFO(scannerQueue, scannerSerial, barcodeData):

	scannerIndex = -1
	fifoValues = ["" for _ in xrange(1, len(DECODE_QUEUE_COLUMNS))]
	scanners = scannerQueue.getColumnAsList(0)

	if scannerSerial in scanners:
		scannerIndex = scanners.index(scannerSerial)
		fifoValues = [scannerQueue.getValueAt(scannerIndex, column) for column in xrange(1, scannerQueue.columnCount)]

	fifoValues.pop(-1)
	fifoValues.insert(0, barcodeData)
	fifoValues.insert(0, scannerSerial)

	scannerQueue = buildScannerDataFifo(scannerQueue, fifoValues, scannerIndex)

	return scannerQueue, fifoValues


def onScannerStateReady(currentState, currentFIFO):

	if MachineLoadPointIdentifier.match(currentFIFO[1]):
		return "STATE_WAIT_TRAVELER"

	# elif MachineUnloadPointIdentifier.match(currentFIFO[1]):
	# 	return "STATE_WAIT_LOAD_POINT"

	return currentState


def onScannerStateWaitTraveler(currentState, currentFIFO):

	travelerType = travelers.determineTravelerType(currentFIFO[1])
	if travelerType == "TYPE_UNK":
		return currentState

	machineID = None
	for barcodeData in currentFIFO[2:]:
		if MachineLoadPointIdentifier.match(barcodeData):
			machineID, sectionID = getMachineAndSectionFromBarcode(barcodeData)
			break

	if not machineID:
		return "STATE_READY"

	system.util.sendMessage(
		"Project",
		"RequestTravelerDataTransfer",
		{
			"targetID": machineID,
			"targetSecondaryID": sectionID,
			"targetType": "MACHINE",
			"sourceID": currentFIFO[1],
			"sourceType": "TRAVELER",
			"travelerType": travelerType
		},
		scope='G'
	)

	return "STATE_READY"


def updateScannerState(scannersState, scannerSerial, scannerHost, currentFIFO):

	scanners = scannersState.getColumnAsList(0)
	scannerIndex = -1
	currentState = "STATE_READY"

	if scannerSerial in scanners:
		scannerIndex = scanners.index(scannerSerial)
		currentState = scannersState.getValueAt(scannerIndex, 1)

	if currentState == "STATE_READY":
		currentState = onScannerStateReady(currentState, currentFIFO)

	elif currentState == "STATE_WAIT_TRAVELER":
		currentState = onScannerStateWaitTraveler(currentState, currentFIFO)

	if scannerIndex != -1:
		scannersState = system.dataset.setValue(scannersState, scannerIndex, 1, currentState)
	else:
		scannersState = system.dataset.addRow(scannersState, [scannerSerial, currentState])

	return scannersState, currentState


def processScannerData(scannerData):

	if not scannerData:
		return

	if len(scannerData) < 2:
		logger.warn("Scanner data did not include necessary data to process")
		logger.infof("Scanner data: '%s'", str(scannerData))
		return

	message = str(scannerData[1])
	messageParts = message.split(',', 1)
	if not len(messageParts) > 1:
		logger.warn("Scanner message did not include multiple parts")
		logger.infof("Scanner message '%s'", str(message))
		return

	scannerQueue = bvm.get('SCANNERS_QUEUE')
	scannerState = bvm.get('SCANNERS_STATE')
	scannerSerial, barcodeData = messageParts
	scannerHost = scannerData[0]

	bvm['SCANNERS_QUEUE'], currentFIFO = updateScannersFIFO(scannerQueue, scannerSerial, barcodeData)
	bvm['SCANNERS_STATE'], currentState = updateScannerState(scannerState, scannerSerial, scannerHost, currentFIFO)

	return


def processConcurrentQueue(concurrentQueue, chunkSize):
	start = System.nanoTime()
	if concurrentQueue.isEmpty():
		return 0

	if chunkSize > concurrentQueue.size():
		chunkSize = concurrentQueue.size()

	for idx in xrange(chunkSize):
		processScannerData(concurrentQueue.poll())

	# Write scanner queue to dataset tag for restart carry-over
	system.tag.writeBlocking(
		[
			'Path/ScannersFifo',
			'Path/ScannerState'
		],
		[bvm["SCANNERS_QUEUE"], bvm['SCANNERS_STATE']]
	)

	return System.nanoTime()-start

From my testing I ran into some weird cases where the first call to system.util.sendMessage seemed to block for a bit longer than normal, resulting in a high execution time for the chunk.

I'm currently running the processConcurrentQueue in a 1s fixed rate dedicated timer event with a chunk size of 30 for longer term testing. My normal execution times for the chunk are in the ~1ms range with a single scanner feeding the queue as fast as I can click it. Sometimes the message caller bumps it to ~3ms. I'll continue to monitor it as I bring more scanners online.

For scanner configurations, I currently have this in a list as a global variable. Would it be a good idea to plan to move it to a database table/dataset tag in the future when we are running many more scanners?

My concern with my current setup is the ease (or lack of) setting up a new device. Currently it requires a change and restart of the project script library. Ideally I'd have some sort of interface where you can see the scanner and the past few messages from the scanner, as well as add new devices to the configuration table.

Well, I'm a bit too preoccupied (Keyence KV driver) to go through it all, but a couple notes:

  • Don't use system.util.sendMessage() for any of this within a single gateway. A queue managed in a JVM-wide global can simply be accessed--the ones I recommend are thread-safe. Message handlers process message one-at-a-time.

  • Abstract your configuration into a database or an external JSON file. Use a function in your library to read the config and stuff it into a top-level module variable. Call it unconditionally at the bottom of the library script, so the module variable is always populated. Call it from elsewhere, if needed, to reinitialize without needing a project edit or gateway restart.

1 Like