diff options
| author | 2014-01-18 18:45:13 +0100 | |
|---|---|---|
| committer | 2014-01-18 18:45:13 +0100 | |
| commit | 453c1e55c71a96c9529ecdca1d55278cc41088d6 (patch) | |
| tree | 7a516a84e5590ce5f1f3def71c24bcb14f209023 /pyload/threads | |
| parent | small fixes and improvements for download engine (diff) | |
| download | pyload-453c1e55c71a96c9529ecdca1d55278cc41088d6.tar.xz | |
rewritten download scheduling, improved account manager, db version increased all data will be overwritten
Diffstat (limited to 'pyload/threads')
| -rw-r--r-- | pyload/threads/DecrypterThread.py | 1 | ||||
| -rw-r--r-- | pyload/threads/DownloadThread.py | 19 | ||||
| -rw-r--r-- | pyload/threads/ThreadManager.py | 220 | 
3 files changed, 24 insertions, 216 deletions
diff --git a/pyload/threads/DecrypterThread.py b/pyload/threads/DecrypterThread.py index 9f796da22..419f153a2 100644 --- a/pyload/threads/DecrypterThread.py +++ b/pyload/threads/DecrypterThread.py @@ -23,7 +23,6 @@ class DecrypterThread(BaseThread):          # holds the progress, while running          self.progress = None -        self.m.addThread(self)          self.start()      def getProgress(self): diff --git a/pyload/threads/DownloadThread.py b/pyload/threads/DownloadThread.py index d1672531b..b8f7e4965 100644 --- a/pyload/threads/DownloadThread.py +++ b/pyload/threads/DownloadThread.py @@ -18,6 +18,7 @@      @author: RaNaN  """ +from threading import Event  from Queue import Queue  from time import sleep, time  from traceback import print_exc @@ -37,6 +38,9 @@ class DownloadThread(BaseThread):          """Constructor"""          BaseThread.__init__(self, manager) +        self.isWorking = Event() +        self.isWorking.clear() +          self.queue = Queue() # job queue          self.active = None @@ -53,12 +57,19 @@ class DownloadThread(BaseThread):              if self.active == "quit":                  self.active = None -                self.m.threads.remove(self) +                self.m.stop(self)                  return True              try: -                if not pyfile.hasPlugin(): continue +                pyfile.initPlugin() + +                # after initialization the thread is fully ready +                self.isWorking.set() +                  #this pyfile was deleted while queuing +                # TODO: what will happen with new thread manager? +                #if not pyfile.hasPlugin(): continue +                  pyfile.plugin.checkForSameFiles(starting=True)                  self.log.info(_("Download starts: %s" % pyfile.name)) @@ -204,7 +215,9 @@ class DownloadThread(BaseThread):                  self.core.files.save()                  pyfile.checkIfProcessed()                  exc_clear() - +                # manager could still be waiting for it +                self.isWorking.set() +                self.m.done(self)              #pyfile.plugin.req.clean() diff --git a/pyload/threads/ThreadManager.py b/pyload/threads/ThreadManager.py index 298b0402d..f6cb3daea 100644 --- a/pyload/threads/ThreadManager.py +++ b/pyload/threads/ThreadManager.py @@ -2,7 +2,7 @@  # -*- coding: utf-8 -*-  ############################################################################### -#   Copyright(c) 2008-2013 pyLoad Team +#   Copyright(c) 2008-2014 pyLoad Team  #   http://www.pyload.org  #  #   This file is part of pyLoad. @@ -16,22 +16,11 @@  #   @author: RaNaN  ############################################################################### -from os.path import exists, join -import re -from subprocess import Popen -from threading import Event, RLock -from time import sleep, time -from traceback import print_exc -from random import choice +from threading import  RLock +from time import time -from pyload.datatypes.PyFile import PyFile  from pyload.datatypes.OnlineCheck import OnlineCheck -from pyload.network.RequestFactory import getURL -from pyload.utils import lock, uniqify, to_list -from pyload.utils.fs import free_space - -from DecrypterThread import DecrypterThread -from DownloadThread import DownloadThread +from pyload.utils import lock, to_list  from InfoThread import InfoThread @@ -44,13 +33,6 @@ class ThreadManager:          self.log = core.log          self.threads = []  # thread list -        self.localThreads = []  #addon+decrypter threads - -        self.pause = True - -        self.reconnecting = Event() -        self.reconnecting.clear() -        self.downloaded = 0 #number of files downloaded since last cleanup          self.lock = RLock() @@ -67,24 +49,15 @@ class ThreadManager:          # timeout for cache purge          self.timestamp = 0 -        for i in range(self.core.config.get("download", "max_downloads")): -            self.createThread() - -    def createThread(self): -        """create a download thread""" - -        thread = DownloadThread(self) -        self.threads.append(thread) -      @lock      def addThread(self, thread): -        self.localThreads.append(thread) +        self.threads.append(thread)      @lock      def removeThread(self, thread):          """ Remove a thread from the local list """ -        if thread in self.localThreads: -            self.localThreads.remove(thread) +        if thread in self.threads: +            self.threads.remove(thread)      @lock      def createInfoThread(self, data, pid): @@ -108,11 +81,6 @@ class ThreadManager:          return rid      @lock -    def createDecryptThread(self, data, pid): -        """ Start decrypting of entered data, all links in one package are accumulated to one thread.""" -        if data: DecrypterThread(self, data, pid) - -    @lock      def getInfoResult(self, rid):          return self.infoResults.get(rid) @@ -120,14 +88,10 @@ class ThreadManager:          self.core.evm.dispatchEvent("linkcheck:updated", oc.rid, result, owner=oc.owner)          oc.update(result) -    def getActiveDownloads(self, user=None): -        # TODO: user context -        return [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)] -      def getProgressList(self, user=None):          info = [] -        for thread in self.threads + self.localThreads: +        for thread in self.threads:              # skip if not belong to current user              if user is not None and thread.owner != user: continue @@ -136,38 +100,8 @@ class ThreadManager:          return info -    def getActiveFiles(self): -        active = self.getActiveDownloads() - -        for t in self.localThreads: -            active.extend(t.getActiveFiles()) - -        return active - -    def processingIds(self): -        """get a id list of all pyfiles processed""" -        return [x.id for x in self.getActiveFiles()] -      def work(self):          """run all task which have to be done (this is for repetitive call by core)""" -        try: -            self.tryReconnect() -        except Exception, e: -            self.log.error(_("Reconnect Failed: %s") % str(e)) -            self.reconnecting.clear() -            self.core.print_exc() - -        self.checkThreadCount() - -        try: -            self.assignJob() -        except Exception, e: -            self.log.warning("Assign job error", e) -            self.core.print_exc() - -            sleep(0.5) -            self.assignJob() -            #it may be failed non critical so we try it again          if self.infoCache and self.timestamp < time():              self.infoCache.clear() @@ -176,141 +110,3 @@ class ThreadManager:          for rid in self.infoResults.keys():              if self.infoResults[rid].isStale():                  del self.infoResults[rid] - -    def tryReconnect(self): -        """checks if reconnect needed""" - -        if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()): -            return False - -        active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] - -        if not (0 < active.count(True) == len(active)): -            return False - -        if not exists(self.core.config['reconnect']['method']): -            if exists(join(pypath, self.core.config['reconnect']['method'])): -                self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) -            else: -                self.core.config["reconnect"]["activated"] = False -                self.log.warning(_("Reconnect script not found!")) -                return - -        self.reconnecting.set() - -        #Do reconnect -        self.log.info(_("Starting reconnect")) - -        while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: -            sleep(0.25) - -        ip = self.getIP() - -        self.core.evm.dispatchEvent("reconnect:before", ip) - -        self.log.debug("Old IP: %s" % ip) - -        try: -            reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE) -        except: -            self.log.warning(_("Failed executing reconnect script!")) -            self.core.config["reconnect"]["activated"] = False -            self.reconnecting.clear() -            self.core.print_exc() -            return - -        reconn.wait() -        sleep(1) -        ip = self.getIP() -        self.core.evm.dispatchEvent("reconnect:after", ip) - -        self.log.info(_("Reconnected, new IP: %s") % ip) - -        self.reconnecting.clear() - -    def getIP(self): -        """retrieve current ip""" -        services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"), -                    ("http://checkip.dyndns.org/", ".*Current IP Address: (\S+)</body>.*")] - -        ip = "" -        for i in range(10): -            try: -                sv = choice(services) -                ip = getURL(sv[0]) -                ip = re.match(sv[1], ip).group(1) -                break -            except: -                ip = "" -                sleep(1) - -        return ip - -    def checkThreadCount(self): -        """checks if there is a need for increasing or reducing thread count""" - -        if len(self.threads) == self.core.config.get("download", "max_downloads"): -            return True -        elif len(self.threads) < self.core.config.get("download", "max_downloads"): -            self.createThread() -        else: -            free = [x for x in self.threads if not x.active] -            if free: -                free[0].put("quit") - - -    def cleanPycurl(self): -        """ make a global curl cleanup (currently unused) """ -        if self.processingIds(): -            return False -        import pycurl - -        pycurl.global_cleanup() -        pycurl.global_init(pycurl.GLOBAL_DEFAULT) -        self.downloaded = 0 -        self.log.debug("Cleaned up pycurl") -        return True - - -    def assignJob(self): -        """assign a job to a thread if possible""" - -        if self.pause or not self.core.api.isTimeDownload(): return - -        #if self.downloaded > 20: -        #    if not self.cleanPyCurl(): return - -        free = [x for x in self.threads if not x.active] - -        inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if -                 x.active and x.active.hasPlugin()] -        inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in -                 inuse] -        occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]]))) - -        job = self.core.files.getJob(occ) -        if job: -            try: -                job.initPlugin() -            except Exception, e: -                self.log.critical(str(e)) -                print_exc() -                job.setStatus("failed") -                job.error = str(e) -                job.release() -                return - -            spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024 -            if spaceLeft < self.core.config["general"]["min_free_space"]: -                self.log.warning(_("Not enough space left on device")) -                self.pause = True - -            if free and not self.pause: -                thread = free[0] -                #self.downloaded += 1 -                thread.put(job) -            else: -                #put job back -                if occ not in self.core.files.jobCache: -                    self.core.files.jobCache[occ] = [] -                self.core.files.jobCache[occ].append(job.id)  | 
