diff options
Diffstat (limited to 'module/lib/beaker/synchronization.py')
-rw-r--r-- | module/lib/beaker/synchronization.py | 123 |
1 files changed, 64 insertions, 59 deletions
diff --git a/module/lib/beaker/synchronization.py b/module/lib/beaker/synchronization.py index 761303707..f236b8cfe 100644 --- a/module/lib/beaker/synchronization.py +++ b/module/lib/beaker/synchronization.py @@ -29,18 +29,18 @@ except: from beaker import util from beaker.exceptions import LockError -__all__ = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer", +__all__ = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer", "NameLock", "_threading"] class NameLock(object): """a proxy for an RLock object that is stored in a name based - registry. - + registry. + Multiple threads can get a reference to the same RLock based on the name alone, and synchronize operations related to that name. - """ + """ locks = util.WeakValuedRegistry() class NLContainer(object): @@ -49,17 +49,18 @@ class NameLock(object): self.lock = _threading.RLock() else: self.lock = _threading.Lock() + def __call__(self): return self.lock - def __init__(self, identifier = None, reentrant = False): + def __init__(self, identifier=None, reentrant=False): if identifier is None: self._lock = NameLock.NLContainer(reentrant) else: self._lock = NameLock.locks.get(identifier, NameLock.NLContainer, reentrant) - def acquire(self, wait = True): + def acquire(self, wait=True): return self._lock().acquire(wait) def release(self): @@ -67,6 +68,8 @@ class NameLock(object): _synchronizers = util.WeakValuedRegistry() + + def _synchronizer(identifier, cls, **kwargs): return _synchronizers.sync_get((identifier, cls), cls, identifier, **kwargs) @@ -83,12 +86,19 @@ def mutex_synchronizer(identifier, **kwargs): class null_synchronizer(object): + """A 'null' synchronizer, which provides the :class:`.SynchronizerImpl` interface + without any locking. + + """ def acquire_write_lock(self, wait=True): return True + def acquire_read_lock(self): pass + def release_write_lock(self): pass + def release_read_lock(self): pass acquire = acquire_write_lock @@ -96,6 +106,10 @@ class null_synchronizer(object): class SynchronizerImpl(object): + """Base class for a synchronization object that allows + multiple readers, single writers. + + """ def __init__(self): self._state = util.ThreadLocal() @@ -115,27 +129,27 @@ class SynchronizerImpl(object): else: return self._state.get() state = property(state) - + def release_read_lock(self): state = self.state - if state.writing: + if state.writing: raise LockError("lock is in writing state") - if not state.reading: + if not state.reading: raise LockError("lock is not in reading state") - + if state.reentrantcount == 1: self.do_release_read_lock() state.reading = False state.reentrantcount -= 1 - - def acquire_read_lock(self, wait = True): + + def acquire_read_lock(self, wait=True): state = self.state - if state.writing: + if state.writing: raise LockError("lock is in writing state") - + if state.reentrantcount == 0: x = self.do_acquire_read_lock(wait) if (wait or x): @@ -145,13 +159,13 @@ class SynchronizerImpl(object): elif state.reading: state.reentrantcount += 1 return True - + def release_write_lock(self): state = self.state - if state.reading: + if state.reading: raise LockError("lock is in reading state") - if not state.writing: + if not state.writing: raise LockError("lock is not in writing state") if state.reentrantcount == 1: @@ -159,18 +173,18 @@ class SynchronizerImpl(object): state.writing = False state.reentrantcount -= 1 - + release = release_write_lock - - def acquire_write_lock(self, wait = True): + + def acquire_write_lock(self, wait=True): state = self.state - if state.reading: + if state.reading: raise LockError("lock is in reading state") - + if state.reentrantcount == 0: x = self.do_acquire_write_lock(wait) - if (wait or x): + if (wait or x): state.reentrantcount += 1 state.writing = True return x @@ -182,56 +196,47 @@ class SynchronizerImpl(object): def do_release_read_lock(self): raise NotImplementedError() - + def do_acquire_read_lock(self): raise NotImplementedError() - + def do_release_write_lock(self): raise NotImplementedError() - + def do_acquire_write_lock(self): raise NotImplementedError() class FileSynchronizer(SynchronizerImpl): - """a synchronizer which locks using flock(). - - Adapted for Python/multithreads from Apache::Session::Lock::File, - http://search.cpan.org/src/CWEST/Apache-Session-1.81/Session/Lock/File.pm - - This module does not unlink temporary files, - because it interferes with proper locking. This can cause - problems on certain systems (Linux) whose file systems (ext2) do not - perform well with lots of files in one directory. To prevent this - you should use a script to clean out old files from your lock directory. - + """A synchronizer which locks using flock(). + """ def __init__(self, identifier, lock_dir): super(FileSynchronizer, self).__init__() self._filedescriptor = util.ThreadLocal() - + if lock_dir is None: lock_dir = tempfile.gettempdir() else: lock_dir = lock_dir self.filename = util.encoded_path( - lock_dir, - [identifier], + lock_dir, + [identifier], extension='.lock' ) def _filedesc(self): return self._filedescriptor.get() _filedesc = property(_filedesc) - + def _open(self, mode): filedescriptor = self._filedesc if filedescriptor is None: filedescriptor = os.open(self.filename, mode) self._filedescriptor.put(filedescriptor) return filedescriptor - + def do_acquire_read_lock(self, wait): filedescriptor = self._open(os.O_CREAT | os.O_RDONLY) if not wait: @@ -259,13 +264,13 @@ class FileSynchronizer(SynchronizerImpl): else: fcntl.flock(filedescriptor, fcntl.LOCK_EX) return True - + def do_release_read_lock(self): self._release_all_locks() - + def do_release_write_lock(self): self._release_all_locks() - + def _release_all_locks(self): filedescriptor = self._filedesc if filedescriptor is not None: @@ -276,7 +281,7 @@ class FileSynchronizer(SynchronizerImpl): class ConditionSynchronizer(SynchronizerImpl): """a synchronizer using a Condition.""" - + def __init__(self, identifier): super(ConditionSynchronizer, self).__init__() @@ -289,7 +294,7 @@ class ConditionSynchronizer(SynchronizerImpl): # condition object to lock on self.condition = _threading.Condition(_threading.Lock()) - def do_acquire_read_lock(self, wait = True): + def do_acquire_read_lock(self, wait=True): self.condition.acquire() try: # see if a synchronous operation is waiting to start @@ -306,15 +311,15 @@ class ConditionSynchronizer(SynchronizerImpl): finally: self.condition.release() - if not wait: + if not wait: return True - + def do_release_read_lock(self): self.condition.acquire() try: self.async -= 1 - - # check if we are the last asynchronous reader thread + + # check if we are the last asynchronous reader thread # out the door. if self.async == 0: # yes. so if a sync operation is waiting, notifyAll to wake @@ -326,13 +331,13 @@ class ConditionSynchronizer(SynchronizerImpl): "release_read_locks called") finally: self.condition.release() - - def do_acquire_write_lock(self, wait = True): + + def do_acquire_write_lock(self, wait=True): self.condition.acquire() try: # here, we are not a synchronous reader, and after returning, # assuming waiting or immediate availability, we will be. - + if wait: # if another sync is working, wait while self.current_sync_operation is not None: @@ -342,8 +347,8 @@ class ConditionSynchronizer(SynchronizerImpl): # we dont want to wait, so forget it if self.current_sync_operation is not None: return False - - # establish ourselves as the current sync + + # establish ourselves as the current sync # this indicates to other read/write operations # that they should wait until this is None again self.current_sync_operation = _threading.currentThread() @@ -359,8 +364,8 @@ class ConditionSynchronizer(SynchronizerImpl): return False finally: self.condition.release() - - if not wait: + + if not wait: return True def do_release_write_lock(self): @@ -370,7 +375,7 @@ class ConditionSynchronizer(SynchronizerImpl): raise LockError("Synchronizer error - current thread doesnt " "have the write lock") - # reset the current sync operation so + # reset the current sync operation so # another can get it self.current_sync_operation = None |