summaryrefslogtreecommitdiffstats
path: root/module/lib/beaker/synchronization.py
diff options
context:
space:
mode:
Diffstat (limited to 'module/lib/beaker/synchronization.py')
-rw-r--r--module/lib/beaker/synchronization.py123
1 files changed, 64 insertions, 59 deletions
diff --git a/module/lib/beaker/synchronization.py b/module/lib/beaker/synchronization.py
index 761303707..f236b8cfe 100644
--- a/module/lib/beaker/synchronization.py
+++ b/module/lib/beaker/synchronization.py
@@ -29,18 +29,18 @@ except:
from beaker import util
from beaker.exceptions import LockError
-__all__ = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer",
+__all__ = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer",
"NameLock", "_threading"]
class NameLock(object):
"""a proxy for an RLock object that is stored in a name based
- registry.
-
+ registry.
+
Multiple threads can get a reference to the same RLock based on the
name alone, and synchronize operations related to that name.
- """
+ """
locks = util.WeakValuedRegistry()
class NLContainer(object):
@@ -49,17 +49,18 @@ class NameLock(object):
self.lock = _threading.RLock()
else:
self.lock = _threading.Lock()
+
def __call__(self):
return self.lock
- def __init__(self, identifier = None, reentrant = False):
+ def __init__(self, identifier=None, reentrant=False):
if identifier is None:
self._lock = NameLock.NLContainer(reentrant)
else:
self._lock = NameLock.locks.get(identifier, NameLock.NLContainer,
reentrant)
- def acquire(self, wait = True):
+ def acquire(self, wait=True):
return self._lock().acquire(wait)
def release(self):
@@ -67,6 +68,8 @@ class NameLock(object):
_synchronizers = util.WeakValuedRegistry()
+
+
def _synchronizer(identifier, cls, **kwargs):
return _synchronizers.sync_get((identifier, cls), cls, identifier, **kwargs)
@@ -83,12 +86,19 @@ def mutex_synchronizer(identifier, **kwargs):
class null_synchronizer(object):
+ """A 'null' synchronizer, which provides the :class:`.SynchronizerImpl` interface
+ without any locking.
+
+ """
def acquire_write_lock(self, wait=True):
return True
+
def acquire_read_lock(self):
pass
+
def release_write_lock(self):
pass
+
def release_read_lock(self):
pass
acquire = acquire_write_lock
@@ -96,6 +106,10 @@ class null_synchronizer(object):
class SynchronizerImpl(object):
+ """Base class for a synchronization object that allows
+ multiple readers, single writers.
+
+ """
def __init__(self):
self._state = util.ThreadLocal()
@@ -115,27 +129,27 @@ class SynchronizerImpl(object):
else:
return self._state.get()
state = property(state)
-
+
def release_read_lock(self):
state = self.state
- if state.writing:
+ if state.writing:
raise LockError("lock is in writing state")
- if not state.reading:
+ if not state.reading:
raise LockError("lock is not in reading state")
-
+
if state.reentrantcount == 1:
self.do_release_read_lock()
state.reading = False
state.reentrantcount -= 1
-
- def acquire_read_lock(self, wait = True):
+
+ def acquire_read_lock(self, wait=True):
state = self.state
- if state.writing:
+ if state.writing:
raise LockError("lock is in writing state")
-
+
if state.reentrantcount == 0:
x = self.do_acquire_read_lock(wait)
if (wait or x):
@@ -145,13 +159,13 @@ class SynchronizerImpl(object):
elif state.reading:
state.reentrantcount += 1
return True
-
+
def release_write_lock(self):
state = self.state
- if state.reading:
+ if state.reading:
raise LockError("lock is in reading state")
- if not state.writing:
+ if not state.writing:
raise LockError("lock is not in writing state")
if state.reentrantcount == 1:
@@ -159,18 +173,18 @@ class SynchronizerImpl(object):
state.writing = False
state.reentrantcount -= 1
-
+
release = release_write_lock
-
- def acquire_write_lock(self, wait = True):
+
+ def acquire_write_lock(self, wait=True):
state = self.state
- if state.reading:
+ if state.reading:
raise LockError("lock is in reading state")
-
+
if state.reentrantcount == 0:
x = self.do_acquire_write_lock(wait)
- if (wait or x):
+ if (wait or x):
state.reentrantcount += 1
state.writing = True
return x
@@ -182,56 +196,47 @@ class SynchronizerImpl(object):
def do_release_read_lock(self):
raise NotImplementedError()
-
+
def do_acquire_read_lock(self):
raise NotImplementedError()
-
+
def do_release_write_lock(self):
raise NotImplementedError()
-
+
def do_acquire_write_lock(self):
raise NotImplementedError()
class FileSynchronizer(SynchronizerImpl):
- """a synchronizer which locks using flock().
-
- Adapted for Python/multithreads from Apache::Session::Lock::File,
- http://search.cpan.org/src/CWEST/Apache-Session-1.81/Session/Lock/File.pm
-
- This module does not unlink temporary files,
- because it interferes with proper locking. This can cause
- problems on certain systems (Linux) whose file systems (ext2) do not
- perform well with lots of files in one directory. To prevent this
- you should use a script to clean out old files from your lock directory.
-
+ """A synchronizer which locks using flock().
+
"""
def __init__(self, identifier, lock_dir):
super(FileSynchronizer, self).__init__()
self._filedescriptor = util.ThreadLocal()
-
+
if lock_dir is None:
lock_dir = tempfile.gettempdir()
else:
lock_dir = lock_dir
self.filename = util.encoded_path(
- lock_dir,
- [identifier],
+ lock_dir,
+ [identifier],
extension='.lock'
)
def _filedesc(self):
return self._filedescriptor.get()
_filedesc = property(_filedesc)
-
+
def _open(self, mode):
filedescriptor = self._filedesc
if filedescriptor is None:
filedescriptor = os.open(self.filename, mode)
self._filedescriptor.put(filedescriptor)
return filedescriptor
-
+
def do_acquire_read_lock(self, wait):
filedescriptor = self._open(os.O_CREAT | os.O_RDONLY)
if not wait:
@@ -259,13 +264,13 @@ class FileSynchronizer(SynchronizerImpl):
else:
fcntl.flock(filedescriptor, fcntl.LOCK_EX)
return True
-
+
def do_release_read_lock(self):
self._release_all_locks()
-
+
def do_release_write_lock(self):
self._release_all_locks()
-
+
def _release_all_locks(self):
filedescriptor = self._filedesc
if filedescriptor is not None:
@@ -276,7 +281,7 @@ class FileSynchronizer(SynchronizerImpl):
class ConditionSynchronizer(SynchronizerImpl):
"""a synchronizer using a Condition."""
-
+
def __init__(self, identifier):
super(ConditionSynchronizer, self).__init__()
@@ -289,7 +294,7 @@ class ConditionSynchronizer(SynchronizerImpl):
# condition object to lock on
self.condition = _threading.Condition(_threading.Lock())
- def do_acquire_read_lock(self, wait = True):
+ def do_acquire_read_lock(self, wait=True):
self.condition.acquire()
try:
# see if a synchronous operation is waiting to start
@@ -306,15 +311,15 @@ class ConditionSynchronizer(SynchronizerImpl):
finally:
self.condition.release()
- if not wait:
+ if not wait:
return True
-
+
def do_release_read_lock(self):
self.condition.acquire()
try:
self.async -= 1
-
- # check if we are the last asynchronous reader thread
+
+ # check if we are the last asynchronous reader thread
# out the door.
if self.async == 0:
# yes. so if a sync operation is waiting, notifyAll to wake
@@ -326,13 +331,13 @@ class ConditionSynchronizer(SynchronizerImpl):
"release_read_locks called")
finally:
self.condition.release()
-
- def do_acquire_write_lock(self, wait = True):
+
+ def do_acquire_write_lock(self, wait=True):
self.condition.acquire()
try:
# here, we are not a synchronous reader, and after returning,
# assuming waiting or immediate availability, we will be.
-
+
if wait:
# if another sync is working, wait
while self.current_sync_operation is not None:
@@ -342,8 +347,8 @@ class ConditionSynchronizer(SynchronizerImpl):
# we dont want to wait, so forget it
if self.current_sync_operation is not None:
return False
-
- # establish ourselves as the current sync
+
+ # establish ourselves as the current sync
# this indicates to other read/write operations
# that they should wait until this is None again
self.current_sync_operation = _threading.currentThread()
@@ -359,8 +364,8 @@ class ConditionSynchronizer(SynchronizerImpl):
return False
finally:
self.condition.release()
-
- if not wait:
+
+ if not wait:
return True
def do_release_write_lock(self):
@@ -370,7 +375,7 @@ class ConditionSynchronizer(SynchronizerImpl):
raise LockError("Synchronizer error - current thread doesnt "
"have the write lock")
- # reset the current sync operation so
+ # reset the current sync operation so
# another can get it
self.current_sync_operation = None