From 7e974f306adb82dd38cc4adc4adbe624619c6f1a Mon Sep 17 00:00:00 2001
From: root <root@raspberrypi.(none)>
Date: Fri, 3 Apr 2015 16:41:45 +0200
Subject: add lib

---
 lib/beaker/synchronization.py | 381 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 381 insertions(+)
 create mode 100644 lib/beaker/synchronization.py

(limited to 'lib/beaker/synchronization.py')

diff --git a/lib/beaker/synchronization.py b/lib/beaker/synchronization.py
new file mode 100644
index 000000000..761303707
--- /dev/null
+++ b/lib/beaker/synchronization.py
@@ -0,0 +1,381 @@
+"""Synchronization functions.
+
+File- and mutex-based mutual exclusion synchronizers are provided,
+as well as a name-based mutex which locks within an application
+based on a string name.
+
+"""
+
+import os
+import sys
+import tempfile
+
+try:
+    import threading as _threading
+except ImportError:
+    import dummy_threading as _threading
+
+# check for fcntl module
+try:
+    sys.getwindowsversion()
+    has_flock = False
+except:
+    try:
+        import fcntl
+        has_flock = True
+    except ImportError:
+        has_flock = False
+
+from beaker import util
+from beaker.exceptions import LockError
+
+__all__  = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer",
+            "NameLock", "_threading"]
+
+
+class NameLock(object):
+    """a proxy for an RLock object that is stored in a name based
+    registry.  
+    
+    Multiple threads can get a reference to the same RLock based on the
+    name alone, and synchronize operations related to that name.
+
+    """     
+    locks = util.WeakValuedRegistry()
+
+    class NLContainer(object):
+        def __init__(self, reentrant):
+            if reentrant:
+                self.lock = _threading.RLock()
+            else:
+                self.lock = _threading.Lock()
+        def __call__(self):
+            return self.lock
+
+    def __init__(self, identifier = None, reentrant = False):
+        if identifier is None:
+            self._lock = NameLock.NLContainer(reentrant)
+        else:
+            self._lock = NameLock.locks.get(identifier, NameLock.NLContainer,
+                                            reentrant)
+
+    def acquire(self, wait = True):
+        return self._lock().acquire(wait)
+
+    def release(self):
+        self._lock().release()
+
+
+_synchronizers = util.WeakValuedRegistry()
+def _synchronizer(identifier, cls, **kwargs):
+    return _synchronizers.sync_get((identifier, cls), cls, identifier, **kwargs)
+
+
+def file_synchronizer(identifier, **kwargs):
+    if not has_flock or 'lock_dir' not in kwargs:
+        return mutex_synchronizer(identifier)
+    else:
+        return _synchronizer(identifier, FileSynchronizer, **kwargs)
+
+
+def mutex_synchronizer(identifier, **kwargs):
+    return _synchronizer(identifier, ConditionSynchronizer, **kwargs)
+
+
+class null_synchronizer(object):
+    def acquire_write_lock(self, wait=True):
+        return True
+    def acquire_read_lock(self):
+        pass
+    def release_write_lock(self):
+        pass
+    def release_read_lock(self):
+        pass
+    acquire = acquire_write_lock
+    release = release_write_lock
+
+
+class SynchronizerImpl(object):
+    def __init__(self):
+        self._state = util.ThreadLocal()
+
+    class SyncState(object):
+        __slots__ = 'reentrantcount', 'writing', 'reading'
+
+        def __init__(self):
+            self.reentrantcount = 0
+            self.writing = False
+            self.reading = False
+
+    def state(self):
+        if not self._state.has():
+            state = SynchronizerImpl.SyncState()
+            self._state.put(state)
+            return state
+        else:
+            return self._state.get()
+    state = property(state)
+    
+    def release_read_lock(self):
+        state = self.state
+
+        if state.writing: 
+            raise LockError("lock is in writing state")
+        if not state.reading: 
+            raise LockError("lock is not in reading state")
+        
+        if state.reentrantcount == 1:
+            self.do_release_read_lock()
+            state.reading = False
+
+        state.reentrantcount -= 1
+        
+    def acquire_read_lock(self, wait = True):
+        state = self.state
+
+        if state.writing: 
+            raise LockError("lock is in writing state")
+        
+        if state.reentrantcount == 0:
+            x = self.do_acquire_read_lock(wait)
+            if (wait or x):
+                state.reentrantcount += 1
+                state.reading = True
+            return x
+        elif state.reading:
+            state.reentrantcount += 1
+            return True
+            
+    def release_write_lock(self):
+        state = self.state
+
+        if state.reading: 
+            raise LockError("lock is in reading state")
+        if not state.writing: 
+            raise LockError("lock is not in writing state")
+
+        if state.reentrantcount == 1:
+            self.do_release_write_lock()
+            state.writing = False
+
+        state.reentrantcount -= 1
+    
+    release = release_write_lock
+    
+    def acquire_write_lock(self, wait  = True):
+        state = self.state
+
+        if state.reading: 
+            raise LockError("lock is in reading state")
+        
+        if state.reentrantcount == 0:
+            x = self.do_acquire_write_lock(wait)
+            if (wait or x): 
+                state.reentrantcount += 1
+                state.writing = True
+            return x
+        elif state.writing:
+            state.reentrantcount += 1
+            return True
+
+    acquire = acquire_write_lock
+
+    def do_release_read_lock(self):
+        raise NotImplementedError()
+    
+    def do_acquire_read_lock(self):
+        raise NotImplementedError()
+    
+    def do_release_write_lock(self):
+        raise NotImplementedError()
+    
+    def do_acquire_write_lock(self):
+        raise NotImplementedError()
+
+
+class FileSynchronizer(SynchronizerImpl):
+    """a synchronizer which locks using flock().
+
+    Adapted for Python/multithreads from Apache::Session::Lock::File,
+    http://search.cpan.org/src/CWEST/Apache-Session-1.81/Session/Lock/File.pm
+    
+    This module does not unlink temporary files, 
+    because it interferes with proper locking.  This can cause 
+    problems on certain systems (Linux) whose file systems (ext2) do not 
+    perform well with lots of files in one directory.  To prevent this
+    you should use a script to clean out old files from your lock directory.
+    
+    """
+    def __init__(self, identifier, lock_dir):
+        super(FileSynchronizer, self).__init__()
+        self._filedescriptor = util.ThreadLocal()
+        
+        if lock_dir is None:
+            lock_dir = tempfile.gettempdir()
+        else:
+            lock_dir = lock_dir
+
+        self.filename = util.encoded_path(
+                            lock_dir, 
+                            [identifier], 
+                            extension='.lock'
+                        )
+
+    def _filedesc(self):
+        return self._filedescriptor.get()
+    _filedesc = property(_filedesc)
+        
+    def _open(self, mode):
+        filedescriptor = self._filedesc
+        if filedescriptor is None:
+            filedescriptor = os.open(self.filename, mode)
+            self._filedescriptor.put(filedescriptor)
+        return filedescriptor
+            
+    def do_acquire_read_lock(self, wait):
+        filedescriptor = self._open(os.O_CREAT | os.O_RDONLY)
+        if not wait:
+            try:
+                fcntl.flock(filedescriptor, fcntl.LOCK_SH | fcntl.LOCK_NB)
+                return True
+            except IOError:
+                os.close(filedescriptor)
+                self._filedescriptor.remove()
+                return False
+        else:
+            fcntl.flock(filedescriptor, fcntl.LOCK_SH)
+            return True
+
+    def do_acquire_write_lock(self, wait):
+        filedescriptor = self._open(os.O_CREAT | os.O_WRONLY)
+        if not wait:
+            try:
+                fcntl.flock(filedescriptor, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                return True
+            except IOError:
+                os.close(filedescriptor)
+                self._filedescriptor.remove()
+                return False
+        else:
+            fcntl.flock(filedescriptor, fcntl.LOCK_EX)
+            return True
+    
+    def do_release_read_lock(self):
+        self._release_all_locks()
+    
+    def do_release_write_lock(self):
+        self._release_all_locks()
+    
+    def _release_all_locks(self):
+        filedescriptor = self._filedesc
+        if filedescriptor is not None:
+            fcntl.flock(filedescriptor, fcntl.LOCK_UN)
+            os.close(filedescriptor)
+            self._filedescriptor.remove()
+
+
+class ConditionSynchronizer(SynchronizerImpl):
+    """a synchronizer using a Condition."""
+    
+    def __init__(self, identifier):
+        super(ConditionSynchronizer, self).__init__()
+
+        # counts how many asynchronous methods are executing
+        self.async = 0
+
+        # pointer to thread that is the current sync operation
+        self.current_sync_operation = None
+
+        # condition object to lock on
+        self.condition = _threading.Condition(_threading.Lock())
+
+    def do_acquire_read_lock(self, wait = True):    
+        self.condition.acquire()
+        try:
+            # see if a synchronous operation is waiting to start
+            # or is already running, in which case we wait (or just
+            # give up and return)
+            if wait:
+                while self.current_sync_operation is not None:
+                    self.condition.wait()
+            else:
+                if self.current_sync_operation is not None:
+                    return False
+
+            self.async += 1
+        finally:
+            self.condition.release()
+
+        if not wait: 
+            return True
+        
+    def do_release_read_lock(self):
+        self.condition.acquire()
+        try:
+            self.async -= 1
+        
+            # check if we are the last asynchronous reader thread 
+            # out the door.
+            if self.async == 0:
+                # yes. so if a sync operation is waiting, notifyAll to wake
+                # it up
+                if self.current_sync_operation is not None:
+                    self.condition.notifyAll()
+            elif self.async < 0:
+                raise LockError("Synchronizer error - too many "
+                                "release_read_locks called")
+        finally:
+            self.condition.release()
+    
+    def do_acquire_write_lock(self, wait = True):
+        self.condition.acquire()
+        try:
+            # here, we are not a synchronous reader, and after returning,
+            # assuming waiting or immediate availability, we will be.
+        
+            if wait:
+                # if another sync is working, wait
+                while self.current_sync_operation is not None:
+                    self.condition.wait()
+            else:
+                # if another sync is working,
+                # we dont want to wait, so forget it
+                if self.current_sync_operation is not None:
+                    return False
+            
+            # establish ourselves as the current sync 
+            # this indicates to other read/write operations
+            # that they should wait until this is None again
+            self.current_sync_operation = _threading.currentThread()
+
+            # now wait again for asyncs to finish
+            if self.async > 0:
+                if wait:
+                    # wait
+                    self.condition.wait()
+                else:
+                    # we dont want to wait, so forget it
+                    self.current_sync_operation = None
+                    return False
+        finally:
+            self.condition.release()
+        
+        if not wait: 
+            return True
+
+    def do_release_write_lock(self):
+        self.condition.acquire()
+        try:
+            if self.current_sync_operation is not _threading.currentThread():
+                raise LockError("Synchronizer error - current thread doesnt "
+                                "have the write lock")
+
+            # reset the current sync operation so 
+            # another can get it
+            self.current_sync_operation = None
+
+            # tell everyone to get ready
+            self.condition.notifyAll()
+        finally:
+            # everyone go !!
+            self.condition.release()
-- 
cgit v1.2.3