Diffstat (limited to 'pyload/lib/beaker/synchronization.py')
-rw-r--r--  pyload/lib/beaker/synchronization.py  381
1 file changed, 381 insertions(+), 0 deletions(-)
diff --git a/pyload/lib/beaker/synchronization.py b/pyload/lib/beaker/synchronization.py
new file mode 100644
index 000000000..761303707
--- /dev/null
+++ b/pyload/lib/beaker/synchronization.py
@@ -0,0 +1,381 @@
+"""Synchronization functions.
+
+File- and mutex-based mutual exclusion synchronizers are provided,
+as well as a name-based mutex which locks within an application
+based on a string name.
+
+"""
+
+import os
+import sys
+import tempfile
+
+try:
+ import threading as _threading
+except ImportError:
+ import dummy_threading as _threading
+
+# check for fcntl module
+try:
+ sys.getwindowsversion()
+ has_flock = False
+except:
+ try:
+ import fcntl
+ has_flock = True
+ except ImportError:
+ has_flock = False
+
+from beaker import util
+from beaker.exceptions import LockError
+
+__all__ = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer",
+ "NameLock", "_threading"]
+
+
+class NameLock(object):
+ """a proxy for an RLock object that is stored in a name based
+ registry.
+
+ Multiple threads can get a reference to the same RLock based on the
+ name alone, and synchronize operations related to that name.
+
+ """
+ locks = util.WeakValuedRegistry()
+
+ class NLContainer(object):
+ def __init__(self, reentrant):
+ if reentrant:
+ self.lock = _threading.RLock()
+ else:
+ self.lock = _threading.Lock()
+ def __call__(self):
+ return self.lock
+
+ def __init__(self, identifier = None, reentrant = False):
+ if identifier is None:
+ self._lock = NameLock.NLContainer(reentrant)
+ else:
+ self._lock = NameLock.locks.get(identifier, NameLock.NLContainer,
+ reentrant)
+
+ def acquire(self, wait = True):
+ return self._lock().acquire(wait)
+
+ def release(self):
+ self._lock().release()
+
+
+_synchronizers = util.WeakValuedRegistry()
+def _synchronizer(identifier, cls, **kwargs):
+ return _synchronizers.sync_get((identifier, cls), cls, identifier, **kwargs)
+
+
+def file_synchronizer(identifier, **kwargs):
+ if not has_flock or 'lock_dir' not in kwargs:
+ return mutex_synchronizer(identifier)
+ else:
+ return _synchronizer(identifier, FileSynchronizer, **kwargs)
+
+
+def mutex_synchronizer(identifier, **kwargs):
+ return _synchronizer(identifier, ConditionSynchronizer, **kwargs)
+
+
+class null_synchronizer(object):
+ def acquire_write_lock(self, wait=True):
+ return True
+ def acquire_read_lock(self):
+ pass
+ def release_write_lock(self):
+ pass
+ def release_read_lock(self):
+ pass
+ acquire = acquire_write_lock
+ release = release_write_lock
+
+
+class SynchronizerImpl(object):
+ def __init__(self):
+ self._state = util.ThreadLocal()
+
+ class SyncState(object):
+ __slots__ = 'reentrantcount', 'writing', 'reading'
+
+ def __init__(self):
+ self.reentrantcount = 0
+ self.writing = False
+ self.reading = False
+
+ def state(self):
+ if not self._state.has():
+ state = SynchronizerImpl.SyncState()
+ self._state.put(state)
+ return state
+ else:
+ return self._state.get()
+ state = property(state)
+
+ def release_read_lock(self):
+ state = self.state
+
+ if state.writing:
+ raise LockError("lock is in writing state")
+ if not state.reading:
+ raise LockError("lock is not in reading state")
+
+ if state.reentrantcount == 1:
+ self.do_release_read_lock()
+ state.reading = False
+
+ state.reentrantcount -= 1
+
+ def acquire_read_lock(self, wait = True):
+ state = self.state
+
+ if state.writing:
+ raise LockError("lock is in writing state")
+
+ if state.reentrantcount == 0:
+ x = self.do_acquire_read_lock(wait)
+ if (wait or x):
+ state.reentrantcount += 1
+ state.reading = True
+ return x
+ elif state.reading:
+ state.reentrantcount += 1
+ return True
+
+ def release_write_lock(self):
+ state = self.state
+
+ if state.reading:
+ raise LockError("lock is in reading state")
+ if not state.writing:
+ raise LockError("lock is not in writing state")
+
+ if state.reentrantcount == 1:
+ self.do_release_write_lock()
+ state.writing = False
+
+ state.reentrantcount -= 1
+
+ release = release_write_lock
+
+ def acquire_write_lock(self, wait = True):
+ state = self.state
+
+ if state.reading:
+ raise LockError("lock is in reading state")
+
+ if state.reentrantcount == 0:
+ x = self.do_acquire_write_lock(wait)
+ if (wait or x):
+ state.reentrantcount += 1
+ state.writing = True
+ return x
+ elif state.writing:
+ state.reentrantcount += 1
+ return True
+
+ acquire = acquire_write_lock
+
+ def do_release_read_lock(self):
+ raise NotImplementedError()
+
+ def do_acquire_read_lock(self):
+ raise NotImplementedError()
+
+ def do_release_write_lock(self):
+ raise NotImplementedError()
+
+ def do_acquire_write_lock(self):
+ raise NotImplementedError()
+
+
+class FileSynchronizer(SynchronizerImpl):
+ """a synchronizer which locks using flock().
+
+ Adapted for Python/multithreads from Apache::Session::Lock::File,
+ http://search.cpan.org/src/CWEST/Apache-Session-1.81/Session/Lock/File.pm
+
+ This module does not unlink temporary files,
+ because it interferes with proper locking. This can cause
+ problems on certain systems (Linux) whose file systems (ext2) do not
+ perform well with lots of files in one directory. To prevent this
+ you should use a script to clean out old files from your lock directory.
+
+ """
+ def __init__(self, identifier, lock_dir):
+ super(FileSynchronizer, self).__init__()
+ self._filedescriptor = util.ThreadLocal()
+
+ if lock_dir is None:
+ lock_dir = tempfile.gettempdir()
+ else:
+ lock_dir = lock_dir
+
+ self.filename = util.encoded_path(
+ lock_dir,
+ [identifier],
+ extension='.lock'
+ )
+
+ def _filedesc(self):
+ return self._filedescriptor.get()
+ _filedesc = property(_filedesc)
+
+ def _open(self, mode):
+ filedescriptor = self._filedesc
+ if filedescriptor is None:
+ filedescriptor = os.open(self.filename, mode)
+ self._filedescriptor.put(filedescriptor)
+ return filedescriptor
+
+ def do_acquire_read_lock(self, wait):
+ filedescriptor = self._open(os.O_CREAT | os.O_RDONLY)
+ if not wait:
+ try:
+ fcntl.flock(filedescriptor, fcntl.LOCK_SH | fcntl.LOCK_NB)
+ return True
+ except IOError:
+ os.close(filedescriptor)
+ self._filedescriptor.remove()
+ return False
+ else:
+ fcntl.flock(filedescriptor, fcntl.LOCK_SH)
+ return True
+
+ def do_acquire_write_lock(self, wait):
+ filedescriptor = self._open(os.O_CREAT | os.O_WRONLY)
+ if not wait:
+ try:
+ fcntl.flock(filedescriptor, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ return True
+ except IOError:
+ os.close(filedescriptor)
+ self._filedescriptor.remove()
+ return False
+ else:
+ fcntl.flock(filedescriptor, fcntl.LOCK_EX)
+ return True
+
+ def do_release_read_lock(self):
+ self._release_all_locks()
+
+ def do_release_write_lock(self):
+ self._release_all_locks()
+
+ def _release_all_locks(self):
+ filedescriptor = self._filedesc
+ if filedescriptor is not None:
+ fcntl.flock(filedescriptor, fcntl.LOCK_UN)
+ os.close(filedescriptor)
+ self._filedescriptor.remove()
+
+
+class ConditionSynchronizer(SynchronizerImpl):
+ """a synchronizer using a Condition."""
+
+ def __init__(self, identifier):
+ super(ConditionSynchronizer, self).__init__()
+
+ # counts how many asynchronous methods are executing
+ self.async = 0
+
+ # pointer to thread that is the current sync operation
+ self.current_sync_operation = None
+
+ # condition object to lock on
+ self.condition = _threading.Condition(_threading.Lock())
+
+ def do_acquire_read_lock(self, wait = True):
+ self.condition.acquire()
+ try:
+ # see if a synchronous operation is waiting to start
+ # or is already running, in which case we wait (or just
+ # give up and return)
+ if wait:
+ while self.current_sync_operation is not None:
+ self.condition.wait()
+ else:
+ if self.current_sync_operation is not None:
+ return False
+
+ self.async += 1
+ finally:
+ self.condition.release()
+
+ if not wait:
+ return True
+
+ def do_release_read_lock(self):
+ self.condition.acquire()
+ try:
+ self.async -= 1
+
+ # check if we are the last asynchronous reader thread
+ # out the door.
+ if self.async == 0:
+ # yes. so if a sync operation is waiting, notifyAll to wake
+ # it up
+ if self.current_sync_operation is not None:
+ self.condition.notifyAll()
+ elif self.async < 0:
+ raise LockError("Synchronizer error - too many "
+ "release_read_locks called")
+ finally:
+ self.condition.release()
+
+ def do_acquire_write_lock(self, wait = True):
+ self.condition.acquire()
+ try:
+ # here, we are not a synchronous reader, and after returning,
+ # assuming waiting or immediate availability, we will be.
+
+ if wait:
+ # if another sync is working, wait
+ while self.current_sync_operation is not None:
+ self.condition.wait()
+ else:
+ # if another sync is working,
+                # we don't want to wait, so forget it
+ if self.current_sync_operation is not None:
+ return False
+
+ # establish ourselves as the current sync
+ # this indicates to other read/write operations
+ # that they should wait until this is None again
+ self.current_sync_operation = _threading.currentThread()
+
+ # now wait again for asyncs to finish
+ if self.async > 0:
+ if wait:
+ # wait
+ self.condition.wait()
+ else:
+                    # we don't want to wait, so forget it
+ self.current_sync_operation = None
+ return False
+ finally:
+ self.condition.release()
+
+ if not wait:
+ return True
+
+ def do_release_write_lock(self):
+ self.condition.acquire()
+ try:
+ if self.current_sync_operation is not _threading.currentThread():
+                raise LockError("Synchronizer error - current thread doesn't "
+ "have the write lock")
+
+ # reset the current sync operation so
+ # another can get it
+ self.current_sync_operation = None
+
+ # tell everyone to get ready
+ self.condition.notifyAll()
+ finally:
+ # everyone go !!
+ self.condition.release()
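
Usage sketch for the factory functions. The module-level factories (file_synchronizer, mutex_synchronizer, null_synchronizer) are the intended entry points: callers ask for a synchronizer by string identifier and receive a shared instance out of the weak-valued registry. The sketch below is not part of this commit; it assumes the vendored package is importable as plain `beaker` (as the module's own `from beaker import util` suggests), and the identifier and lock directory are made-up examples.

    import tempfile
    from beaker.synchronization import file_synchronizer

    # With a lock_dir argument and a working fcntl module this returns a
    # FileSynchronizer; otherwise it silently falls back to a
    # ConditionSynchronizer, which only coordinates threads in one process.
    lock = file_synchronizer('example-resource', lock_dir=tempfile.gettempdir())

    lock.acquire_write_lock()          # exclusive lock (flock LOCK_EX)
    try:
        pass                           # regenerate the protected resource here
    finally:
        lock.release_write_lock()

    # Readers take a shared lock; wait=False makes the call non-blocking and
    # it returns False if the lock cannot be taken immediately.
    if lock.acquire_read_lock(wait=False):
        try:
            pass                       # read the protected resource here
        finally:
            lock.release_read_lock()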
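
NameLock is independent of the synchronizers: it hands out a plain Lock or RLock keyed by a string, so unrelated pieces of code that agree on a name share one mutex. A minimal sketch, with a made-up name:

    from beaker.synchronization import NameLock

    a = NameLock('user-cache-42', reentrant=True)
    b = NameLock('user-cache-42', reentrant=True)   # same underlying RLock as a

    a.acquire()
    a.acquire()        # reentrant=True, so the owning thread may nest acquires
    a.release()
    a.release()

    # Non-blocking attempt: acquire(wait=False) returns immediately with
    # False if another thread currently owns the lock.
    if b.acquire(wait=False):
        b.release()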
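
SynchronizerImpl keeps its bookkeeping in a per-thread SyncState, so acquires are reentrant per thread: only the outermost acquire/release pair touches the real lock, and mixing read and write modes on one thread raises LockError. A sketch of that behaviour, again with a hypothetical identifier:

    from beaker.synchronization import mutex_synchronizer
    from beaker.exceptions import LockError

    sync = mutex_synchronizer('example-resource')

    sync.acquire_write_lock()
    sync.acquire_write_lock()      # reentrantcount == 2; the Condition is not touched again
    try:
        sync.acquire_read_lock()   # wrong mode for this thread
    except LockError:
        pass                       # "lock is in writing state"
    sync.release_write_lock()      # still held, reentrantcount back to 1
    sync.release_write_lock()      # actually released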
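
ConditionSynchronizer implements a small readers/writer protocol on a single Condition: any number of readers increment the async counter concurrently, while a writer registers itself as current_sync_operation, waits for the reader count to drain, and on release wakes everyone with notifyAll(). A hedged multi-thread sketch (thread timing is nondeterministic and the identifier is made up):

    import threading
    from beaker.synchronization import mutex_synchronizer

    sync = mutex_synchronizer('writer-demo')
    shared = {'value': 0}

    def reader():
        sync.acquire_read_lock()       # many readers may hold this at once
        try:
            _ = shared['value']
        finally:
            sync.release_read_lock()

    readers = [threading.Thread(target=reader) for _ in range(5)]
    for t in readers:
        t.start()

    sync.acquire_write_lock()          # blocks until active readers have released
    try:
        shared['value'] += 1
    finally:
        sync.release_write_lock()      # notifyAll() lets blocked readers proceed

    for t in readers:
        t.join()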