diff options
Diffstat (limited to 'module/lib/beaker/synchronization.py')
| -rw-r--r-- | module/lib/beaker/synchronization.py | 381 | 
1 files changed, 0 insertions, 381 deletions
| diff --git a/module/lib/beaker/synchronization.py b/module/lib/beaker/synchronization.py deleted file mode 100644 index 761303707..000000000 --- a/module/lib/beaker/synchronization.py +++ /dev/null @@ -1,381 +0,0 @@ -"""Synchronization functions. - -File- and mutex-based mutual exclusion synchronizers are provided, -as well as a name-based mutex which locks within an application -based on a string name. - -""" - -import os -import sys -import tempfile - -try: -    import threading as _threading -except ImportError: -    import dummy_threading as _threading - -# check for fcntl module -try: -    sys.getwindowsversion() -    has_flock = False -except: -    try: -        import fcntl -        has_flock = True -    except ImportError: -        has_flock = False - -from beaker import util -from beaker.exceptions import LockError - -__all__  = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer", -            "NameLock", "_threading"] - - -class NameLock(object): -    """a proxy for an RLock object that is stored in a name based -    registry.   -     -    Multiple threads can get a reference to the same RLock based on the -    name alone, and synchronize operations related to that name. - -    """      -    locks = util.WeakValuedRegistry() - -    class NLContainer(object): -        def __init__(self, reentrant): -            if reentrant: -                self.lock = _threading.RLock() -            else: -                self.lock = _threading.Lock() -        def __call__(self): -            return self.lock - -    def __init__(self, identifier = None, reentrant = False): -        if identifier is None: -            self._lock = NameLock.NLContainer(reentrant) -        else: -            self._lock = NameLock.locks.get(identifier, NameLock.NLContainer, -                                            reentrant) - -    def acquire(self, wait = True): -        return self._lock().acquire(wait) - -    def release(self): -        self._lock().release() - - -_synchronizers = util.WeakValuedRegistry() -def _synchronizer(identifier, cls, **kwargs): -    return _synchronizers.sync_get((identifier, cls), cls, identifier, **kwargs) - - -def file_synchronizer(identifier, **kwargs): -    if not has_flock or 'lock_dir' not in kwargs: -        return mutex_synchronizer(identifier) -    else: -        return _synchronizer(identifier, FileSynchronizer, **kwargs) - - -def mutex_synchronizer(identifier, **kwargs): -    return _synchronizer(identifier, ConditionSynchronizer, **kwargs) - - -class null_synchronizer(object): -    def acquire_write_lock(self, wait=True): -        return True -    def acquire_read_lock(self): -        pass -    def release_write_lock(self): -        pass -    def release_read_lock(self): -        pass -    acquire = acquire_write_lock -    release = release_write_lock - - -class SynchronizerImpl(object): -    def __init__(self): -        self._state = util.ThreadLocal() - -    class SyncState(object): -        __slots__ = 'reentrantcount', 'writing', 'reading' - -        def __init__(self): -            self.reentrantcount = 0 -            self.writing = False -            self.reading = False - -    def state(self): -        if not self._state.has(): -            state = SynchronizerImpl.SyncState() -            self._state.put(state) -            return state -        else: -            return self._state.get() -    state = property(state) -     -    def release_read_lock(self): -        state = self.state - -        if state.writing:  -            raise LockError("lock is in writing state") -        if not state.reading:  -            raise LockError("lock is not in reading state") -         -        if state.reentrantcount == 1: -            self.do_release_read_lock() -            state.reading = False - -        state.reentrantcount -= 1 -         -    def acquire_read_lock(self, wait = True): -        state = self.state - -        if state.writing:  -            raise LockError("lock is in writing state") -         -        if state.reentrantcount == 0: -            x = self.do_acquire_read_lock(wait) -            if (wait or x): -                state.reentrantcount += 1 -                state.reading = True -            return x -        elif state.reading: -            state.reentrantcount += 1 -            return True -             -    def release_write_lock(self): -        state = self.state - -        if state.reading:  -            raise LockError("lock is in reading state") -        if not state.writing:  -            raise LockError("lock is not in writing state") - -        if state.reentrantcount == 1: -            self.do_release_write_lock() -            state.writing = False - -        state.reentrantcount -= 1 -     -    release = release_write_lock -     -    def acquire_write_lock(self, wait  = True): -        state = self.state - -        if state.reading:  -            raise LockError("lock is in reading state") -         -        if state.reentrantcount == 0: -            x = self.do_acquire_write_lock(wait) -            if (wait or x):  -                state.reentrantcount += 1 -                state.writing = True -            return x -        elif state.writing: -            state.reentrantcount += 1 -            return True - -    acquire = acquire_write_lock - -    def do_release_read_lock(self): -        raise NotImplementedError() -     -    def do_acquire_read_lock(self): -        raise NotImplementedError() -     -    def do_release_write_lock(self): -        raise NotImplementedError() -     -    def do_acquire_write_lock(self): -        raise NotImplementedError() - - -class FileSynchronizer(SynchronizerImpl): -    """a synchronizer which locks using flock(). - -    Adapted for Python/multithreads from Apache::Session::Lock::File, -    http://search.cpan.org/src/CWEST/Apache-Session-1.81/Session/Lock/File.pm -     -    This module does not unlink temporary files,  -    because it interferes with proper locking.  This can cause  -    problems on certain systems (Linux) whose file systems (ext2) do not  -    perform well with lots of files in one directory.  To prevent this -    you should use a script to clean out old files from your lock directory. -     -    """ -    def __init__(self, identifier, lock_dir): -        super(FileSynchronizer, self).__init__() -        self._filedescriptor = util.ThreadLocal() -         -        if lock_dir is None: -            lock_dir = tempfile.gettempdir() -        else: -            lock_dir = lock_dir - -        self.filename = util.encoded_path( -                            lock_dir,  -                            [identifier],  -                            extension='.lock' -                        ) - -    def _filedesc(self): -        return self._filedescriptor.get() -    _filedesc = property(_filedesc) -         -    def _open(self, mode): -        filedescriptor = self._filedesc -        if filedescriptor is None: -            filedescriptor = os.open(self.filename, mode) -            self._filedescriptor.put(filedescriptor) -        return filedescriptor -             -    def do_acquire_read_lock(self, wait): -        filedescriptor = self._open(os.O_CREAT | os.O_RDONLY) -        if not wait: -            try: -                fcntl.flock(filedescriptor, fcntl.LOCK_SH | fcntl.LOCK_NB) -                return True -            except IOError: -                os.close(filedescriptor) -                self._filedescriptor.remove() -                return False -        else: -            fcntl.flock(filedescriptor, fcntl.LOCK_SH) -            return True - -    def do_acquire_write_lock(self, wait): -        filedescriptor = self._open(os.O_CREAT | os.O_WRONLY) -        if not wait: -            try: -                fcntl.flock(filedescriptor, fcntl.LOCK_EX | fcntl.LOCK_NB) -                return True -            except IOError: -                os.close(filedescriptor) -                self._filedescriptor.remove() -                return False -        else: -            fcntl.flock(filedescriptor, fcntl.LOCK_EX) -            return True -     -    def do_release_read_lock(self): -        self._release_all_locks() -     -    def do_release_write_lock(self): -        self._release_all_locks() -     -    def _release_all_locks(self): -        filedescriptor = self._filedesc -        if filedescriptor is not None: -            fcntl.flock(filedescriptor, fcntl.LOCK_UN) -            os.close(filedescriptor) -            self._filedescriptor.remove() - - -class ConditionSynchronizer(SynchronizerImpl): -    """a synchronizer using a Condition.""" -     -    def __init__(self, identifier): -        super(ConditionSynchronizer, self).__init__() - -        # counts how many asynchronous methods are executing -        self.async = 0 - -        # pointer to thread that is the current sync operation -        self.current_sync_operation = None - -        # condition object to lock on -        self.condition = _threading.Condition(_threading.Lock()) - -    def do_acquire_read_lock(self, wait = True):     -        self.condition.acquire() -        try: -            # see if a synchronous operation is waiting to start -            # or is already running, in which case we wait (or just -            # give up and return) -            if wait: -                while self.current_sync_operation is not None: -                    self.condition.wait() -            else: -                if self.current_sync_operation is not None: -                    return False - -            self.async += 1 -        finally: -            self.condition.release() - -        if not wait:  -            return True -         -    def do_release_read_lock(self): -        self.condition.acquire() -        try: -            self.async -= 1 -         -            # check if we are the last asynchronous reader thread  -            # out the door. -            if self.async == 0: -                # yes. so if a sync operation is waiting, notifyAll to wake -                # it up -                if self.current_sync_operation is not None: -                    self.condition.notifyAll() -            elif self.async < 0: -                raise LockError("Synchronizer error - too many " -                                "release_read_locks called") -        finally: -            self.condition.release() -     -    def do_acquire_write_lock(self, wait = True): -        self.condition.acquire() -        try: -            # here, we are not a synchronous reader, and after returning, -            # assuming waiting or immediate availability, we will be. -         -            if wait: -                # if another sync is working, wait -                while self.current_sync_operation is not None: -                    self.condition.wait() -            else: -                # if another sync is working, -                # we dont want to wait, so forget it -                if self.current_sync_operation is not None: -                    return False -             -            # establish ourselves as the current sync  -            # this indicates to other read/write operations -            # that they should wait until this is None again -            self.current_sync_operation = _threading.currentThread() - -            # now wait again for asyncs to finish -            if self.async > 0: -                if wait: -                    # wait -                    self.condition.wait() -                else: -                    # we dont want to wait, so forget it -                    self.current_sync_operation = None -                    return False -        finally: -            self.condition.release() -         -        if not wait:  -            return True - -    def do_release_write_lock(self): -        self.condition.acquire() -        try: -            if self.current_sync_operation is not _threading.currentThread(): -                raise LockError("Synchronizer error - current thread doesnt " -                                "have the write lock") - -            # reset the current sync operation so  -            # another can get it -            self.current_sync_operation = None - -            # tell everyone to get ready -            self.condition.notifyAll() -        finally: -            # everyone go !! -            self.condition.release() | 
