Using system.util.getGlobals with classes

So, I was nerd-sniped pretty intensely by this yesterday: nesting persistent dictionaries has performance implications that make some instance caching appropriate.

This is utterly untested, but should give you a framework to use if you decide you really don't want the latency of database storage (and I wouldn't use document tags):

from threading import RLock

_g = system.util.getGlobals()

# This class requires a dictionary in its constructor to
# carry state.  It delegates attribute assignment and retrieval
# to corresponding keys in the state dictionary.  For compatibility
# with persistent object storage (like _g above), it is vital that
# only native Jython or Java object instances be stored in
# the state dictionary.
#
# Attributes that begin with an underscore are not diverted
# to the state dictionary.
#
class DictWrapper(object):
	def __init__(self, persistenceDictionary):
		self._myState = persistenceDictionary

	# Note that __getattr__ is only called when the required name
	# doesn't exist *as an attribute*.
	def __getattr__(self, attr):
		try:
			return self._myState[attr]
		except KeyError:
			raise AttributeError(attr)

	# Note that __setattr__ is *always* called for attribute
	# assignment and must explicitly defer to the superclass
	# for attributes that must not be diverted.
	def __setattr__(self, attr, val):
		if attr.startswith('_'):
			super(DictWrapper, self).__setattr__(attr, val)
		else:
			self._myState[attr] = val
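
# A quick sanity sketch of the wrapper (hypothetical and untested, like
# everything else here), showing where reads and writes end up:
#
#	d = {}
#	w = DictWrapper(d)
#	w.counter = 0        # diverted: lands in d['counter']
#	print w.counter      # reads d['counter'] -> 0
#	w._scratch = 'tmp'   # underscore prefix: stays on the instance
#	w.missing            # raises AttributeError, not KeyError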

# Implement a job-with-nested-tasks class architecture using persistent
# object storage.  Jobs and tasks within jobs are *named* for maximum
# concurrency.  Each instance is initialized with .deleted set to False
# so that deletions in parallel contexts are noticeable.  (Any instance
# that sets .deleted is expected to then delete from the persistent
# dictionary.  Parallel instances will notice the deleted status upon
# cache lookup and prune at that point.)

jobStates = _g.setdefault('JobStatesByName', {})

# A job instance wraps a state dictionary that is maintained within
# _g so that all jython contexts will share state.  The class itself
# maintains a cache of all wrapped instances.
#
# Generally, job instance algorithms would use `with self.lock:` for
# critical sections where thread safety matters.
class JobTracker(DictWrapper):
	_trackers = {}
	_cacheLock = RLock()

	# Users should never call this directly.  Create instances with the
	# static method .lookupOrMake(), which avoids duplicate cache
	# entries and prunes deletions.  Retrieve existing instances with
	# .lookup() to ensure deletions are pruned.
	def __init__(self, job_name):
		with JobTracker._cacheLock:
			super(JobTracker, self).__init__(jobStates.setdefault(job_name, {}))
			self._myState.setdefault('tasks', {})
			self._myState.setdefault('lock', RLock())
			self._myState.setdefault('deleted', False)
			self._tasksCache = {}
			# Make this instance's name effectively constant.  Attribute
			# writes to job_name will be diverted to _myState, but attribute
			# reads will get this value.
			self.__dict__['job_name'] = job_name
			JobTracker._trackers[job_name] = self

	# An instance's delete method handles the persistence details.
	def delete(self):
		with self.lock:
			self.deleted = True
			with JobTracker._cacheLock:
				jobStates.pop(self.job_name, None)
				JobTracker._trackers.pop(self.job_name, None)

	@staticmethod
	def _cachedJob(job_name):
		# Fast path is unlocked.
		job = JobTracker._trackers[job_name]
		if job.deleted:
			with JobTracker._cacheLock:
				# Repeat the lookup under the lock
				job = JobTracker._trackers[job_name]
				if job.deleted:
					JobTracker._trackers.pop(job_name, None)
					raise KeyError(job_name)
		return job

	@staticmethod
	def lookup(job_name):
		# Fast path is unlocked.
		try:
			return JobTracker._cachedJob(job_name)
		except KeyError:
			# Try again under the cache lock
			with JobTracker._cacheLock:
				try:
					return JobTracker._cachedJob(job_name)
				except KeyError:
					# This raises KeyError again if the job really doesn't exist
					jobStates[job_name]
					# If no error, wrap it in an instance.
					return JobTracker(job_name)

	@staticmethod
	def lookupOrMake(job_name):
		# Fast path is unlocked.
		try:
			return JobTracker._cachedJob(job_name)
		except KeyError:
			# Try again under the cache lock
			with JobTracker._cacheLock:
				try:
					return JobTracker._cachedJob(job_name)
				except KeyError:
					# Unconditionally make an instance.
					return JobTracker(job_name)

	# Manage a cache of nested tasks' instances, similar to the cache of
	# job instances.
	def _cachedTask(self, task_name):
		# Fast path is unlocked.
		task = self._tasksCache[task_name]
		if task.deleted:
			with self.lock:
				# Repeat the lookup under the lock
				task = self._tasksCache[task_name]
				if task.deleted:
					self._tasksCache.pop(task_name, None)
					raise KeyError(task_name)
		return task

	def getTask(self, task_name):
		# Fast path is unlocked.
		try:
			return self._cachedTask(task_name)
		except KeyError:
			# Try again under the job lock
			with self.lock:
				try:
					return self._cachedTask(task_name)
				except KeyError:
					# This raises KeyError again if the task really doesn't exist
					self.tasks[task_name]
					# Wrap it in the Task class
					return TaskTracker(self.job_name, task_name)

	def getOrMakeTask(self, task_name):
		# Fast path is unlocked.
		try:
			return self._cachedTask(task_name)
		except KeyError:
			# Try again under the job lock
			with self.lock:
				try:
					return self._cachedTask(task_name)
				except KeyError:
					# Unconditionally make a Task instance
					return TaskTracker(self.job_name, task_name)

	# Provide a helper for iteration over a job's tasks that
	# supplies the task instances instead of the task's persistence
	# dictionary, pruning automatically.
	@property
	def _tasks(self):
		pruneKeys = set(self._tasksCache.keys())
		# Snapshot the keys so concurrent changes don't break iteration.
		for k in list(self.tasks.keys()):
			pruneKeys.discard(k)
			try:
				yield self.getTask(k)
			except KeyError:
				# Only here when a task is pruned concurrently with
				# this generator loop.
				pass
		# Evict any cached task whose persistence entry has vanished;
		# _cachedTask notices the deleted flag and prunes it.
		for k in pruneKeys:
			try:
				self._cachedTask(k)
			except KeyError:
				pass
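
	# Hypothetical iteration example (the job name is made up):
	#
	#	job = JobTracker.lookup('someJob')
	#	for task in job._tasks:
	#		with task.lock:
	#			print task.task_name, task.deleted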

# A task instance wraps a state dictionary that is maintained within
# the job's state dictionary so that all jython contexts will share state.
# The job instance maintains a cache of all wrapped task instances.
#
# Task lookup and creation must be performed by the job to maintain
# the cache and perform deletion pruning.
#
# Generally, task instance algorithms would use `with self.lock:` for
# critical sections where thread safety matters.
class TaskTracker(DictWrapper):
	def __init__(self, job_name, task_name):
		# The following will fail if the job doesn't exist.
		self._job = JobTracker.lookup(job_name)
		with self._job.lock:
			super(TaskTracker, self).__init__(self._job.tasks.setdefault(task_name, {}))
			self._myState.setdefault('deleted', False)
			self._myState.setdefault('lock', RLock())
			# Make this instance's name effectively constant.  Attribute
			# writes to this will be diverted to _myState, but attribute
			# reads will get this value.
			self.__dict__['task_name'] = task_name
			self._job._tasksCache[task_name] = self

	def delete(self):
		with self.lock:
			self.deleted = True
			with self._job.lock:
				self._job.tasks.pop(self.task_name, None)
				self._job._tasksCache.pop(self.task_name, None)
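
For completeness, here's a rough sketch of how calling code might use this (the function names and the .status attribute are inventions of mine, and it's exactly as untested as the rest):

def startJob(job_name, task_names):
	job = JobTracker.lookupOrMake(job_name)
	for name in task_names:
		task = job.getOrMakeTask(name)
		with task.lock:
			task.status = 'pending'
	return job

def finishTask(job_name, task_name):
	# Raises KeyError if the job or task has been deleted meanwhile.
	job = JobTracker.lookup(job_name)
	task = job.getTask(task_name)
	with task.lock:
		task.status = 'done'
	# Tear the whole job down once every task reports done.
	with job.lock:
		if all(t.status == 'done' for t in job._tasks):
			job.delete()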