Managing multiple asynchronous threads

I've followed this post for monitoring a directory for file changes and I think I have created more than one watch service in different threads. How do I monitor, manage, and delete these threads?

To manage long-running jython threads through scripting restarts, you need to:

  1. Place the thread object in a dictionary that retains its content through script restarts. I used to recommend the dictionary from system.util.getGlobals(), but that is no longer suitable. I created a free Life Cycle Module that provides such dictionaries going forward.

  2. Write the asynchronous function’s loop so that it regularly checks Thread.interrupted(), particularly after any java I/O operations. Clean up and return when true.

  3. Write the asynchronous function’s startup section to grab the persistent dictionary, look up any thread already on its own key, and if present, .interrupt() and join() it. Then place the function’s own thread, Thread.currentThread(), into the persistent dictionary in that same key. Then proceed with the rest of your startup logic (including picking up any saved state from a prior thread’s cleanup).

2 Likes

Starting with number 1, I'm modifying the script like so. Is this correct?
OLD

	if "fw" not in system.util.getGlobals():
		system.util.getGlobals()["fw"] = FileWatcher()

NEW

	if "fileWatcher" not in system.util.persistent():
		system.util.persistant("fileWatcher") = FileWatcher()

No, my system.util.persistant returns a dictionary, too, but the argument lets you isolate scopes more conveniently.

should be like this then

if "fileWatcher" not in system.util.persistent():
		system.util.persistant()["fileWatcher"] = FileWatcher()

No, more like this:

persist = system.util.persistent("someUniqueConstantName")
oldFileWatcher = persist.pop("fw")
persist['fw'] = FileWatcher(oldFileWatcher)

def getWatcher():
    return persist['fw']

Note the oldFileWatcher passed to the new one so the new one’s thread can tell it to shut down.

1 Like

Am I missing anything? I’m not sure if I still need the kill() and run() functions

from java.nio.file import FileSystems,Files
from java.io import File
from java.nio.file import StandardWatchEventKinds as swek
from java.util.concurrent import TimeUnit
	
class FileWatcher():
	isRunning = False
	def __init__(self):
		self.run()
	
	def _doAsync(self):
		self.isRunning = True
		watcher = FileSystems.getDefault().newWatchService()
			
		p = File("\\\\L:\\Files\\CFC MINIPLOTS").toPath()
		#p = File("\\\\hid-ign-01\\Cutting Miniplots").toPath()
		key = p.register(watcher,swek.ENTRY_CREATE,swek.ENTRY_DELETE,swek.ENTRY_MODIFY)
		while self.isRunning:
			k = watcher.poll(5,TimeUnit.SECONDS)
			if k:
				for e in k.pollEvents():
					kind = e.kind()
					
					if kind == swek.ENTRY_DELETE:
						filePath = p.resolve(e.context()).toString()
						system.tag.write('Cutting/watchService/deletedFile',filePath)
						
					if kind == swek.ENTRY_MODIFY:
						filePath = p.resolve(e.context()).toString()
						system.tag.write('Cutting/watchService/modifiedFile',filePath)
						
					elif kind == swek.ENTRY_CREATE:
						filePath = p.resolve(e.context()).toString()
						system.tag.write('Cutting/watchService/newFile',filePath)
						Files.probeContentType(p.resolve(e.context()))
							
				k.reset()				
				
				
	def kill(self):
		self.isRunning = False
		
	def run(self):
		if not self.isRunning:
			system.util.invokeAsynchronous(self._doAsync)
		
persist = system.util.persistant('miniplotWatcher')
oldFW = persist.pop('fw')
persist['fw'] = FileWatcher(oldFW)
			
def getWatcher():
	return persist['fw']

Multiple problems.

  • Mis-spelled persistent.
  • You’ve no argument to init() to capture oldFW and .interrupt() it.
  • You’re not regularly checking Thread.interrupted() in your run() method.
  • You are not running in another thread. To interrupt your FileWatcher directly, you need to inherit from java.lang.Thread and use .start().

@pturmel Java noob try #2. Not sure if I am passing oldFW correctly and self.interrupted() doesn’t seem right.

from java.lang import Thread
from java.nio.file import FileSystems,Files
from java.io import File
from java.nio.file import StandardWatchEventKinds as swek
from java.util.concurrent import TimeUnit
	
class FileWatcher():
	isRunning = False
	
	def __init__(self, oldFW):
		oldFW.interrupt()
		oldFW.join()
		self.run()
		
	def _doAsync(self):
		self.isRunning = True
		watcher = FileSystems.getDefault().newWatchService()
				
		p = File("\\\\L:\\Files\\CFC MINIPLOTS").toPath()
		#p = File("\\\\hid-ign-01\\Cutting Miniplots").toPath()
		key = p.register(watcher,swek.ENTRY_CREATE,swek.ENTRY_DELETE,swek.ENTRY_MODIFY)
		while self.isRunning:
			k = watcher.poll(5,TimeUnit.SECONDS)
			if k:
				for e in k.pollEvents():
					kind = e.kind()
					
					if kind == swek.ENTRY_DELETE:
						filePath = p.resolve(e.context()).toString()
						system.tag.write('Cutting/watchService/deletedFile',filePath)
						
					if kind == swek.ENTRY_MODIFY:
						filePath = p.resolve(e.context()).toString()
						system.tag.write('Cutting/watchService/modifiedFile',filePath)
							
					elif kind == swek.ENTRY_CREATE:
						filePath = p.resolve(e.context()).toString()
						system.tag.write('Cutting/watchService/newFile',filePath)
						Files.probeContentType(p.resolve(e.context()))
				k.reset()
				if self.interrupted():
					kill(self)				
					
					
	def kill(self):
		self.isRunning = False
			
	def run(self):
		if not self.isRunning:
			system.util.invokeAsynchronous(self._doAsync)

persist = system.util.persistent('miniplotWatcher')
oldFW = persist.pop('fw')
persist['fw'] = FileWatcher(oldFW)
						
def getWatcher():
	return persist['fw']