diff options
Diffstat (limited to 'module')
| -rw-r--r-- | module/HookManager.py | 61 | ||||
| -rw-r--r-- | module/PluginThread.py | 10 | ||||
| -rw-r--r-- | module/PyFile.py | 3 | ||||
| -rw-r--r-- | module/Scheduler.py | 40 | ||||
| -rw-r--r-- | module/database/DatabaseBackend.py | 6 | ||||
| -rw-r--r-- | module/database/FileDatabase.py | 15 | ||||
| -rw-r--r-- | module/plugins/Hook.py | 35 | 
7 files changed, 123 insertions, 47 deletions
| diff --git a/module/HookManager.py b/module/HookManager.py index 1adc07117..be5f548d7 100644 --- a/module/HookManager.py +++ b/module/HookManager.py @@ -78,7 +78,7 @@ class HookManager:          #registering callback for config event          self.config.pluginCB = MethodType(self.dispatchEvent, "pluginConfigChanged", basestring) -        self.addEvent("pluginConfigChanged", self.activateHooks) +        self.addEvent("pluginConfigChanged", self.manageHooks)          self.lock = RLock()          self.createIndex() @@ -147,17 +147,19 @@ class HookManager:          self.plugins = plugins -    def activateHooks(self, plugin, name, value): +    def manageHooks(self, plugin, name, value): +        if name == "activated" and value: +            self.activateHook(plugin) +        elif name == "activated" and not value: +            self.deactivateHook(plugin) -        if name != "activated" or not value: -            return +    def activateHook(self, plugin):          #check if already loaded          for inst in self.plugins:              if inst.__name__ == plugin:                  return -          pluginClass = self.core.pluginManager.getHookPlugin(plugin)          if not pluginClass: return @@ -171,25 +173,23 @@ class HookManager:          # call core Ready          start_new_thread(plugin.coreReady, tuple()) -        # init periodical call -        self.core.scheduler.addJob(0, self.wrapPeriodical, args=[plugin], threaded=False) +    def deactivateHook(self, plugin): +        hook = None +        for inst in self.plugins: +            if inst.__name__ == plugin: +                hook = inst -    def wrapPeriodical(self, plugin): -        plugin.lastCall = time() -        try: -            if plugin.isActivated(): plugin.periodical() -        except Exception, e: -            self.core.log.error(_("Error executing hooks: %s") % str(e)) -            if self.core.debug: -                traceback.print_exc() +        if not hook: return -        self.core.scheduler.addJob(plugin.interval, self.wrapPeriodical, args=[plugin], threaded=False) +        self.log.debug("Plugin unloaded: %s" % plugin) -    def initPeriodical(self): -        for plugin in self.plugins: -            if plugin.isActivated() and plugin.interval >= 1: -                self.core.scheduler.addJob(0, self.wrapPeriodical, args=[plugin], threaded=False) +        hook.unload() + +        #remove periodic call +        self.log.debug("Removed callback %s" % self.core.scheduler.removeJob(hook.cb)) +        self.plugins.remove(hook) +        del self.pluginMap[hook.__name__]      @try_catch @@ -199,7 +199,14 @@ class HookManager:                  plugin.coreReady()          self.dispatchEvent("coreReady") -        self.initPeriodical() + +    @try_catch +    def coreExiting(self): +        for plugin in self.plugins: +            if plugin.isActivated(): +                plugin.coreExiting() + +        self.dispatchEvent("coreExiting")      @lock      def downloadStarts(self, pyfile): @@ -222,6 +229,18 @@ class HookManager:          self.dispatchEvent("downloadFinished", pyfile)      @lock +    @try_catch +    def downloadFailed(self, pyfile): +        for plugin in self.plugins: +            if plugin.isActivated(): +                if "downloadFailed" in plugin.__threaded__: +                    self.startThread(plugin.downloadFinished, pyfile) +                else: +                    plugin.downloadFailed(pyfile) + +        self.dispatchEvent("downloadFailed", pyfile) + +    @lock      def packageFinished(self, package):          for plugin in self.plugins:              if plugin.isActivated(): diff --git a/module/PluginThread.py b/module/PluginThread.py index be29a680e..c5b85a043 100644 --- a/module/PluginThread.py +++ b/module/PluginThread.py @@ -198,6 +198,7 @@ class DownloadThread(PluginThread):                      self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg})                      pyfile.error = msg +                self.m.core.hookManager.downloadFailed(pyfile)                  self.clean(pyfile)                  continue @@ -238,6 +239,8 @@ class DownloadThread(PluginThread):                          print_exc()                          self.writeDebugReport(pyfile) +                    self.m.core.hookManager.downloadFailed(pyfile) +                  self.clean(pyfile)                  continue @@ -266,12 +269,13 @@ class DownloadThread(PluginThread):                      print_exc()                      self.writeDebugReport(pyfile) +                self.m.core.hookManager.downloadFailed(pyfile)                  self.clean(pyfile)                  continue              finally:                  self.m.core.files.save() -                self.m.core.files.checkAllLinksProcessed() +                pyfile.checkIfProcessed()                  exc_clear()              self.m.log.info(_("Download finished: %s") % pyfile.name) @@ -285,12 +289,12 @@ class DownloadThread(PluginThread):              pyfile.finishIfDone()              self.m.core.files.save() -    #---------------------------------------------------------------------- +      def put(self, job):          """assing job to thread"""          self.queue.put(job) -    #---------------------------------------------------------------------- +      def stop(self):          """stops the thread"""          self.put("quit") diff --git a/module/PyFile.py b/module/PyFile.py index 07347fb93..de8ed1145 100644 --- a/module/PyFile.py +++ b/module/PyFile.py @@ -194,6 +194,9 @@ class PyFile(object):          self.release()          self.m.checkAllLinksFinished()          return True + +    def checkIfProcessed(self): +        self.m.checkAllLinksProcessed(self.id)      def formatWait(self):          """ formats and return wait time in humanreadable format """ diff --git a/module/Scheduler.py b/module/Scheduler.py index 5837dec9e..0bc396b69 100644 --- a/module/Scheduler.py +++ b/module/Scheduler.py @@ -39,23 +39,42 @@ class Deferred():              raise AlreadyCalled          self.result = (args, kwargs)          for f, cargs, ckwargs in self.call: -            args+=tuple(cargs) +            args += tuple(cargs)              kwargs.update(ckwargs) -            f(*args **kwargs) +            f(*args ** kwargs) +  class Scheduler():      def __init__(self, core):          self.core = core -         +          self.queue = PriorityQueue() -     +      def addJob(self, t, call, args=[], kwargs={}, threaded=True):          d = Deferred()          t += time()          j = Job(t, call, args, kwargs, d, threaded)          self.queue.put((t, j))          return d -     + + +    def removeJob(self, d): +        """ +        :param d: defered object +        :return: if job was deleted +        """ +        index = -1 + +        for i, j in enumerate(self.queue): +            if j[1].deferred == d: +                index = i + +        if index >= 0: +            del self.queue[index] +            return True + +        return False +      def work(self):          while True:              t, j = self.queue.get() @@ -68,6 +87,7 @@ class Scheduler():                      self.queue.put((t, j))                      break +  class Job():      def __init__(self, time, call, args=[], kwargs={}, deferred=None, threaded=True):          self.time = float(time) @@ -90,12 +110,20 @@ class Job():          else:              self.run() +  class PriorityQueue():      """ a non blocking priority queue """ +      def __init__(self):          self.queue = []          self.lock = Lock() +    def __iter__(self): +        return iter(self.queue) + +    def __delitem__(self, key): +        del self.queue[key] +      def put(self, element):          self.lock.acquire()          heappush(self.queue, element) @@ -108,6 +136,6 @@ class PriorityQueue():              el = heappop(self.queue)              return el          except IndexError: -            return None,None +            return None, None          finally:              self.lock.release()
\ No newline at end of file diff --git a/module/database/DatabaseBackend.py b/module/database/DatabaseBackend.py index d330262f9..9530390c3 100644 --- a/module/database/DatabaseBackend.py +++ b/module/database/DatabaseBackend.py @@ -76,8 +76,8 @@ class DatabaseJob():          self.result = None          self.exception = False -        #import inspect -        #self.frame = inspect.currentframe() +#        import inspect +#        self.frame = inspect.currentframe()      def __repr__(self):          from os.path import basename @@ -89,7 +89,7 @@ class DatabaseJob():          del frame          del self.frame -        return "DataBase Job %s:%s\n%s" % (self.f.__name__, self.args[1:], output) +        return "DataBase Job %s:%s\n%sResult: %s" % (self.f.__name__, self.args[1:], output, self.result)      def processJob(self):          try: diff --git a/module/database/FileDatabase.py b/module/database/FileDatabase.py index 19205dac6..2ca7fd07d 100644 --- a/module/database/FileDatabase.py +++ b/module/database/FileDatabase.py @@ -356,20 +356,19 @@ class FileHandler:          """checks if all files are finished and dispatch event"""          if not self.getQueueCount(True): -            #hope its not called together with all DownloadsProcessed -            self.core.hookManager.dispatchEvent("allDownloadsProcessed")              self.core.hookManager.dispatchEvent("allDownloadsFinished")              self.core.log.debug("All downloads finished")              return True          return False -    def checkAllLinksProcessed(self): -        """checks if all files was processed and pyload would idle now""" +    def checkAllLinksProcessed(self, fid): +        """checks if all files was processed and pyload would idle now, needs fid which will be ignored when counting""" +        # reset count so statistic will update (this is called when dl was processed)          self.resetCount() -         -        if not self.db.processcount(1): + +        if not self.db.processcount(1, fid):              self.core.hookManager.dispatchEvent("allDownloadsProcessed")              self.core.log.debug("All downloads processed")              return True @@ -564,9 +563,9 @@ class FileMethods():          return self.c.fetchone()[0]      @style.queue -    def processcount(self, queue): +    def processcount(self, queue, fid):          """ number of files which have to be proccessed """ -        self.c.execute("SELECT COUNT(*) FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? AND l.status IN (2,3,5,7,12)", (queue, )) +        self.c.execute("SELECT COUNT(*) FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? AND l.status IN (2,3,5,7,12) AND l.id != ?", (queue, str(fid)))          return self.c.fetchone()[0]      @style.inner diff --git a/module/plugins/Hook.py b/module/plugins/Hook.py index 3db3e47e9..7e4f58c66 100644 --- a/module/plugins/Hook.py +++ b/module/plugins/Hook.py @@ -19,7 +19,7 @@  """  from thread import start_new_thread - +from traceback import print_exc  class Expose(object):      """ used for decoration to declare rpc services """ @@ -71,6 +71,9 @@ class Hook():          #: Provide information in dict here, usable by API `getInfo`          self.info = None +        #: Callback of periodical job task, used by hookmanager +        self.cb = None +          #: `HookManager`          self.manager = manager @@ -92,13 +95,33 @@ class Hook():              self.event_list = None +        self.initPeriodical()          self.setup() +    def initPeriodical(self): +        if self.interval >=1: +            self.cb = self.core.scheduler.addJob(0, self._periodical, threaded=False) + +    def _periodical(self): +        try: +            if self.isActivated(): self.periodical() +        except Exception, e: +            self.core.log.error(_("Error executing hooks: %s") % str(e)) +            if self.core.debug: +                print_exc() + +        self.cb = self.core.scheduler.addJob(self.interval, self._periodical, threaded=False) + +      def __repr__(self):          return "<Hook %s>" % self.__name__      def setup(self): -        """ more init stuff if needed""" +        """ more init stuff if needed """ +        pass + +    def unload(self): +        """ called when hook was deactivated """          pass      def isActivated(self): @@ -126,6 +149,9 @@ class Hook():      #event methods - overwrite these if needed          def coreReady(self):          pass + +    def coreExiting(self): +        pass      def downloadStarts(self, pyfile):          pass @@ -138,10 +164,7 @@ class Hook():      def packageFinished(self, pypack):          pass -     -    def packageFailed(self, pypack): -        pass -     +      def beforeReconnecting(self, ip):          pass | 
