diff options
Diffstat (limited to 'module/lib/beaker/synchronization.py')
| -rw-r--r-- | module/lib/beaker/synchronization.py | 386 | 
1 files changed, 0 insertions, 386 deletions
diff --git a/module/lib/beaker/synchronization.py b/module/lib/beaker/synchronization.py deleted file mode 100644 index f236b8cfe..000000000 --- a/module/lib/beaker/synchronization.py +++ /dev/null @@ -1,386 +0,0 @@ -"""Synchronization functions. - -File- and mutex-based mutual exclusion synchronizers are provided, -as well as a name-based mutex which locks within an application -based on a string name. - -""" - -import os -import sys -import tempfile - -try: -    import threading as _threading -except ImportError: -    import dummy_threading as _threading - -# check for fcntl module -try: -    sys.getwindowsversion() -    has_flock = False -except: -    try: -        import fcntl -        has_flock = True -    except ImportError: -        has_flock = False - -from beaker import util -from beaker.exceptions import LockError - -__all__ = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer", -            "NameLock", "_threading"] - - -class NameLock(object): -    """a proxy for an RLock object that is stored in a name based -    registry. - -    Multiple threads can get a reference to the same RLock based on the -    name alone, and synchronize operations related to that name. - -    """ -    locks = util.WeakValuedRegistry() - -    class NLContainer(object): -        def __init__(self, reentrant): -            if reentrant: -                self.lock = _threading.RLock() -            else: -                self.lock = _threading.Lock() - -        def __call__(self): -            return self.lock - -    def __init__(self, identifier=None, reentrant=False): -        if identifier is None: -            self._lock = NameLock.NLContainer(reentrant) -        else: -            self._lock = NameLock.locks.get(identifier, NameLock.NLContainer, -                                            reentrant) - -    def acquire(self, wait=True): -        return self._lock().acquire(wait) - -    def release(self): -        self._lock().release() - - -_synchronizers = util.WeakValuedRegistry() - - -def _synchronizer(identifier, cls, **kwargs): -    return _synchronizers.sync_get((identifier, cls), cls, identifier, **kwargs) - - -def file_synchronizer(identifier, **kwargs): -    if not has_flock or 'lock_dir' not in kwargs: -        return mutex_synchronizer(identifier) -    else: -        return _synchronizer(identifier, FileSynchronizer, **kwargs) - - -def mutex_synchronizer(identifier, **kwargs): -    return _synchronizer(identifier, ConditionSynchronizer, **kwargs) - - -class null_synchronizer(object): -    """A 'null' synchronizer, which provides the :class:`.SynchronizerImpl` interface -    without any locking. - -    """ -    def acquire_write_lock(self, wait=True): -        return True - -    def acquire_read_lock(self): -        pass - -    def release_write_lock(self): -        pass - -    def release_read_lock(self): -        pass -    acquire = acquire_write_lock -    release = release_write_lock - - -class SynchronizerImpl(object): -    """Base class for a synchronization object that allows -    multiple readers, single writers. - -    """ -    def __init__(self): -        self._state = util.ThreadLocal() - -    class SyncState(object): -        __slots__ = 'reentrantcount', 'writing', 'reading' - -        def __init__(self): -            self.reentrantcount = 0 -            self.writing = False -            self.reading = False - -    def state(self): -        if not self._state.has(): -            state = SynchronizerImpl.SyncState() -            self._state.put(state) -            return state -        else: -            return self._state.get() -    state = property(state) - -    def release_read_lock(self): -        state = self.state - -        if state.writing: -            raise LockError("lock is in writing state") -        if not state.reading: -            raise LockError("lock is not in reading state") - -        if state.reentrantcount == 1: -            self.do_release_read_lock() -            state.reading = False - -        state.reentrantcount -= 1 - -    def acquire_read_lock(self, wait=True): -        state = self.state - -        if state.writing: -            raise LockError("lock is in writing state") - -        if state.reentrantcount == 0: -            x = self.do_acquire_read_lock(wait) -            if (wait or x): -                state.reentrantcount += 1 -                state.reading = True -            return x -        elif state.reading: -            state.reentrantcount += 1 -            return True - -    def release_write_lock(self): -        state = self.state - -        if state.reading: -            raise LockError("lock is in reading state") -        if not state.writing: -            raise LockError("lock is not in writing state") - -        if state.reentrantcount == 1: -            self.do_release_write_lock() -            state.writing = False - -        state.reentrantcount -= 1 - -    release = release_write_lock - -    def acquire_write_lock(self, wait=True): -        state = self.state - -        if state.reading: -            raise LockError("lock is in reading state") - -        if state.reentrantcount == 0: -            x = self.do_acquire_write_lock(wait) -            if (wait or x): -                state.reentrantcount += 1 -                state.writing = True -            return x -        elif state.writing: -            state.reentrantcount += 1 -            return True - -    acquire = acquire_write_lock - -    def do_release_read_lock(self): -        raise NotImplementedError() - -    def do_acquire_read_lock(self): -        raise NotImplementedError() - -    def do_release_write_lock(self): -        raise NotImplementedError() - -    def do_acquire_write_lock(self): -        raise NotImplementedError() - - -class FileSynchronizer(SynchronizerImpl): -    """A synchronizer which locks using flock(). - -    """ -    def __init__(self, identifier, lock_dir): -        super(FileSynchronizer, self).__init__() -        self._filedescriptor = util.ThreadLocal() - -        if lock_dir is None: -            lock_dir = tempfile.gettempdir() -        else: -            lock_dir = lock_dir - -        self.filename = util.encoded_path( -                            lock_dir, -                            [identifier], -                            extension='.lock' -                        ) - -    def _filedesc(self): -        return self._filedescriptor.get() -    _filedesc = property(_filedesc) - -    def _open(self, mode): -        filedescriptor = self._filedesc -        if filedescriptor is None: -            filedescriptor = os.open(self.filename, mode) -            self._filedescriptor.put(filedescriptor) -        return filedescriptor - -    def do_acquire_read_lock(self, wait): -        filedescriptor = self._open(os.O_CREAT | os.O_RDONLY) -        if not wait: -            try: -                fcntl.flock(filedescriptor, fcntl.LOCK_SH | fcntl.LOCK_NB) -                return True -            except IOError: -                os.close(filedescriptor) -                self._filedescriptor.remove() -                return False -        else: -            fcntl.flock(filedescriptor, fcntl.LOCK_SH) -            return True - -    def do_acquire_write_lock(self, wait): -        filedescriptor = self._open(os.O_CREAT | os.O_WRONLY) -        if not wait: -            try: -                fcntl.flock(filedescriptor, fcntl.LOCK_EX | fcntl.LOCK_NB) -                return True -            except IOError: -                os.close(filedescriptor) -                self._filedescriptor.remove() -                return False -        else: -            fcntl.flock(filedescriptor, fcntl.LOCK_EX) -            return True - -    def do_release_read_lock(self): -        self._release_all_locks() - -    def do_release_write_lock(self): -        self._release_all_locks() - -    def _release_all_locks(self): -        filedescriptor = self._filedesc -        if filedescriptor is not None: -            fcntl.flock(filedescriptor, fcntl.LOCK_UN) -            os.close(filedescriptor) -            self._filedescriptor.remove() - - -class ConditionSynchronizer(SynchronizerImpl): -    """a synchronizer using a Condition.""" - -    def __init__(self, identifier): -        super(ConditionSynchronizer, self).__init__() - -        # counts how many asynchronous methods are executing -        self.async = 0 - -        # pointer to thread that is the current sync operation -        self.current_sync_operation = None - -        # condition object to lock on -        self.condition = _threading.Condition(_threading.Lock()) - -    def do_acquire_read_lock(self, wait=True): -        self.condition.acquire() -        try: -            # see if a synchronous operation is waiting to start -            # or is already running, in which case we wait (or just -            # give up and return) -            if wait: -                while self.current_sync_operation is not None: -                    self.condition.wait() -            else: -                if self.current_sync_operation is not None: -                    return False - -            self.async += 1 -        finally: -            self.condition.release() - -        if not wait: -            return True - -    def do_release_read_lock(self): -        self.condition.acquire() -        try: -            self.async -= 1 - -            # check if we are the last asynchronous reader thread -            # out the door. -            if self.async == 0: -                # yes. so if a sync operation is waiting, notifyAll to wake -                # it up -                if self.current_sync_operation is not None: -                    self.condition.notifyAll() -            elif self.async < 0: -                raise LockError("Synchronizer error - too many " -                                "release_read_locks called") -        finally: -            self.condition.release() - -    def do_acquire_write_lock(self, wait=True): -        self.condition.acquire() -        try: -            # here, we are not a synchronous reader, and after returning, -            # assuming waiting or immediate availability, we will be. - -            if wait: -                # if another sync is working, wait -                while self.current_sync_operation is not None: -                    self.condition.wait() -            else: -                # if another sync is working, -                # we dont want to wait, so forget it -                if self.current_sync_operation is not None: -                    return False - -            # establish ourselves as the current sync -            # this indicates to other read/write operations -            # that they should wait until this is None again -            self.current_sync_operation = _threading.currentThread() - -            # now wait again for asyncs to finish -            if self.async > 0: -                if wait: -                    # wait -                    self.condition.wait() -                else: -                    # we dont want to wait, so forget it -                    self.current_sync_operation = None -                    return False -        finally: -            self.condition.release() - -        if not wait: -            return True - -    def do_release_write_lock(self): -        self.condition.acquire() -        try: -            if self.current_sync_operation is not _threading.currentThread(): -                raise LockError("Synchronizer error - current thread doesnt " -                                "have the write lock") - -            # reset the current sync operation so -            # another can get it -            self.current_sync_operation = None - -            # tell everyone to get ready -            self.condition.notifyAll() -        finally: -            # everyone go !! -            self.condition.release()  | 
