So, I was nerd-sniped pretty intensely by this yesterday: nesting persistent dictionaries has performance implications that make some instance caching appropriate.
This is utterly untested, but should give you a framework to use if you decide you really don't want the latency of database storage (and I wouldn't use document tags):
from threading import RLock
_g = system.util.getGlobals()
# This class requires a dictionary in its constructor to
# carry state. It delegates attribute assignment and retrieval
# to corresponding keys in the state dictionary. For compatibility
# with persistent object storage (like _g above), it is vital that
# only native Jython or Java object instances be stored in
# the state dictionary.
#
# Attributes that begin with an underscore are not diverted
# to the state dictionary.
#
class DictWrapper(object):
def __init__(self, persistenceDictionary):
self._myState = persistenceDictionary
# Note that __getattr__ is only called when the required name
# doesn't exist *as an attribute*.
def __getattr__(self, attr):
try:
return self._myState[attr]
except KeyError:
raise AttributeError(attr)
# Note that __setattr__ is *always* called for attribute
# assignment and must explicitly defer to the superclass
# for attributes that must not be diverted.
def __setattr__(self, attr, val):
if attr.startswith('_'):
super(DictWrapper, self).__setattr__(attr, val)
else:
self._myState[attr] = val
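# A quick illustration of the delegation above (hypothetical usage):
#
#   d = {}
#   w = DictWrapper(d)
#   w.status = 'running'   # diverted: stored as d['status']
#   w.status               # 'running', read back from d via __getattr__
#   w._scratch = 42        # underscore prefix: a plain attribute on w,
#                          # never stored in d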
# Implement a job-with-nested-tasks class architecture using persistent
# object storage. Jobs and tasks within jobs are *named* for maximum
# concurrency. Each instance is initialized with .deleted set False
# so that deletions in parallel contexts are noticeable. (Any instance
# that sets .deleted is expected to then delete from the persistent
# dictionary. Parallel instances will notice the deleted status upon
# cache lookup and prune at that point.)
jobStates = _g.setdefault('JobStatesByName', {})
# A job instance wraps a state dictionary that is maintained within
# _g so that all Jython contexts will share state. The class itself
# maintains a cache of all wrapped instances.
#
# Generally, job instance algorithms would use `with self.lock:` for
# critical sections where thread safety matters.
class JobTracker(DictWrapper):
_trackers = {}
_cacheLock = RLock()
    # Users should never call this constructor directly. Create-if-
    # missing must go through the static method .lookupOrMake() to
    # avoid duplicate cache entries and to prune deletions. Retrieve
    # existing instances with .lookup() to ensure deletions are pruned.
def __init__(self, job_name):
with JobTracker._cacheLock:
super(JobTracker, self).__init__(jobStates.setdefault(job_name, {}))
self._myState.setdefault('tasks', {})
self._myState.setdefault('lock', RLock())
self._myState.setdefault('deleted', False)
self._tasksCache = {}
# Make this instance's name effectively constant. Attribute
# writes to job_name will be diverted to _myState, but attribute
# reads will get this value.
self.__dict__['job_name'] = job_name
JobTracker._trackers[job_name] = self
# An instance's delete method handles the persistence details.
    def delete(self):
        with self.lock:
            self.deleted = True
            with JobTracker._cacheLock:
                jobStates.pop(self.job_name, None)
                JobTracker._trackers.pop(self.job_name, None)
@staticmethod
def _cachedJob(job_name):
# Fast path is unlocked.
job = JobTracker._trackers[job_name]
if job.deleted:
with JobTracker._cacheLock:
# Repeat the lookup under the lock
job = JobTracker._trackers[job_name]
if job.deleted:
JobTracker._trackers.pop(job_name, None)
                    raise KeyError(job_name)
return job
@staticmethod
def lookup(job_name):
# Fast path is unlocked.
try:
return JobTracker._cachedJob(job_name)
except KeyError:
# Try again under the cache lock
with JobTracker._cacheLock:
try:
return JobTracker._cachedJob(job_name)
except KeyError:
                    # This will give the final KeyError if it really doesn't exist
jobPersistence = jobStates[job_name]
# If no error, instantiate.
return JobTracker(job_name)
@staticmethod
def lookupOrMake(job_name):
# Fast path is unlocked.
try:
return JobTracker._cachedJob(job_name)
except KeyError:
# Try again under the cache lock
with JobTracker._cacheLock:
try:
return JobTracker._cachedJob(job_name)
except KeyError:
# Unconditionally make an instance.
return JobTracker(job_name)
# Manage a cache of nested tasks' instances, similar to the cache of
# job instances.
def _cachedTask(self, task_name):
# Fast path is unlocked.
task = self._tasksCache[task_name]
if task.deleted:
with self.lock:
# Repeat the lookup under the lock
task = self._tasksCache[task_name]
if task.deleted:
self._tasksCache.pop(task_name, None)
                    raise KeyError(task_name)
return task
def getTask(self, task_name):
# Fast path is unlocked.
try:
return self._cachedTask(task_name)
except KeyError:
# Try again under the job lock
with self.lock:
try:
return self._cachedTask(task_name)
except KeyError:
# This will give a final key error if it really doesn't exist
taskPersistence = self.tasks[task_name]
# Wrap it in the Task class
return TaskTracker(self.job_name, task_name)
def getOrMakeTask(self, task_name):
# Fast path is unlocked.
try:
return self._cachedTask(task_name)
except KeyError:
# Try again under the job lock
with self.lock:
try:
return self._cachedTask(task_name)
except KeyError:
# Unconditionally make a Task instance
return TaskTracker(self.job_name, task_name)
# Provide a helper for iteration over a job's tasks that
# supplies the task instances instead of the task's persistence
# dictionary, pruning automatically.
@property
def _tasks(self):
pruneKeys = set(self._tasksCache.keys())
        for k in self.tasks.keys():
pruneKeys.discard(k)
try:
yield self.getTask(k)
except KeyError:
# Only here when a task is pruned concurrently with
# this generator loop.
pass
for k in pruneKeys:
try:
self._cachedTask(k)
except KeyError:
pass
# A task instance wraps a state dictionary that is maintained within
# the job's state dictionary so that all Jython contexts will share state.
# The job instance maintains a cache of all wrapped task instances.
#
# Task lookup and creation must be performed by the job to maintain
# the cache and perform deletion pruning.
#
# Generally, task instance algorithms would use `with self.lock:` for
# critical sections where thread safety matters.
class TaskTracker(DictWrapper):
def __init__(self, job_name, task_name):
# The following will fail if the job doesn't exist.
self._job = JobTracker.lookup(job_name)
with self._job.lock:
super(TaskTracker, self).__init__(self._job.tasks.setdefault(task_name, {}))
self._myState.setdefault('deleted', False)
self._myState.setdefault('lock', RLock())
# Make this instance's name effectively constant. Attribute
            # writes to task_name will be diverted to _myState, but attribute
# reads will get this value.
self.__dict__['task_name'] = task_name
self._job._tasksCache[task_name] = self
    def delete(self):
        with self.lock:
            self.deleted = True
            with self._job.lock:
                self._job.tasks.pop(self.task_name, None)
                self._job._tasksCache.pop(self.task_name, None)
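
To make the intended call pattern concrete, here's a rough usage sketch (the job and task names are made up, and it is as untested as the rest). Any gateway-scoped script sees the same state through _g:

job = JobTracker.lookupOrMake('NightlyImport')
task = job.getOrMakeTask('ParseFiles')
with task.lock:
    # Non-underscore attributes land in the shared state dictionary,
    # so every Jython context sees this value.
    task.progress = 0.5
for t in job._tasks:
    print t.task_name, t.progress
# Parallel contexts prune their caches on their next lookup.
job.delete()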