diff options
| author | 2013-06-09 18:10:22 +0200 | |
|---|---|---|
| committer | 2013-06-09 18:10:23 +0200 | |
| commit | 16af85004c84d0d6c626b4f8424ce9647669a0c1 (patch) | |
| tree | 025d479862d376dbc17e934f4ed20031c8cd97d1 /module/threads | |
| parent | adapted to jshint config (diff) | |
| download | pyload-16af85004c84d0d6c626b4f8424ce9647669a0c1.tar.xz | |
moved everything from module to pyload
Diffstat (limited to 'module/threads')
| -rw-r--r-- | module/threads/AddonThread.py | 65 | ||||
| -rw-r--r-- | module/threads/BaseThread.py | 142 | ||||
| -rw-r--r-- | module/threads/DecrypterThread.py | 81 | ||||
| -rw-r--r-- | module/threads/DownloadThread.py | 231 | ||||
| -rw-r--r-- | module/threads/InfoThread.py | 168 | ||||
| -rw-r--r-- | module/threads/ThreadManager.py | 313 | ||||
| -rw-r--r-- | module/threads/__init__.py | 0 | 
7 files changed, 0 insertions, 1000 deletions
| diff --git a/module/threads/AddonThread.py b/module/threads/AddonThread.py deleted file mode 100644 index afb56f66b..000000000 --- a/module/threads/AddonThread.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from copy import copy -from traceback import print_exc - -from BaseThread import BaseThread - -class AddonThread(BaseThread): -    """thread for addons""" - -    def __init__(self, m, function, args, kwargs): -        """Constructor""" -        BaseThread.__init__(self, m) - -        self.f = function -        self.args = args -        self.kwargs = kwargs - -        self.active = [] - -        m.localThreads.append(self) - -        self.start() - -    def getActiveFiles(self): -        return self.active - -    def addActive(self, pyfile): -        """ Adds a pyfile to active list and thus will be displayed on overview""" -        if pyfile not in self.active: -            self.active.append(pyfile) - -    def finishFile(self, pyfile): -        if pyfile in self.active: -            self.active.remove(pyfile) - -        pyfile.finishIfDone() - -    def run(self): #TODO: approach via func_code -        try: -            try: -                self.kwargs["thread"] = self -                self.f(*self.args, **self.kwargs) -            except TypeError, e: -                #dirty method to filter out exceptions -                if "unexpected keyword argument 'thread'" not in e.args[0]: -                    raise - -                del self.kwargs["thread"] -                self.f(*self.args, **self.kwargs) -        except Exception, e: -            if hasattr(self.f, "im_self"): -                addon = self.f.im_self -                addon.logError(_("An Error occurred"), e) -                if self.m.core.debug: -                    print_exc() -                    self.writeDebugReport(addon.__name__, plugin=addon) - -        finally: -            local = copy(self.active) -            for x in local: -                self.finishFile(x) - -            self.m.localThreads.remove(self)
\ No newline at end of file diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py deleted file mode 100644 index c64678a72..000000000 --- a/module/threads/BaseThread.py +++ /dev/null @@ -1,142 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from threading import Thread -from time import strftime, gmtime -from sys import exc_info -from types import MethodType -from pprint import pformat -from traceback import format_exc - -from module.utils import primary_uid -from module.utils.fs import listdir, join, save_join, stat, exists - -class BaseThread(Thread): -    """abstract base class for thread types""" - -    def __init__(self, manager): -        Thread.__init__(self) -        self.setDaemon(True) -        self.m = manager #thread manager -        self.core = manager.core -        self.log = manager.core.log - -        #: Owner of the thread, every type should set it -        self.owner = None - -    @property -    def user(self): -        return primary_uid(self.owner) - -    def getProgress(self): -        """ retrieves progress information about the current running task - -        :return: :class:`ProgressInfo` -        """ - -    # Debug Stuff -    def writeDebugReport(self, name, pyfile=None, plugin=None): -        """ writes a debug report to disk  """ - -        dump_name = "debug_%s_%s.zip" % (name, strftime("%d-%m-%Y_%H-%M-%S")) -        if pyfile: -            dump = self.getFileDump(pyfile) -        else: -            dump = self.getPluginDump(plugin) - -        try: -            import zipfile - -            zip = zipfile.ZipFile(dump_name, "w") - -            if exists(join("tmp", name)): -                for f in listdir(join("tmp", name)): -                    try: -                        # avoid encoding errors -                        zip.write(join("tmp", name, f), save_join(name, f)) -                    except: -                        pass - -            info = zipfile.ZipInfo(save_join(name, "debug_Report.txt"), gmtime()) -            info.external_attr = 0644 << 16L # change permissions -            zip.writestr(info, dump) - -            info = zipfile.ZipInfo(save_join(name, "system_Report.txt"), gmtime()) -            info.external_attr = 0644 << 16L -            zip.writestr(info, self.getSystemDump()) - -            zip.close() - -            if not stat(dump_name).st_size: -                raise Exception("Empty Zipfile") - -        except Exception, e: -            self.log.debug("Error creating zip file: %s" % e) - -            dump_name = dump_name.replace(".zip", ".txt") -            f = open(dump_name, "wb") -            f.write(dump) -            f.close() - -        self.log.info("Debug Report written to %s" % dump_name) -        return dump_name - -    def getFileDump(self, pyfile): -        dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % ( -            self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc()) - -        tb = exc_info()[2] -        stack = [] -        while tb: -            stack.append(tb.tb_frame) -            tb = tb.tb_next - -        for frame in stack[1:]: -            dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name, -                                                       frame.f_code.co_filename, -                                                       frame.f_lineno) - -            for key, value in frame.f_locals.items(): -                dump += "\t%20s = " % key -                try: -                    dump += pformat(value) + "\n" -                except Exception, e: -                    dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" - -            del frame - -        del stack #delete it just to be sure... - -        dump += "\n\nPLUGIN OBJECT DUMP: \n\n" - -        for name in dir(pyfile.plugin): -            attr = getattr(pyfile.plugin, name) -            if not name.endswith("__") and type(attr) != MethodType: -                dump += "\t%20s = " % name -                try: -                    dump += pformat(attr) + "\n" -                except Exception, e: -                    dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" - -        dump += "\nPYFILE OBJECT DUMP: \n\n" - -        for name in dir(pyfile): -            attr = getattr(pyfile, name) -            if not name.endswith("__") and type(attr) != MethodType: -                dump += "\t%20s = " % name -                try: -                    dump += pformat(attr) + "\n" -                except Exception, e: -                    dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" - -        dump += "\n\nCONFIG: \n\n" -        dump += pformat(self.m.core.config.values) + "\n" - -        return dump - -        #TODO -    def getPluginDump(self, plugin): -        return "" - -    def getSystemDump(self): -        return "" diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py deleted file mode 100644 index 39448a620..000000000 --- a/module/threads/DecrypterThread.py +++ /dev/null @@ -1,81 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from time import sleep -from traceback import print_exc - -from module.utils import uniqify -from module.plugins.Base import Retry -from module.plugins.Crypter import Package - -from BaseThread import BaseThread - -class DecrypterThread(BaseThread): -    """thread for decrypting""" - -    def __init__(self, manager, data, pid): -        """constructor""" -        BaseThread.__init__(self, manager) -        self.data = data -        self.pid = pid - -        self.start() - -    def run(self): -        plugin_map = {} -        for url, plugin in self.data: -            if plugin in plugin_map: -                plugin_map[plugin].append(url) -            else: -                plugin_map[plugin] = [url] - -        self.decrypt(plugin_map) - -    def decrypt(self, plugin_map): -        pack = self.m.core.files.getPackage(self.pid) -        result = [] - -        for name, urls in plugin_map.iteritems(): -            klass = self.m.core.pluginManager.loadClass("crypter", name) -            plugin = klass(self.m.core, pack, pack.password) -            plugin_result = [] - -            try: -                try: -                    plugin_result = plugin._decrypt(urls) -                except Retry: -                    sleep(1) -                    plugin_result = plugin._decrypt(urls) -            except Exception, e: -                plugin.logError(_("Decrypting failed"), e) -                if self.m.core.debug: -                    print_exc() -                    self.writeDebugReport(plugin.__name__, plugin=plugin) - -            plugin.logDebug("Decrypted", plugin_result) -            result.extend(plugin_result) - -        #TODO -        result = uniqify(result) -        pack_names = {} -        urls = [] - -        for p in result: -            if isinstance(p, Package): -                if p.name in pack_names: -                    pack_names[p.name].urls.extend(p.urls) -                else: -                    pack_names[p.name] = p -            else: -                urls.append(p) - -        if urls: -            self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name": pack.name}) -            self.m.core.api.addFiles(self.pid, urls) - -        for p in pack_names.itervalues(): -            self.m.core.api.addPackage(p.name, p.urls, pack.password) - -        if not result: -            self.log.info(_("No links decrypted")) - diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py deleted file mode 100644 index cf59c5639..000000000 --- a/module/threads/DownloadThread.py +++ /dev/null @@ -1,231 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -    This program is free software; you can redistribute it and/or modify -    it under the terms of the GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, -    or (at your option) any later version. - -    This program is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -    See the GNU General Public License for more details. - -    You should have received a copy of the GNU General Public License -    along with this program; if not, see <http://www.gnu.org/licenses/>. - -    @author: RaNaN -""" - -from Queue import Queue -from time import sleep, time -from traceback import print_exc -from sys import exc_clear -from pycurl import error - -from module.plugins.Base import Fail, Retry, Abort -from module.plugins.Hoster import Reconnect, SkipDownload -from module.network.HTTPRequest import BadHeader - -from BaseThread import BaseThread - -class DownloadThread(BaseThread): -    """thread for downloading files from 'real' hoster plugins""" - -    def __init__(self, manager): -        """Constructor""" -        BaseThread.__init__(self, manager) - -        self.queue = Queue() # job queue -        self.active = None - -        self.start() - -    def run(self): -        """run method""" -        pyfile = None - -        while True: -            del pyfile -            self.active = self.queue.get() -            pyfile = self.active - -            if self.active == "quit": -                self.active = None -                self.m.threads.remove(self) -                return True - -            try: -                if not pyfile.hasPlugin(): continue -                #this pyfile was deleted while queuing - -                pyfile.plugin.checkForSameFiles(starting=True) -                self.log.info(_("Download starts: %s" % pyfile.name)) - -                # start download -                self.core.addonManager.downloadPreparing(pyfile) -                pyfile.plugin.preprocessing(self) - -                self.log.info(_("Download finished: %s") % pyfile.name) -                self.core.addonManager.downloadFinished(pyfile) -                self.core.files.checkPackageFinished(pyfile) - -            except NotImplementedError: -                self.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname) -                pyfile.setStatus("failed") -                pyfile.error = "Plugin does not work" -                self.clean(pyfile) -                continue - -            except Abort: -                try: -                    self.log.info(_("Download aborted: %s") % pyfile.name) -                except: -                    pass - -                pyfile.setStatus("aborted") - -                self.clean(pyfile) -                continue - -            except Reconnect: -                self.queue.put(pyfile) -                #pyfile.req.clearCookies() - -                while self.m.reconnecting.isSet(): -                    sleep(0.5) - -                continue - -            except Retry, e: -                reason = e.args[0] -                self.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) -                self.queue.put(pyfile) -                continue -            except Fail, e: -                msg = e.args[0] - -                # TODO: activate former skipped downloads - -                if msg == "offline": -                    pyfile.setStatus("offline") -                    self.log.warning(_("Download is offline: %s") % pyfile.name) -                elif msg == "temp. offline": -                    pyfile.setStatus("temp. offline") -                    self.log.warning(_("Download is temporary offline: %s") % pyfile.name) -                else: -                    pyfile.setStatus("failed") -                    self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg}) -                    pyfile.error = msg - -                self.core.addonManager.downloadFailed(pyfile) -                self.clean(pyfile) -                continue - -            except error, e: -                if len(e.args) == 2: -                    code, msg = e.args -                else: -                    code = 0 -                    msg = e.args - -                self.log.debug("pycurl exception %s: %s" % (code, msg)) - -                if code in (7, 18, 28, 52, 56): -                    self.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry.")) -                    wait = time() + 60 - -                    pyfile.waitUntil = wait -                    pyfile.setStatus("waiting") -                    while time() < wait: -                        sleep(1) -                        if pyfile.abort: -                            break - -                    if pyfile.abort: -                        self.log.info(_("Download aborted: %s") % pyfile.name) -                        pyfile.setStatus("aborted") - -                        self.clean(pyfile) -                    else: -                        self.queue.put(pyfile) - -                    continue - -                else: -                    pyfile.setStatus("failed") -                    self.log.error("pycurl error %s: %s" % (code, msg)) -                    if self.core.debug: -                        print_exc() -                        self.writeDebugReport(pyfile.plugin.__name__, pyfile) - -                    self.core.addonManager.downloadFailed(pyfile) - -                self.clean(pyfile) -                continue - -            except SkipDownload, e: -                pyfile.setStatus("skipped") - -                self.log.info(_("Download skipped: %(name)s due to %(plugin)s") -                % {"name": pyfile.name, "plugin": e.message}) - -                self.clean(pyfile) - -                self.core.files.checkPackageFinished(pyfile) - -                self.active = False -                self.core.files.save() - -                continue - - -            except Exception, e: -                if isinstance(e, BadHeader) and e.code == 500: -                    pyfile.setStatus("temp. offline") -                    self.log.warning(_("Download is temporary offline: %s") % pyfile.name) -                    pyfile.error = _("Internal Server Error") - -                else: -                    pyfile.setStatus("failed") -                    self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)}) -                    pyfile.error = str(e) - -                if self.core.debug: -                    print_exc() -                    self.writeDebugReport(pyfile.plugin.__name__, pyfile) - -                self.core.addonManager.downloadFailed(pyfile) -                self.clean(pyfile) -                continue - -            finally: -                self.core.files.save() -                pyfile.checkIfProcessed() -                exc_clear() - -             -            #pyfile.plugin.req.clean() - -            self.active = False -            pyfile.finishIfDone() -            self.core.files.save() - -    def getProgress(self): -        if self.active: -            return self.active.getProgressInfo() - - -    def put(self, job): -        """assign a job to the thread""" -        self.queue.put(job) - -    def clean(self, pyfile): -        """ set thread inactive and release pyfile """ -        self.active = False -        pyfile.release() - -    def stop(self): -        """stops the thread""" -        self.put("quit") diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py deleted file mode 100644 index bf5bb5777..000000000 --- a/module/threads/InfoThread.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from time import time -from traceback import print_exc - -from module.Api import LinkStatus -from module.utils.packagetools import parseNames -from module.utils import has_method, accumulate - -from BaseThread import BaseThread - -class InfoThread(BaseThread): -    def __init__(self, manager, data, pid=-1, rid=-1): -        """Constructor""" -        BaseThread.__init__(self, manager) - -        self.data = data -        self.pid = pid # package id -        # [ .. (name, plugin) .. ] - -        self.rid = rid #result id - -        self.cache = [] #accumulated data - -        self.start() - -    def run(self): -        """run method""" - -        plugins = accumulate(self.data) -        crypter = {} - -        # filter out crypter plugins -        for name in self.m.core.pluginManager.getPlugins("crypter"): -            if name in plugins: -                crypter[name] = plugins[name] -                del plugins[name] - -        #directly write to database -        if self.pid > -1: -            for pluginname, urls in plugins.iteritems(): -                plugin = self.m.core.pluginManager.getPluginModule(pluginname) -                klass = self.m.core.pluginManager.getPluginClass(pluginname) -                if has_method(klass, "getInfo"): -                    self.fetchForPlugin(pluginname, klass, urls, self.updateDB) -                    self.m.core.files.save() -                elif has_method(plugin, "getInfo"): -                    self.log.debug("Deprecated .getInfo() method on module level, use classmethod instead") -                    self.fetchForPlugin(pluginname, plugin, urls, self.updateDB) -                    self.m.core.files.save() - -        else: #post the results -            for name, urls in crypter: -                #attach container content -                try: -                    data = self.decrypt(name, urls) -                except: -                    print_exc() -                    self.m.log.error("Could not decrypt container.") -                    data = [] - -                accumulate(data, plugins) - -            self.m.infoResults[self.rid] = {} - -            for pluginname, urls in plugins.iteritems(): -                plugin = self.m.core.pluginManager.getPlugin(pluginname, True) -                klass = getattr(plugin, pluginname) -                if has_method(klass, "getInfo"): -                    self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) -                    #force to process cache -                    if self.cache: -                        self.updateResult(pluginname, [], True) -                elif has_method(plugin, "getInfo"): -                    self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead") -                    self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) -                    #force to process cache -                    if self.cache: -                        self.updateResult(pluginname, [], True) -                else: -                    #generate default result -                    result = [(url, 0, 3, url) for url in urls] - -                    self.updateResult(pluginname, result, True) - -            self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {} - -        self.m.timestamp = time() + 5 * 60 - - -    def updateDB(self, plugin, result): -        self.m.core.files.updateFileInfo(result, self.pid) - -    def updateResult(self, plugin, result, force=False): -        #parse package name and generate result -        #accumulate results - -        self.cache.extend(result) - -        if len(self.cache) >= 20 or force: -            #used for package generating -            tmp = [(name, (url, LinkStatus(name, plugin, "unknown", status, int(size)))) -            for name, size, status, url in self.cache] - -            data = parseNames(tmp) -            result = {} -            for k, v in data.iteritems(): -                for url, status in v: -                    status.packagename = k -                    result[url] = status - -            self.m.setInfoResults(self.rid, result) - -            self.cache = [] - -    def updateCache(self, plugin, result): -        self.cache.extend(result) - -    def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None): -        try: -            result = [] #result loaded from cache -            process = [] #urls to process -            for url in urls: -                if url in self.m.infoCache: -                    result.append(self.m.infoCache[url]) -                else: -                    process.append(url) - -            if result: -                self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname)) -                cb(pluginname, result) - -            if process: -                self.m.log.debug("Run Info Fetching for %s" % pluginname) -                for result in plugin.getInfo(process): -                    #result = [ .. (name, size, status, url) .. ] -                    if not type(result) == list: result = [result] - -                    for res in result: -                        self.m.infoCache[res[3]] = res - -                    cb(pluginname, result) - -            self.m.log.debug("Finished Info Fetching for %s" % pluginname) -        except Exception, e: -            self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") % -                               {"name": pluginname, "err": str(e)}) -            if self.m.core.debug: -                print_exc() - -            # generate default results -            if err: -                result = [(url, 0, 3, url) for url in urls] -                cb(pluginname, result) - - -    def decrypt(self, plugin, urls): -        self.m.log.debug("Pre decrypting %s" % plugin) -        klass = self.m.core.pluginManager.loadClass("crypter", plugin) - -        # only decrypt files -        if has_method(klass, "decryptFile"): -            urls = klass.decrypt(urls) -            data, crypter = self.m.core.pluginManager.parseUrls(urls) -            return data - -        return [] diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py deleted file mode 100644 index f67179d08..000000000 --- a/module/threads/ThreadManager.py +++ /dev/null @@ -1,313 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -############################################################################### -#   Copyright(c) 2008-2012 pyLoad Team -#   http://www.pyload.org -# -#   This file is part of pyLoad. -#   pyLoad is free software: you can redistribute it and/or modify -#   it under the terms of the GNU Affero General Public License as -#   published by the Free Software Foundation, either version 3 of the -#   License, or (at your option) any later version. -# -#   Subjected to the terms and conditions in LICENSE -# -#   @author: RaNaN -############################################################################### - -from os.path import exists, join -import re -from subprocess import Popen -from threading import Event, Lock -from time import sleep, time -from traceback import print_exc -from random import choice - -import pycurl - -from module.datatypes.PyFile import PyFile -from module.network.RequestFactory import getURL -from module.utils import lock, uniqify -from module.utils.fs import free_space - -from DecrypterThread import DecrypterThread -from DownloadThread import DownloadThread -from InfoThread import InfoThread - -class ThreadManager: -    """manages the download threads, assign jobs, reconnect etc""" - - -    def __init__(self, core): -        """Constructor""" -        self.core = core -        self.log = core.log - -        self.threads = []  # thread list -        self.localThreads = []  #addon+decrypter threads - -        self.pause = True - -        self.reconnecting = Event() -        self.reconnecting.clear() -        self.downloaded = 0 #number of files downloaded since last cleanup - -        self.lock = Lock() - -        # some operations require to fetch url info from hoster, so we caching them so it wont be done twice -        # contains a timestamp and will be purged after timeout -        self.infoCache = {} - -        # pool of ids for online check -        self.resultIDs = 0 - -        # threads which are fetching hoster results -        self.infoResults = {} -        # timeout for cache purge -        self.timestamp = 0 - -        pycurl.global_init(pycurl.GLOBAL_DEFAULT) - -        for i in range(self.core.config.get("download", "max_downloads")): -            self.createThread() - - -    def createThread(self): -        """create a download thread""" - -        thread = DownloadThread(self) -        self.threads.append(thread) - -    def createInfoThread(self, data, pid): -        """ start a thread which fetches online status and other info's """ -        self.timestamp = time() + 5 * 60 -        if data: InfoThread(self, data, pid) - -    @lock -    def createResultThread(self, data): -        """ creates a thread to fetch online status, returns result id """ -        self.timestamp = time() + 5 * 60 - -        rid = self.resultIDs -        self.resultIDs += 1 - -        InfoThread(self, data, rid=rid) - -        return rid - -    @lock -    def createDecryptThread(self, data, pid): -        """ Start decrypting of entered data, all links in one package are accumulated to one thread.""" -        if data: DecrypterThread(self, data, pid) - - -    @lock -    def getInfoResult(self, rid): -        """returns result and clears it""" -        self.timestamp = time() + 5 * 60 - -        if rid in self.infoResults: -            data = self.infoResults[rid] -            self.infoResults[rid] = {} -            return data -        else: -            return {} - -    @lock -    def setInfoResults(self, rid, result): -        self.infoResults[rid].update(result) - -    def getActiveDownloads(self, user=None): -        # TODO: user context -        return [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)] - -    def getProgressList(self, user=None): -        info = [] - -        # TODO: local threads can create multiple progresses -        for thread in self.threads + self.localThreads: -            # skip if not belong to current user -            if user and thread.user != user: continue - -            progress = thread.getProgress() -            if progress: info.append(progress) - -        return info - -    def getActiveFiles(self): -        active = self.getActiveDownloads() - -        for t in self.localThreads: -            active.extend(t.getActiveFiles()) - -        return active - -    def processingIds(self): -        """get a id list of all pyfiles processed""" -        return [x.id for x in self.getActiveFiles()] - -    def work(self): -        """run all task which have to be done (this is for repetetive call by core)""" -        try: -            self.tryReconnect() -        except Exception, e: -            self.log.error(_("Reconnect Failed: %s") % str(e) ) -            self.reconnecting.clear() -            self.core.print_exc() - -        self.checkThreadCount() - -        try: -            self.assignJob() -        except Exception, e: -            self.log.warning("Assign job error", e) -            self.core.print_exc() -             -            sleep(0.5) -            self.assignJob() -            #it may be failed non critical so we try it again - -        if (self.infoCache or self.infoResults) and self.timestamp < time(): -            self.infoCache.clear() -            self.infoResults.clear() -            self.log.debug("Cleared Result cache") - -    def tryReconnect(self): -        """checks if reconnect needed""" - -        if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()): -            return False - -        active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] - -        if not (0 < active.count(True) == len(active)): -            return False - -        if not exists(self.core.config['reconnect']['method']): -            if exists(join(pypath, self.core.config['reconnect']['method'])): -                self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) -            else: -                self.core.config["reconnect"]["activated"] = False -                self.log.warning(_("Reconnect script not found!")) -                return - -        self.reconnecting.set() - -        #Do reconnect -        self.log.info(_("Starting reconnect")) - -        while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: -            sleep(0.25) - -        ip = self.getIP() - -        self.core.addonManager.beforeReconnecting(ip) - -        self.log.debug("Old IP: %s" % ip) - -        try: -            reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE) -        except: -            self.log.warning(_("Failed executing reconnect script!")) -            self.core.config["reconnect"]["activated"] = False -            self.reconnecting.clear() -            if self.core.debug: -                print_exc() -            return - -        reconn.wait() -        sleep(1) -        ip = self.getIP() -        self.core.addonManager.afterReconnecting(ip) - -        self.log.info(_("Reconnected, new IP: %s") % ip) - -        self.reconnecting.clear() - -    def getIP(self): -        """retrieve current ip""" -        services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"), -                    ("http://checkip.dyndns.org/",".*Current IP Address: (\S+)</body>.*")] - -        ip = "" -        for i in range(10): -            try: -                sv = choice(services) -                ip = getURL(sv[0]) -                ip = re.match(sv[1], ip).group(1) -                break -            except: -                ip = "" -                sleep(1) - -        return ip - -    def checkThreadCount(self): -        """checks if there is a need for increasing or reducing thread count""" - -        if len(self.threads) == self.core.config.get("download", "max_downloads"): -            return True -        elif len(self.threads) < self.core.config.get("download", "max_downloads"): -            self.createThread() -        else: -            free = [x for x in self.threads if not x.active] -            if free: -                free[0].put("quit") - - -    def cleanPycurl(self): -        """ make a global curl cleanup (currently unused) """ -        if self.processingIds(): -            return False -        pycurl.global_cleanup() -        pycurl.global_init(pycurl.GLOBAL_DEFAULT) -        self.downloaded = 0 -        self.log.debug("Cleaned up pycurl") -        return True - - -    def assignJob(self): -        """assign a job to a thread if possible""" - -        if self.pause or not self.core.api.isTimeDownload(): return - -        #if self.downloaded > 20: -        #    if not self.cleanPyCurl(): return - -        free = [x for x in self.threads if not x.active] - -        inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if x.active and x.active.hasPlugin()] -        inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in inuse] -        occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]]))) -         -        job = self.core.files.getJob(occ) -        if job: -            try: -                job.initPlugin() -            except Exception, e: -                self.log.critical(str(e)) -                print_exc() -                job.setStatus("failed") -                job.error = str(e) -                job.release() -                return - -            spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024 -            if spaceLeft < self.core.config["general"]["min_free_space"]: -                self.log.warning(_("Not enough space left on device")) -                self.pause = True - -            if free and not self.pause: -                thread = free[0] -                #self.downloaded += 1 -                thread.put(job) -            else: -                #put job back -                if occ not in self.core.files.jobCache: -                    self.core.files.jobCache[occ] = [] -                self.core.files.jobCache[occ].append(job.id) - -    def cleanup(self): -        """do global cleanup, should be called when finished with pycurl""" -        pycurl.global_cleanup() diff --git a/module/threads/__init__.py b/module/threads/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/module/threads/__init__.py +++ /dev/null | 
