diff options
| author | 2014-09-08 01:08:03 +0200 | |
|---|---|---|
| committer | 2014-09-14 11:03:20 +0200 | |
| commit | 91115fd577f20704ef7f2e74c4527ffbb0730a09 (patch) | |
| tree | 198fc68f1eb6f42e220d0f1c68499f48ae70543c /pyload/manager | |
| parent | Project __init__ with info (diff) | |
| download | pyload-91115fd577f20704ef7f2e74c4527ffbb0730a09.tar.xz | |
Restructure pyload file tree (1)
Diffstat (limited to 'pyload/manager')
| -rw-r--r-- | pyload/manager/AccountManager.py | 173 | ||||
| -rw-r--r-- | pyload/manager/CaptchaManager.py | 158 | ||||
| -rw-r--r-- | pyload/manager/HookManager.py | 305 | ||||
| -rw-r--r-- | pyload/manager/PluginManager.py | 356 | ||||
| -rw-r--r-- | pyload/manager/RemoteManager.py | 91 | ||||
| -rw-r--r-- | pyload/manager/ThreadManager.py | 317 | ||||
| -rw-r--r-- | pyload/manager/__init__.py | 0 | ||||
| -rw-r--r-- | pyload/manager/event/PullEvents.py | 120 | ||||
| -rw-r--r-- | pyload/manager/event/Scheduler.py | 141 | ||||
| -rw-r--r-- | pyload/manager/event/__init__.py | 0 | ||||
| -rw-r--r-- | pyload/manager/thread/PluginThread.py | 675 | ||||
| -rw-r--r-- | pyload/manager/thread/ServerThread.py | 108 | ||||
| -rw-r--r-- | pyload/manager/thread/__init__.py | 0 | 
13 files changed, 2444 insertions, 0 deletions
| diff --git a/pyload/manager/AccountManager.py b/pyload/manager/AccountManager.py new file mode 100644 index 000000000..d1958f4b6 --- /dev/null +++ b/pyload/manager/AccountManager.py @@ -0,0 +1,173 @@ +# -*- coding: utf-8 -*- + +from os.path import exists +from shutil import copy + +from threading import Lock + +from pyload.manager.event.PullEvents import AccountUpdateEvent +from pyload.utils import chmod, lock + +ACC_VERSION = 1 + + +class AccountManager: +    """manages all accounts""" + +    #-------------------------------------------------------------------------- +    def __init__(self, core): +        """Constructor""" + +        self.core = core +        self.lock = Lock() + +        self.initPlugins() +        self.saveAccounts() # save to add categories to conf + +    def initPlugins(self): +        self.accounts = {} # key = ( plugin ) +        self.plugins = {} + +        self.initAccountPlugins() +        self.loadAccounts() + +    def getAccountPlugin(self, plugin): +        """get account instance for plugin or None if anonymous""" +        if plugin in self.accounts: +            if plugin not in self.plugins: +                try: +                    self.plugins[plugin] = self.core.pluginManager.loadClass("accounts", plugin)(self, self.accounts[plugin]) +                except TypeError:  # The account class no longer exists (blacklisted plugin). Skipping the account to avoid crash +                    return None + +            return self.plugins[plugin] +        else: +            return None + +    def getAccountPlugins(self): +        """ get all account instances""" + +        plugins = [] +        for plugin in self.accounts.keys(): +            plugins.append(self.getAccountPlugin(plugin)) + +        return plugins + +    #-------------------------------------------------------------------------- +    def loadAccounts(self): +        """loads all accounts available""" + +        if not exists("accounts.conf"): +            f = open("accounts.conf", "wb") +            f.write("version: " + str(ACC_VERSION)) +            f.close() + +        f = open("accounts.conf", "rb") +        content = f.readlines() +        version = content[0].split(":")[1].strip() if content else "" +        f.close() + +        if not version or int(version) < ACC_VERSION: +            copy("accounts.conf", "accounts.backup") +            f = open("accounts.conf", "wb") +            f.write("version: " + str(ACC_VERSION)) +            f.close() +            self.core.log.warning(_("Account settings deleted, due to new config format.")) +            return + +        plugin = "" +        name = "" + +        for line in content[1:]: +            line = line.strip() + +            if not line: continue +            if line.startswith("#"): continue +            if line.startswith("version"): continue + +            if line.endswith(":") and line.count(":") == 1: +                plugin = line[:-1] +                self.accounts[plugin] = {} + +            elif line.startswith("@"): +                try: +                    option = line[1:].split() +                    self.accounts[plugin][name]['options'][option[0]] = [] if len(option) < 2 else ([option[1]] if len(option) < 3 else option[1:]) +                except: +                    pass + +            elif ":" in line: +                name, sep, pw = line.partition(":") +                self.accounts[plugin][name] = {"password": pw, "options": {}, "valid": True} + +    #-------------------------------------------------------------------------- +    def saveAccounts(self): +        """save all account information""" + +        f = open("accounts.conf", "wb") +        f.write("version: " + str(ACC_VERSION) + "\n") + +        for plugin, accounts in self.accounts.iteritems(): +            f.write("\n") +            f.write(plugin+":\n") + +            for name,data in accounts.iteritems(): +                f.write("\n\t%s:%s\n" % (name,data['password']) ) +                if data['options']: +                    for option, values in data['options'].iteritems(): +                        f.write("\t@%s %s\n" % (option, " ".join(values))) + +        f.close() +        chmod(f.name, 0600) + +    #-------------------------------------------------------------------------- +    def initAccountPlugins(self): +        """init names""" +        for name in self.core.pluginManager.getAccountPlugins(): +            self.accounts[name] = {} + +    @lock +    def updateAccount(self, plugin , user, password=None, options={}): +        """add or update account""" +        if plugin in self.accounts: +            p = self.getAccountPlugin(plugin) +            updated = p.updateAccounts(user, password, options) +            #since accounts is a ref in plugin self.accounts doesnt need to be updated here + +            self.saveAccounts() +            if updated: p.scheduleRefresh(user, force=False) + +    @lock +    def removeAccount(self, plugin, user): +        """remove account""" + +        if plugin in self.accounts: +            p = self.getAccountPlugin(plugin) +            p.removeAccount(user) + +            self.saveAccounts() + +    @lock +    def getAccountInfos(self, force=True, refresh=False): +        data = {} + +        if refresh: +            self.core.scheduler.addJob(0, self.core.accountManager.getAccountInfos) +            force = False + +        for p in self.accounts.keys(): +            if self.accounts[p]: +                p = self.getAccountPlugin(p) +                if p: +                    data[p.__name__] = p.getAllAccounts(force) +                else:  # When an account has been skipped, p is None +                    data[p] = [] +            else: +                data[p] = [] +        e = AccountUpdateEvent() +        self.core.pullManager.addEvent(e) +        return data + +    def sendChange(self): +        e = AccountUpdateEvent() +        self.core.pullManager.addEvent(e) diff --git a/pyload/manager/CaptchaManager.py b/pyload/manager/CaptchaManager.py new file mode 100644 index 000000000..0ba876ae8 --- /dev/null +++ b/pyload/manager/CaptchaManager.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- + +""" +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, +    or (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +    See the GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License +    along with this program; if not, see <http://www.gnu.org/licenses/>. + +    @author: mkaay, RaNaN +""" + +from time import time +from traceback import print_exc +from threading import Lock + +class CaptchaManager: +    def __init__(self, core): +        self.lock = Lock() +        self.core = core +        self.tasks = [] #task store, for outgoing tasks only + +        self.ids = 0 #only for internal purpose + +    def newTask(self, img, format, file, result_type): +        task = CaptchaTask(self.ids, img, format, file, result_type) +        self.ids += 1 +        return task + +    def removeTask(self, task): +        self.lock.acquire() +        if task in self.tasks: +            self.tasks.remove(task) +        self.lock.release() + +    def getTask(self): +        self.lock.acquire() +        for task in self.tasks: +            if task.status in ("waiting", "shared-user"): +                self.lock.release() +                return task +        self.lock.release() +        return None + +    def getTaskByID(self, tid): +        self.lock.acquire() +        for task in self.tasks: +            if task.id == str(tid): #task ids are strings +                self.lock.release() +                return task +        self.lock.release() +        return None + +    def handleCaptcha(self, task): +        cli = self.core.isClientConnected() + +        if cli: #client connected -> should solve the captcha +            task.setWaiting(50) #wait 50 sec for response + +        for plugin in self.core.hookManager.activePlugins(): +            try: +                plugin.newCaptchaTask(task) +            except: +                if self.core.debug: +                    print_exc() + +        if task.handler or cli: #the captcha was handled +            self.tasks.append(task) +            return True + +        task.error = _("No Client connected for captcha decrypting") + +        return False + + +class CaptchaTask: +    def __init__(self, id, img, format, file, result_type='textual'): +        self.id = str(id) +        self.captchaImg = img +        self.captchaFormat = format +        self.captchaFile = file +        self.captchaResultType = result_type +        self.handler = [] #the hook plugins that will take care of the solution +        self.result = None +        self.waitUntil = None +        self.error = None #error message + +        self.status = "init" +        self.data = {} #handler can store data here + +    def getCaptcha(self): +        return self.captchaImg, self.captchaFormat, self.captchaResultType + +    def setResult(self, text): +        if self.isTextual(): +            self.result = text +        if self.isPositional(): +            try: +                parts = text.split(',') +                self.result = (int(parts[0]), int(parts[1])) +            except: +                self.result = None + +    def getResult(self): +        try: +            res = self.result.encode("utf8", "replace") +        except: +            res = self.result + +        return res + +    def getStatus(self): +        return self.status + +    def setWaiting(self, sec): +        """ let the captcha wait secs for the solution """ +        self.waitUntil = max(time() + sec, self.waitUntil) +        self.status = "waiting" + +    def isWaiting(self): +        if self.result or self.error or time() > self.waitUntil: +            return False + +        return True + +    def isTextual(self): +        """ returns if text is written on the captcha """ +        return self.captchaResultType == 'textual' + +    def isPositional(self): +        """ returns if user have to click a specific region on the captcha """ +        return self.captchaResultType == 'positional' + +    def setWatingForUser(self, exclusive): +        if exclusive: +            self.status = "user" +        else: +            self.status = "shared-user" + +    def timedOut(self): +        return time() > self.waitUntil + +    def invalid(self): +        """ indicates the captcha was not correct """ +        [x.captchaInvalid(self) for x in self.handler] + +    def correct(self): +        [x.captchaCorrect(self) for x in self.handler] + +    def __str__(self): +        return "<CaptchaTask '%s'>" % self.id diff --git a/pyload/manager/HookManager.py b/pyload/manager/HookManager.py new file mode 100644 index 000000000..6f5477aeb --- /dev/null +++ b/pyload/manager/HookManager.py @@ -0,0 +1,305 @@ +# -*- coding: utf-8 -*- + +""" +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, +    or (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +    See the GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License +    along with this program; if not, see <http://www.gnu.org/licenses/>. + +    @author: RaNaN, mkaay +    @interface-version: 0.1 +""" +import __builtin__ + +import traceback +from thread import start_new_thread +from threading import RLock + +from types import MethodType + +from pyload.manager.thread.PluginThread import HookThread +from pyload.manager.PluginManager import literal_eval +from utils import lock + +class HookManager: +    """Manages hooks, delegates and handles Events. + +        Every plugin can define events, \ +        but some very usefull events are called by the Core. +        Contrary to overwriting hook methods you can use event listener, +        which provides additional entry point in the control flow. +        Only do very short tasks or use threads. + +        **Known Events:** +        Most hook methods exists as events. These are the additional known events. + +        ===================== ============== ================================== +        Name                     Arguments      Description +        ===================== ============== ================================== +        downloadPreparing     fid            A download was just queued and will be prepared now. +        downloadStarts        fid            A plugin will immediately starts the download afterwards. +        linksAdded            links, pid     Someone just added links, you are able to modify the links. +        allDownloadsProcessed                Every link was handled, pyload would idle afterwards. +        allDownloadsFinished                 Every download in queue is finished. +        unrarFinished         folder, fname  An Unrar job finished +        configChanged                        The config was changed via the api. +        pluginConfigChanged                  The plugin config changed, due to api or internal process. +        ===================== ============== ================================== + +        | Notes: +        |    allDownloadsProcessed is *always* called before allDownloadsFinished. +        |    configChanged is *always* called before pluginConfigChanged. + + +    """ + +    def __init__(self, core): +        self.core = core +        self.config = self.core.config + +        __builtin__.hookManager = self #needed to let hooks register themself + +        self.log = self.core.log +        self.plugins = [] +        self.pluginMap = {} +        self.methods = {} #dict of names and list of methods usable by rpc + +        self.events = {} # contains events + +        #registering callback for config event +        self.config.pluginCB = MethodType(self.dispatchEvent, "pluginConfigChanged", basestring) + +        self.addEvent("pluginConfigChanged", self.manageHooks) + +        self.lock = RLock() +        self.createIndex() + +    def try_catch(func): +        def new(*args): +            try: +                return func(*args) +            except Exception, e: +                args[0].log.error(_("Error executing hooks: %s") % str(e)) +                if args[0].core.debug: +                    traceback.print_exc() + +        return new + + +    def addRPC(self, plugin, func, doc): +        plugin = plugin.rpartition(".")[2] +        doc = doc.strip() if doc else "" + +        if plugin in self.methods: +            self.methods[plugin][func] = doc +        else: +            self.methods[plugin] = {func: doc} + +    def callRPC(self, plugin, func, args, parse): +        if not args: args = tuple() +        if parse: +            args = tuple([literal_eval(x) for x in args]) + +        plugin = self.pluginMap[plugin] +        f = getattr(plugin, func) +        return f(*args) + + +    def createIndex(self): +        plugins = [] + +        active = [] +        deactive = [] + +        for pluginname in self.core.pluginManager.hookPlugins: +            try: +                #hookClass = getattr(plugin, plugin.__name__) + +                if self.config.getPlugin(pluginname, "activated"): +                    pluginClass = self.core.pluginManager.loadClass("hooks", pluginname) +                    if not pluginClass: continue + +                    plugin = pluginClass(self.core, self) +                    plugins.append(plugin) +                    self.pluginMap[pluginClass.__name__] = plugin +                    if plugin.isActivated(): +                        active.append(pluginClass.__name__) +                else: +                    deactive.append(pluginname) + + +            except: +                self.log.warning(_("Failed activating %(name)s") % {"name": pluginname}) +                if self.core.debug: +                    traceback.print_exc() + +        self.log.info(_("Activated plugins: %s") % ", ".join(sorted(active))) +        self.log.info(_("Deactivate plugins: %s") % ", ".join(sorted(deactive))) + +        self.plugins = plugins + +    def manageHooks(self, plugin, name, value): +        if name == "activated" and value: +            self.activateHook(plugin) +        elif name == "activated" and not value: +            self.deactivateHook(plugin) + +    def activateHook(self, plugin): + +        #check if already loaded +        for inst in self.plugins: +            if inst.__name__ == plugin: +                return + +        pluginClass = self.core.pluginManager.loadClass("hooks", plugin) + +        if not pluginClass: return + +        self.log.debug("Plugin loaded: %s" % plugin) + +        plugin = pluginClass(self.core, self) +        self.plugins.append(plugin) +        self.pluginMap[pluginClass.__name__] = plugin + +        # call core Ready +        start_new_thread(plugin.coreReady, tuple()) + +    def deactivateHook(self, plugin): + +        hook = None +        for inst in self.plugins: +            if inst.__name__ == plugin: +                hook = inst + +        if not hook: return + +        self.log.debug("Plugin unloaded: %s" % plugin) + +        hook.unload() + +        #remove periodic call +        self.log.debug("Removed callback %s" % self.core.scheduler.removeJob(hook.cb)) +        self.plugins.remove(hook) +        del self.pluginMap[hook.__name__] + + +    @try_catch +    def coreReady(self): +        for plugin in self.plugins: +            if plugin.isActivated(): +                plugin.coreReady() + +        self.dispatchEvent("coreReady") + +    @try_catch +    def coreExiting(self): +        for plugin in self.plugins: +            if plugin.isActivated(): +                plugin.coreExiting() + +        self.dispatchEvent("coreExiting") + +    @lock +    def downloadPreparing(self, pyfile): +        for plugin in self.plugins: +            if plugin.isActivated(): +                plugin.downloadPreparing(pyfile) + +        self.dispatchEvent("downloadPreparing", pyfile) + +    @lock +    def downloadFinished(self, pyfile): +        for plugin in self.plugins: +            if plugin.isActivated(): +                plugin.downloadFinished(pyfile) + +        self.dispatchEvent("downloadFinished", pyfile) + +    @lock +    @try_catch +    def downloadFailed(self, pyfile): +        for plugin in self.plugins: +            if plugin.isActivated(): +                plugin.downloadFailed(pyfile) + +        self.dispatchEvent("downloadFailed", pyfile) + +    @lock +    def packageFinished(self, package): +        for plugin in self.plugins: +            if plugin.isActivated(): +                plugin.packageFinished(package) + +        self.dispatchEvent("packageFinished", package) + +    @lock +    def beforeReconnecting(self, ip): +        for plugin in self.plugins: +            plugin.beforeReconnecting(ip) + +        self.dispatchEvent("beforeReconnecting", ip) + +    @lock +    def afterReconnecting(self, ip): +        for plugin in self.plugins: +            if plugin.isActivated(): +                plugin.afterReconnecting(ip) + +        self.dispatchEvent("afterReconnecting", ip) + +    def startThread(self, function, *args, **kwargs): +        t = HookThread(self.core.threadManager, function, args, kwargs) + +    def activePlugins(self): +        """ returns all active plugins """ +        return [x for x in self.plugins if x.isActivated()] + +    def getAllInfo(self): +        """returns info stored by hook plugins""" +        info = {} +        for name, plugin in self.pluginMap.iteritems(): +            if plugin.info: +                #copy and convert so str +                info[name] = dict([(x, str(y) if not isinstance(y, basestring) else y) for x, y in plugin.info.iteritems()]) +        return info + + +    def getInfo(self, plugin): +        info = {} +        if plugin in self.pluginMap and self.pluginMap[plugin].info: +            info = dict([(x, str(y) if not isinstance(y, basestring) else y) +                for x, y in self.pluginMap[plugin].info.iteritems()]) + +        return info + +    def addEvent(self, event, func): +        """Adds an event listener for event name""" +        if event in self.events: +            self.events[event].append(func) +        else: +            self.events[event] = [func] + +    def removeEvent(self, event, func): +        """removes previously added event listener""" +        if event in self.events: +            self.events[event].remove(func) + +    def dispatchEvent(self, event, *args): +        """dispatches event with args""" +        if event in self.events: +            for f in self.events[event]: +                try: +                    f(*args) +                except Exception, e: +                    self.log.warning("Error calling event handler %s: %s, %s, %s" +                    % (event, f, args, str(e))) +                    if self.core.debug: +                        traceback.print_exc() diff --git a/pyload/manager/PluginManager.py b/pyload/manager/PluginManager.py new file mode 100644 index 000000000..56e59237c --- /dev/null +++ b/pyload/manager/PluginManager.py @@ -0,0 +1,356 @@ +# -*- coding: utf-8 -*- + +import re +import sys + +from itertools import chain +from os import listdir, makedirs +from os.path import isfile, join, exists, abspath +from sys import version_info +from traceback import print_exc + +from SafeEval import const_eval as literal_eval + +from pyload.config.Parser import IGNORE + + +class PluginManager: +    ROOT = "pyload.plugins." +    USERROOT = "userplugins." +    TYPES = ("accounts", "container", "crypter", "hooks", "hoster", "internal",  "ocr") + +    PATTERN = re.compile(r'__pattern__.*=.*r("|\')([^"\']+)') +    VERSION = re.compile(r'__version__.*=.*("|\')([0-9.]+)') +    CONFIG = re.compile(r'__config__.*=.*\[([^\]]+)', re.MULTILINE) +    DESC = re.compile(r'__description__.?=.?("|"""|\')([^"\']+)') + + +    def __init__(self, core): +        self.core = core + +        self.config = core.config +        self.log = core.log + +        self.plugins = {} +        self.createIndex() + +        #register for import hook +        sys.meta_path.append(self) + + +    def createIndex(self): +        """create information for all plugins available""" + +        sys.path.append(abspath("")) + +        if not exists("userplugins"): +            makedirs("userplugins") +        if not exists(join("userplugins", "__init__.py")): +            f = open(join("userplugins", "__init__.py"), "wb") +            f.close() + +        self.plugins['crypter'] = self.crypterPlugins = self.parse("crypter", pattern=True) +        self.plugins['container'] = self.containerPlugins = self.parse("container", pattern=True) +        self.plugins['hoster'] = self.hosterPlugins = self.parse("hoster", pattern=True) + +        self.plugins['ocr'] = self.captchaPlugins = self.parse("ocr") +        self.plugins['accounts'] = self.accountPlugins = self.parse("accounts") +        self.plugins['hooks'] = self.hookPlugins = self.parse("hooks") +        self.plugins['internal'] = self.internalPlugins = self.parse("internal") + +        self.log.debug("created index of plugins") + +    def parse(self, folder, pattern=False, home={}): +        """ +        returns dict with information +        home contains parsed plugins from pyload. + +        { +        name : {path, version, config, (pattern, re), (plugin, class)} +        } + +        """ +        plugins = {} +        if home: +            pfolder = join("userplugins", folder) +            if not exists(pfolder): +                makedirs(pfolder) +            if not exists(join(pfolder, "__init__.py")): +                f = open(join(pfolder, "__init__.py"), "wb") +                f.close() + +        else: +            pfolder = join(pypath, "pyload", "plugins", folder) + +        for f in listdir(pfolder): +            if (isfile(join(pfolder, f)) and f.endswith(".py") or f.endswith("_25.pyc") or f.endswith( +                "_26.pyc") or f.endswith("_27.pyc")) and not f.startswith("_"): +                data = open(join(pfolder, f)) +                content = data.read() +                data.close() + +                if f.endswith("_25.pyc") and version_info[0:2] != (2, 5): +                    continue +                elif f.endswith("_26.pyc") and version_info[0:2] != (2, 6): +                    continue +                elif f.endswith("_27.pyc") and version_info[0:2] != (2, 7): +                    continue + +                name = f[:-3] +                if name[-1] == ".": name = name[:-4] + +                version = self.VERSION.findall(content) +                if version: +                    version = float(version[0][1]) +                else: +                    version = 0 + +                # home contains plugins from pyload root +                if home and name in home: +                    if home[name]['v'] >= version: +                        continue + +                if name in IGNORE or (folder, name) in IGNORE: +                     continue + +                plugins[name] = {} +                plugins[name]['v'] = version + +                module = f.replace(".pyc", "").replace(".py", "") + +                # the plugin is loaded from user directory +                plugins[name]['user'] = True if home else False +                plugins[name]['name'] = module + +                if pattern: +                    pattern = self.PATTERN.findall(content) + +                    if pattern: +                        pattern = pattern[0][1] +                    else: +                        pattern = "^unmachtable$" + +                    plugins[name]['pattern'] = pattern + +                    try: +                        plugins[name]['re'] = re.compile(pattern) +                    except: +                        self.log.error(_("%s has a invalid pattern.") % name) + + +                # internals have no config +                if folder == "internal": +                    self.config.deleteConfig(name) +                    continue + +                config = self.CONFIG.findall(content) +                if config: +                    config = literal_eval(config[0].strip().replace("\n", "").replace("\r", "")) +                    desc = self.DESC.findall(content) +                    desc = desc[0][1] if desc else "" + +                    if type(config[0]) == tuple: +                        config = [list(x) for x in config] +                    else: +                        config = [list(config)] + +                    if folder == "hooks": +                        append = True +                        for item in config: +                            if item[0] == "activated": append = False + +                        # activated flag missing +                        if append: config.append(["activated", "bool", "Activated", False]) + +                    try: +                        self.config.addPluginConfig(name, config, desc) +                    except: +                        self.log.error("Invalid config in %s: %s" % (name, config)) + +                elif folder == "hooks": #force config creation +                    desc = self.DESC.findall(content) +                    desc = desc[0][1] if desc else "" +                    config = (["activated", "bool", "Activated", False],) + +                    try: +                        self.config.addPluginConfig(name, config, desc) +                    except: +                        self.log.error("Invalid config in %s: %s" % (name, config)) + +        if not home: +            temp = self.parse(folder, pattern, plugins) +            plugins.update(temp) + +        return plugins + + +    def parseUrls(self, urls): +        """parse plugins for given list of urls""" + +        last = None +        res = [] # tupels of (url, plugin) + +        for url in urls: +            if type(url) not in (str, unicode, buffer): continue +            found = False + +            if last and last[1]['re'].match(url): +                res.append((url, last[0])) +                continue + +            for name, value in chain(self.crypterPlugins.iteritems(), self.hosterPlugins.iteritems(), +                self.containerPlugins.iteritems()): +                if value['re'].match(url): +                    res.append((url, name)) +                    last = (name, value) +                    found = True +                    break + +            if not found: +                res.append((url, "BasePlugin")) + +        return res + +    def findPlugin(self, name, pluginlist=("hoster", "crypter", "container")): +        for ptype in pluginlist: +            if name in self.plugins[ptype]: +                return self.plugins[ptype][name], ptype +        return None, None + +    def getPlugin(self, name, original=False): +        """return plugin module from hoster|decrypter|container""" +        plugin, type = self.findPlugin(name) + +        if not plugin: +            self.log.warning("Plugin %s not found." % name) +            plugin = self.hosterPlugins['BasePlugin'] + +        if "new_module" in plugin and not original: +            return plugin['new_module'] + +        return self.loadModule(type, name) + +    def getPluginName(self, name): +        """ used to obtain new name if other plugin was injected""" +        plugin, type = self.findPlugin(name) + +        if "new_name" in plugin: +            return plugin['new_name'] + +        return name + +    def loadModule(self, type, name): +        """ Returns loaded module for plugin + +        :param type: plugin type, subfolder of pyload.plugins +        :param name: +        """ +        plugins = self.plugins[type] +        if name in plugins: +            if "module" in plugins[name]: return plugins[name]['module'] +            try: +                module = __import__(self.ROOT + "%s.%s" % (type, plugins[name]['name']), globals(), locals(), +                    plugins[name]['name']) +                plugins[name]['module'] = module  #cache import, maybe unneeded +                return module +            except Exception, e: +                self.log.error(_("Error importing %(name)s: %(msg)s") % {"name": name, "msg": str(e)}) +                if self.core.debug: +                    print_exc() + +    def loadClass(self, type, name): +        """Returns the class of a plugin with the same name""" +        module = self.loadModule(type, name) +        if module: return getattr(module, name) + +    def getAccountPlugins(self): +        """return list of account plugin names""" +        return self.accountPlugins.keys() + +    def find_module(self, fullname, path=None): +        #redirecting imports if necesarry +        if fullname.startswith(self.ROOT) or fullname.startswith(self.USERROOT): #seperate pyload plugins +            if fullname.startswith(self.USERROOT): user = 1 +            else: user = 0 #used as bool and int + +            split = fullname.split(".") +            if len(split) != 4 - user: return +            type, name = split[2 - user:4 - user] + +            if type in self.plugins and name in self.plugins[type]: +                #userplugin is a newer version +                if not user and self.plugins[type][name]['user']: +                    return self +                #imported from userdir, but pyloads is newer +                if user and not self.plugins[type][name]['user']: +                    return self + + +    def load_module(self, name, replace=True): +        if name not in sys.modules:  #could be already in modules +            if replace: +                if self.ROOT in name: +                    newname = name.replace(self.ROOT, self.USERROOT) +                else: +                    newname = name.replace(self.USERROOT, self.ROOT) +            else: newname = name + +            base, plugin = newname.rsplit(".", 1) + +            self.log.debug("Redirected import %s -> %s" % (name, newname)) + +            module = __import__(newname, globals(), locals(), [plugin]) +            #inject under new an old name +            sys.modules[name] = module +            sys.modules[newname] = module + +        return sys.modules[name] + + +    def reloadPlugins(self, type_plugins): +        """ reload and reindex plugins """ +        if not type_plugins: +            return None + +        self.log.debug("Request reload of plugins: %s" % type_plugins) + +        reloaded = [] + +        as_dict = {} +        for t,n in type_plugins: +            if t in ("hooks", "internal"):  #: do not reload hooks or internals, because would cause to much side effects +                continue +            elif t in as_dict: +                as_dict[t].append(n) +            else: +                as_dict[t] = [n] + +        for type in as_dict.iterkeys(): +            for plugin in as_dict[type]: +                if plugin in self.plugins[type] and "module" in self.plugins[type][plugin]: +                    self.log.debug("Reloading %s" % plugin) +                    id = (type, plugin) +                    try: +                        reload(self.plugins[type][plugin]['module']) +                    except Exception, e: +                        self.log.error("Error when reloading %s" % id, str(e)) +                        continue +                    else: +                        reloaded.append(id) + +        #index creation +        self.plugins['crypter'] = self.crypterPlugins = self.parse("crypter", pattern=True) +        self.plugins['container'] = self.containerPlugins = self.parse("container", pattern=True) +        self.plugins['hoster'] = self.hosterPlugins = self.parse("hoster", pattern=True) +        self.plugins['ocr'] = self.captchaPlugins = self.parse("ocr") +        self.plugins['accounts'] = self.accountPlugins = self.parse("accounts") + +        if "accounts" in as_dict:  #: accounts needs to be reloaded +            self.core.accountManager.initPlugins() +            self.core.scheduler.addJob(0, self.core.accountManager.getAccountInfos) + +        return reloaded  #: return a list of the plugins successfully reloaded + +    def reloadPlugin(self, type_plugin): +        """ reload and reindex ONE plugin """ +        return True if self.reloadPlugins(type_plugin) else False diff --git a/pyload/manager/RemoteManager.py b/pyload/manager/RemoteManager.py new file mode 100644 index 000000000..e53e317e3 --- /dev/null +++ b/pyload/manager/RemoteManager.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +""" +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, +    or (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +    See the GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License +    along with this program; if not, see <http://www.gnu.org/licenses/>. + +    @author: mkaay +""" + +from threading import Thread +from traceback import print_exc + +class BackendBase(Thread): +    def __init__(self, manager): +        Thread.__init__(self) +        self.m = manager +        self.core = manager.core +        self.enabled = True +        self.running = False + +    def run(self): +        self.running = True +        try: +            self.serve() +        except Exception, e: +            self.core.log.error(_("Remote backend error: %s") % e) +            if self.core.debug: +                print_exc() +        finally: +            self.running = False + +    def setup(self, host, port): +        pass + +    def checkDeps(self): +        return True + +    def serve(self): +        pass + +    def shutdown(self): +        pass + +    def stop(self): +        self.enabled = False# set flag and call shutdowm message, so thread can react +        self.shutdown() + + +class RemoteManager: +    available = [] + +    def __init__(self, core): +        self.core = core +        self.backends = [] + +        if self.core.remote: +            self.available.append("ThriftBackend") +#        else: +#            self.available.append("SocketBackend") + + +    def startBackends(self): +        host = self.core.config["remote"]["listenaddr"] +        port = self.core.config["remote"]["port"] + +        for b in self.available: +            klass = getattr(__import__("pyload.remote.%s" % b, globals(), locals(), [b], -1), b) +            backend = klass(self) +            if not backend.checkDeps(): +                continue +            try: +                backend.setup(host, port) +                self.core.log.info(_("Starting %(name)s: %(addr)s:%(port)s") % {"name": b, "addr": host, "port": port}) +            except Exception, e: +                self.core.log.error(_("Failed loading backend %(name)s | %(error)s") % {"name": b, "error": str(e)}) +                if self.core.debug: +                    print_exc() +            else: +                backend.start() +                self.backends.append(backend) + +            port += 1 diff --git a/pyload/manager/ThreadManager.py b/pyload/manager/ThreadManager.py new file mode 100644 index 000000000..1073f8040 --- /dev/null +++ b/pyload/manager/ThreadManager.py @@ -0,0 +1,317 @@ +# -*- coding: utf-8 -*- + +""" +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, +    or (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +    See the GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License +    along with this program; if not, see <http://www.gnu.org/licenses/>. + +    @author: RaNaN +""" + +from os.path import exists, join +import re +from subprocess import Popen +from threading import Event, Lock +from time import sleep, time +from traceback import print_exc +from random import choice + +import pycurl + +from pyload.manager.thread import PluginThread +from pyload.datatypes.PyFile import PyFile +from pyload.network.RequestFactory import getURL +from pyload.utils import freeSpace, lock + + +class ThreadManager: +    """manages the download threads, assign jobs, reconnect etc""" + + +    def __init__(self, core): +        """Constructor""" +        self.core = core +        self.log = core.log + +        self.threads = []  # thread list +        self.localThreads = []  #hook+decrypter threads + +        self.pause = True + +        self.reconnecting = Event() +        self.reconnecting.clear() +        self.downloaded = 0 #number of files downloaded since last cleanup + +        self.lock = Lock() + +        # some operations require to fetch url info from hoster, so we caching them so it wont be done twice +        # contains a timestamp and will be purged after timeout +        self.infoCache = {} + +        # pool of ids for online check +        self.resultIDs = 0 + +        # threads which are fetching hoster results +        self.infoResults = {} +        #timeout for cache purge +        self.timestamp = 0 + +        pycurl.global_init(pycurl.GLOBAL_DEFAULT) + +        for i in range(0, self.core.config.get("download", "max_downloads")): +            self.createThread() + + +    def createThread(self): +        """create a download thread""" + +        thread = PluginThread.DownloadThread(self) +        self.threads.append(thread) + +    def createInfoThread(self, data, pid): +        """ +        start a thread whichs fetches online status and other infos +        data = [ .. () .. ] +        """ +        self.timestamp = time() + 5 * 60 + +        PluginThread.InfoThread(self, data, pid) + +    @lock +    def createResultThread(self, data, add=False): +        """ creates a thread to fetch online status, returns result id """ +        self.timestamp = time() + 5 * 60 + +        rid = self.resultIDs +        self.resultIDs += 1 + +        PluginThread.InfoThread(self, data, rid=rid, add=add) + +        return rid + + +    @lock +    def getInfoResult(self, rid): +        """returns result and clears it""" +        self.timestamp = time() + 5 * 60 + +        if rid in self.infoResults: +            data = self.infoResults[rid] +            self.infoResults[rid] = {} +            return data +        else: +            return {} + +    @lock +    def setInfoResults(self, rid, result): +        self.infoResults[rid].update(result) + +    def getActiveFiles(self): +        active = [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)] + +        for t in self.localThreads: +            active.extend(t.getActiveFiles()) + +        return active + +    def processingIds(self): +        """get a id list of all pyfiles processed""" +        return [x.id for x in self.getActiveFiles()] + + +    def work(self): +        """run all task which have to be done (this is for repetivive call by core)""" +        try: +            self.tryReconnect() +        except Exception, e: +            self.log.error(_("Reconnect Failed: %s") % str(e) ) +            self.reconnecting.clear() +            if self.core.debug: +                print_exc() +        self.checkThreadCount() + +        try: +            self.assignJob() +        except Exception, e: +            self.log.warning("Assign job error", e) +            if self.core.debug: +                print_exc() + +            sleep(0.5) +            self.assignJob() +            #it may be failed non critical so we try it again + +        if (self.infoCache or self.infoResults) and self.timestamp < time(): +            self.infoCache.clear() +            self.infoResults.clear() +            self.log.debug("Cleared Result cache") + +    #-------------------------------------------------------------------------- +    def tryReconnect(self): +        """checks if reconnect needed""" + +        if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()): +            return False + +        active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] + +        if not (0 < active.count(True) == len(active)): +            return False + +        if not exists(self.core.config['reconnect']['method']): +            if exists(join(pypath, self.core.config['reconnect']['method'])): +                self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) +            else: +                self.core.config["reconnect"]["activated"] = False +                self.log.warning(_("Reconnect script not found!")) +                return + +        self.reconnecting.set() + +        #Do reconnect +        self.log.info(_("Starting reconnect")) + +        while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: +            sleep(0.25) + +        ip = self.getIP() + +        self.core.hookManager.beforeReconnecting(ip) + +        self.log.debug("Old IP: %s" % ip) + +        try: +            reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE) +        except: +            self.log.warning(_("Failed executing reconnect script!")) +            self.core.config["reconnect"]["activated"] = False +            self.reconnecting.clear() +            if self.core.debug: +                print_exc() +            return + +        reconn.wait() +        sleep(1) +        ip = self.getIP() +        self.core.hookManager.afterReconnecting(ip) + +        self.log.info(_("Reconnected, new IP: %s") % ip) + +        self.reconnecting.clear() + +    def getIP(self): +        """retrieve current ip""" +        services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"), +                    ("http://checkip.dyndns.org/",".*Current IP Address: (\S+)</body>.*")] + +        ip = "" +        for i in range(10): +            try: +                sv = choice(services) +                ip = getURL(sv[0]) +                ip = re.match(sv[1], ip).group(1) +                break +            except: +                ip = "" +                sleep(1) + +        return ip + +    #-------------------------------------------------------------------------- +    def checkThreadCount(self): +        """checks if there are need for increasing or reducing thread count""" + +        if len(self.threads) == self.core.config.get("download", "max_downloads"): +            return True +        elif len(self.threads) < self.core.config.get("download", "max_downloads"): +            self.createThread() +        else: +            free = [x for x in self.threads if not x.active] +            if free: +                free[0].put("quit") + + +    def cleanPycurl(self): +        """ make a global curl cleanup (currently ununused) """ +        if self.processingIds(): +            return False +        pycurl.global_cleanup() +        pycurl.global_init(pycurl.GLOBAL_DEFAULT) +        self.downloaded = 0 +        self.log.debug("Cleaned up pycurl") +        return True + +    #-------------------------------------------------------------------------- +    def assignJob(self): +        """assing a job to a thread if possible""" + +        if self.pause or not self.core.api.isTimeDownload(): return + +        #if self.downloaded > 20: +        #    if not self.cleanPyCurl(): return + +        free = [x for x in self.threads if not x.active] + +        inuse = set([(x.active.pluginname, self.getLimit(x)) for x in self.threads if x.active and x.active.hasPlugin() and x.active.plugin.account]) +        inuse = map(lambda x: (x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) ,inuse) +        onlimit = [x[0] for x in inuse if x[1] > 0 and x[2] >= x[1]] + +        occ = [x.active.pluginname for x in self.threads if x.active and x.active.hasPlugin() and not x.active.plugin.multiDL] + onlimit + +        occ.sort() +        occ = tuple(set(occ)) +        job = self.core.files.getJob(occ) +        if job: +            try: +                job.initPlugin() +            except Exception, e: +                self.log.critical(str(e)) +                print_exc() +                job.setStatus("failed") +                job.error = str(e) +                job.release() +                return + +            if job.plugin.__type__ == "hoster": +                spaceLeft = freeSpace(self.core.config["general"]["download_folder"]) / 1024 / 1024 +                if spaceLeft < self.core.config["general"]["min_free_space"]: +                    self.log.warning(_("Not enough space left on device")) +                    self.pause = True + +                if free and not self.pause: +                    thread = free[0] +                    #self.downloaded += 1 + +                    thread.put(job) +                else: +                    #put job back +                    if occ not in self.core.files.jobCache: +                        self.core.files.jobCache[occ] = [] +                    self.core.files.jobCache[occ].append(job.id) + +                    #check for decrypt jobs +                    job = self.core.files.getDecryptJob() +                    if job: +                        job.initPlugin() +                        thread = PluginThread.DecrypterThread(self, job) + + +            else: +                thread = PluginThread.DecrypterThread(self, job) + +    def getLimit(self, thread): +        limit = thread.active.plugin.account.getAccountData(thread.active.plugin.user)["options"].get("limitDL", ["0"])[0] +        return int(limit) + +    def cleanup(self): +        """do global cleanup, should be called when finished with pycurl""" +        pycurl.global_cleanup() diff --git a/pyload/manager/__init__.py b/pyload/manager/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/pyload/manager/__init__.py diff --git a/pyload/manager/event/PullEvents.py b/pyload/manager/event/PullEvents.py new file mode 100644 index 000000000..0739b4ec8 --- /dev/null +++ b/pyload/manager/event/PullEvents.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- + +""" +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, +    or (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +    See the GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License +    along with this program; if not, see <http://www.gnu.org/licenses/>. + +    @author: mkaay +""" + +from time import time +from pyload.utils import uniqify + +class PullManager: +    def __init__(self, core): +        self.core = core +        self.clients = [] + +    def newClient(self, uuid): +        self.clients.append(Client(uuid)) + +    def clean(self): +        for n, client in enumerate(self.clients): +            if client.lastActive + 30 < time(): +                del self.clients[n] + +    def getEvents(self, uuid): +        events = [] +        validUuid = False +        for client in self.clients: +            if client.uuid == uuid: +                client.lastActive = time() +                validUuid = True +                while client.newEvents(): +                    events.append(client.popEvent().toList()) +                break +        if not validUuid: +            self.newClient(uuid) +            events = [ReloadAllEvent("queue").toList(), ReloadAllEvent("collector").toList()] +        return uniqify(events) + +    def addEvent(self, event): +        for client in self.clients: +            client.addEvent(event) + +class Client: +    def __init__(self, uuid): +        self.uuid = uuid +        self.lastActive = time() +        self.events = [] + +    def newEvents(self): +        return len(self.events) > 0 + +    def popEvent(self): +        if not len(self.events): +            return None +        return self.events.pop(0) + +    def addEvent(self, event): +        self.events.append(event) + +class UpdateEvent: +    def __init__(self, itype, iid, destination): +        assert itype == "pack" or itype == "file" +        assert destination == "queue" or destination == "collector" +        self.type = itype +        self.id = iid +        self.destination = destination + +    def toList(self): +        return ["update", self.destination, self.type, self.id] + +class RemoveEvent: +    def __init__(self, itype, iid, destination): +        assert itype == "pack" or itype == "file" +        assert destination == "queue" or destination == "collector" +        self.type = itype +        self.id = iid +        self.destination = destination + +    def toList(self): +        return ["remove", self.destination, self.type, self.id] + +class InsertEvent: +    def __init__(self, itype, iid, after, destination): +        assert itype == "pack" or itype == "file" +        assert destination == "queue" or destination == "collector" +        self.type = itype +        self.id = iid +        self.after = after +        self.destination = destination + +    def toList(self): +        return ["insert", self.destination, self.type, self.id, self.after] + +class ReloadAllEvent: +    def __init__(self, destination): +        assert destination == "queue" or destination == "collector" +        self.destination = destination + +    def toList(self): +        return ["reload", self.destination] + +class AccountUpdateEvent: +    def toList(self): +        return ["account"] + +class ConfigUpdateEvent: +    def toList(self): +        return ["config"] diff --git a/pyload/manager/event/Scheduler.py b/pyload/manager/event/Scheduler.py new file mode 100644 index 000000000..71b5f96af --- /dev/null +++ b/pyload/manager/event/Scheduler.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- + +""" +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, +    or (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +    See the GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License +    along with this program; if not, see <http://www.gnu.org/licenses/>. + +    @author: mkaay +""" + +from time import time +from heapq import heappop, heappush +from thread import start_new_thread +from threading import Lock + +class AlreadyCalled(Exception): +    pass + + +class Deferred: +    def __init__(self): +        self.call = [] +        self.result = () + +    def addCallback(self, f, *cargs, **ckwargs): +        self.call.append((f, cargs, ckwargs)) + +    def callback(self, *args, **kwargs): +        if self.result: +            raise AlreadyCalled +        self.result = (args, kwargs) +        for f, cargs, ckwargs in self.call: +            args += tuple(cargs) +            kwargs.update(ckwargs) +            f(*args ** kwargs) + + +class Scheduler: +    def __init__(self, core): +        self.core = core + +        self.queue = PriorityQueue() + +    def addJob(self, t, call, args=[], kwargs={}, threaded=True): +        d = Deferred() +        t += time() +        j = Job(t, call, args, kwargs, d, threaded) +        self.queue.put((t, j)) +        return d + + +    def removeJob(self, d): +        """ +        :param d: defered object +        :return: if job was deleted +        """ +        index = -1 + +        for i, j in enumerate(self.queue): +            if j[1].deferred == d: +                index = i + +        if index >= 0: +            del self.queue[index] +            return True + +        return False + +    def work(self): +        while True: +            t, j = self.queue.get() +            if not j: +                break +            else: +                if t <= time(): +                    j.start() +                else: +                    self.queue.put((t, j)) +                    break + + +class Job: +    def __init__(self, time, call, args=[], kwargs={}, deferred=None, threaded=True): +        self.time = float(time) +        self.call = call +        self.args = args +        self.kwargs = kwargs +        self.deferred = deferred +        self.threaded = threaded + +    def run(self): +        ret = self.call(*self.args, **self.kwargs) +        if self.deferred is None: +            return +        else: +            self.deferred.callback(ret) + +    def start(self): +        if self.threaded: +            start_new_thread(self.run, ()) +        else: +            self.run() + + +class PriorityQueue: +    """ a non blocking priority queue """ + +    def __init__(self): +        self.queue = [] +        self.lock = Lock() + +    def __iter__(self): +        return iter(self.queue) + +    def __delitem__(self, key): +        del self.queue[key] + +    def put(self, element): +        self.lock.acquire() +        heappush(self.queue, element) +        self.lock.release() + +    def get(self): +        """ return element or None """ +        self.lock.acquire() +        try: +            el = heappop(self.queue) +            return el +        except IndexError: +            return None, None +        finally: +            self.lock.release() diff --git a/pyload/manager/event/__init__.py b/pyload/manager/event/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/pyload/manager/event/__init__.py diff --git a/pyload/manager/thread/PluginThread.py b/pyload/manager/thread/PluginThread.py new file mode 100644 index 000000000..5c274fa46 --- /dev/null +++ b/pyload/manager/thread/PluginThread.py @@ -0,0 +1,675 @@ +# -*- coding: utf-8 -*- + +""" +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, +    or (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +    See the GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License +    along with this program; if not, see <http://www.gnu.org/licenses/>. + +    @author: RaNaN +""" + +from Queue import Queue +from threading import Thread +from os import listdir, stat +from os.path import join +from time import sleep, time, strftime, gmtime +from traceback import print_exc, format_exc +from pprint import pformat +from sys import exc_info, exc_clear +from copy import copy +from types import MethodType + +from pycurl import error + +from pyload.datatypes.PyFile import PyFile +from pyload.plugins.Plugin import Abort, Fail, Reconnect, Retry, SkipDownload +from pyload.utils.packagetools import parseNames +from pyload.utils import safe_join +from pyload.api import OnlineStatus + +class PluginThread(Thread): +    """abstract base class for thread types""" + +    #-------------------------------------------------------------------------- +    def __init__(self, manager): +        """Constructor""" +        Thread.__init__(self) +        self.setDaemon(True) +        self.m = manager #thread manager + + +    def writeDebugReport(self, pyfile): +        """ writes a +        :return: +        """ + +        dump_name = "debug_%s_%s.zip" % (pyfile.pluginname, strftime("%d-%m-%Y_%H-%M-%S")) +        dump = self.getDebugDump(pyfile) + +        try: +            import zipfile + +            zip = zipfile.ZipFile(dump_name, "w") + +            for f in listdir(join("tmp", pyfile.pluginname)): +                try: +                    # avoid encoding errors +                    zip.write(join("tmp", pyfile.pluginname, f), safe_join(pyfile.pluginname, f)) +                except: +                    pass + +            info = zipfile.ZipInfo(safe_join(pyfile.pluginname, "debug_Report.txt"), gmtime()) +            info.external_attr = 0644 << 16L # change permissions + +            zip.writestr(info, dump) +            zip.close() + +            if not stat(dump_name).st_size: +                raise Exception("Empty Zipfile") + +        except Exception, e: +            self.m.log.debug("Error creating zip file: %s" % e) + +            dump_name = dump_name.replace(".zip", ".txt") +            f = open(dump_name, "wb") +            f.write(dump) +            f.close() + +        self.m.core.log.info("Debug Report written to %s" % dump_name) + +    def getDebugDump(self, pyfile): +        dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % ( +            self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc()) + +        tb = exc_info()[2] +        stack = [] +        while tb: +            stack.append(tb.tb_frame) +            tb = tb.tb_next + +        for frame in stack[1:]: +            dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name, +                                                       frame.f_code.co_filename, +                                                       frame.f_lineno) + +            for key, value in frame.f_locals.items(): +                dump += "\t%20s = " % key +                try: +                    dump += pformat(value) + "\n" +                except Exception, e: +                    dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" + +            del frame + +        del stack #delete it just to be sure... + +        dump += "\n\nPLUGIN OBJECT DUMP: \n\n" + +        for name in dir(pyfile.plugin): +            attr = getattr(pyfile.plugin, name) +            if not name.endswith("__") and type(attr) != MethodType: +                dump += "\t%20s = " % name +                try: +                    dump += pformat(attr) + "\n" +                except Exception, e: +                    dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" + +        dump += "\nPYFILE OBJECT DUMP: \n\n" + +        for name in dir(pyfile): +            attr = getattr(pyfile, name) +            if not name.endswith("__") and type(attr) != MethodType: +                dump += "\t%20s = " % name +                try: +                    dump += pformat(attr) + "\n" +                except Exception, e: +                    dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" + +        if pyfile.pluginname in self.m.core.config.plugin: +            dump += "\n\nCONFIG: \n\n" +            dump += pformat(self.m.core.config.plugin[pyfile.pluginname]) + "\n" + +        return dump + +    def clean(self, pyfile): +        """ set thread unactive and release pyfile """ +        self.active = False +        pyfile.release() + + +class DownloadThread(PluginThread): +    """thread for downloading files from 'real' hoster plugins""" + +    #-------------------------------------------------------------------------- +    def __init__(self, manager): +        """Constructor""" +        PluginThread.__init__(self, manager) + +        self.queue = Queue() # job queue +        self.active = False + +        self.start() + +    #-------------------------------------------------------------------------- +    def run(self): +        """run method""" +        pyfile = None + +        while True: +            del pyfile +            self.active = self.queue.get() +            pyfile = self.active + +            if self.active == "quit": +                self.active = False +                self.m.threads.remove(self) +                return True + +            try: +                if not pyfile.hasPlugin(): continue +                #this pyfile was deleted while queueing + +                pyfile.plugin.checkForSameFiles(starting=True) +                self.m.log.info(_("Download starts: %s" % pyfile.name)) + +                # start download +                self.m.core.hookManager.downloadPreparing(pyfile) +                pyfile.plugin.preprocessing(self) + +                self.m.log.info(_("Download finished: %s") % pyfile.name) +                self.m.core.hookManager.downloadFinished(pyfile) +                self.m.core.files.checkPackageFinished(pyfile) + +            except NotImplementedError: +                self.m.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname) +                pyfile.setStatus("failed") +                pyfile.error = "Plugin does not work" +                self.clean(pyfile) +                continue + +            except Abort: +                try: +                    self.m.log.info(_("Download aborted: %s") % pyfile.name) +                except: +                    pass + +                pyfile.setStatus("aborted") + +                self.clean(pyfile) +                continue + +            except Reconnect: +                self.queue.put(pyfile) +                #pyfile.req.clearCookies() + +                while self.m.reconnecting.isSet(): +                    sleep(0.5) + +                continue + +            except Retry, e: +                reason = e.args[0] +                self.m.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) +                self.queue.put(pyfile) +                continue + +            except Fail, e: +                msg = e.args[0] + +                if msg == "offline": +                    pyfile.setStatus("offline") +                    self.m.log.warning(_("Download is offline: %s") % pyfile.name) +                elif msg == "temp. offline": +                    pyfile.setStatus("temp. offline") +                    self.m.log.warning(_("Download is temporary offline: %s") % pyfile.name) +                else: +                    pyfile.setStatus("failed") +                    self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg}) +                    pyfile.error = msg + +                self.m.core.hookManager.downloadFailed(pyfile) +                self.clean(pyfile) +                continue + +            except error, e: +                if len(e.args) == 2: +                    code, msg = e.args +                else: +                    code = 0 +                    msg = e.args + +                self.m.log.debug("pycurl exception %s: %s" % (code, msg)) + +                if code in (7, 18, 28, 52, 56): +                    self.m.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry.")) +                    wait = time() + 60 + +                    pyfile.waitUntil = wait +                    pyfile.setStatus("waiting") +                    while time() < wait: +                        sleep(1) +                        if pyfile.abort: +                            break + +                    if pyfile.abort: +                        self.m.log.info(_("Download aborted: %s") % pyfile.name) +                        pyfile.setStatus("aborted") + +                        self.clean(pyfile) +                    else: +                        self.queue.put(pyfile) + +                    continue + +                else: +                    pyfile.setStatus("failed") +                    self.m.log.error("pycurl error %s: %s" % (code, msg)) +                    if self.m.core.debug: +                        print_exc() +                        self.writeDebugReport(pyfile) + +                    self.m.core.hookManager.downloadFailed(pyfile) + +                self.clean(pyfile) +                continue + +            except SkipDownload, e: +                pyfile.setStatus("skipped") + +                self.m.log.info( +                    _("Download skipped: %(name)s due to %(plugin)s") % {"name": pyfile.name, "plugin": e.message}) + +                self.clean(pyfile) + +                self.m.core.files.checkPackageFinished(pyfile) + +                self.active = False +                self.m.core.files.save() + +                continue + + +            except Exception, e: +                pyfile.setStatus("failed") +                self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)}) +                pyfile.error = str(e) + +                if self.m.core.debug: +                    print_exc() +                    self.writeDebugReport(pyfile) + +                self.m.core.hookManager.downloadFailed(pyfile) +                self.clean(pyfile) +                continue + +            finally: +                self.m.core.files.save() +                pyfile.checkIfProcessed() +                exc_clear() + +            #pyfile.plugin.req.clean() + +            self.active = False +            pyfile.finishIfDone() +            self.m.core.files.save() + + +    def put(self, job): +        """assing job to thread""" +        self.queue.put(job) + + +    def stop(self): +        """stops the thread""" +        self.put("quit") + + +class DecrypterThread(PluginThread): +    """thread for decrypting""" + +    def __init__(self, manager, pyfile): +        """constructor""" +        PluginThread.__init__(self, manager) + +        self.active = pyfile +        manager.localThreads.append(self) + +        pyfile.setStatus("decrypting") + +        self.start() + +    def getActiveFiles(self): +        return [self.active] + +    def run(self): +        """run method""" + +        pyfile = self.active +        retry = False + +        try: +            self.m.log.info(_("Decrypting starts: %s") % self.active.name) +            self.active.plugin.preprocessing(self) + +        except NotImplementedError: +            self.m.log.error(_("Plugin %s is missing a function.") % self.active.pluginname) +            return + +        except Fail, e: +            msg = e.args[0] + +            if msg == "offline": +                self.active.setStatus("offline") +                self.m.log.warning(_("Download is offline: %s") % self.active.name) +            else: +                self.active.setStatus("failed") +                self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % {"name": self.active.name, "msg": msg}) +                self.active.error = msg + +            return + +        except Abort: +            self.m.log.info(_("Download aborted: %s") % pyfile.name) +            pyfile.setStatus("aborted") + +            return + +        except Retry: +            self.m.log.info(_("Retrying %s") % self.active.name) +            retry = True +            return self.run() + +        except Exception, e: +            self.active.setStatus("failed") +            self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % {"name": self.active.name, "msg": str(e)}) +            self.active.error = str(e) + +            if self.m.core.debug: +                print_exc() +                self.writeDebugReport(pyfile) + +            return + + +        finally: +            if not retry: +                self.active.release() +                self.active = False +                self.m.core.files.save() +                self.m.localThreads.remove(self) +                exc_clear() + + +        #self.m.core.hookManager.downloadFinished(pyfile) + + +        #self.m.localThreads.remove(self) +        #self.active.finishIfDone() +        if not retry: +            pyfile.delete() + + +class HookThread(PluginThread): +    """thread for hooks""" + +    #-------------------------------------------------------------------------- +    def __init__(self, m, function, args, kwargs): +        """Constructor""" +        PluginThread.__init__(self, m) + +        self.f = function +        self.args = args +        self.kwargs = kwargs + +        self.active = [] + +        m.localThreads.append(self) + +        self.start() + +    def getActiveFiles(self): +        return self.active + +    def addActive(self, pyfile): +        """ Adds a pyfile to active list and thus will be displayed on overview""" +        if pyfile not in self.active: +            self.active.append(pyfile) + +    def finishFile(self, pyfile): +        if pyfile in self.active: +            self.active.remove(pyfile) + +        pyfile.finishIfDone() + +    def run(self): +        try: +            try: +                self.kwargs["thread"] = self +                self.f(*self.args, **self.kwargs) +            except TypeError, e: +                #dirty method to filter out exceptions +                if "unexpected keyword argument 'thread'" not in e.args[0]: +                    raise + +                del self.kwargs["thread"] +                self.f(*self.args, **self.kwargs) +        finally: +            local = copy(self.active) +            for x in local: +                self.finishFile(x) + +            self.m.localThreads.remove(self) + + +class InfoThread(PluginThread): +    def __init__(self, manager, data, pid=-1, rid=-1, add=False): +        """Constructor""" +        PluginThread.__init__(self, manager) + +        self.data = data +        self.pid = pid # package id +        # [ .. (name, plugin) .. ] + +        self.rid = rid #result id +        self.add = add #add packages instead of return result + +        self.cache = [] #accumulated data + +        self.start() + +    def run(self): +        """run method""" + +        plugins = {} +        container = [] + +        for url, plugin in self.data: +            if plugin in plugins: +                plugins[plugin].append(url) +            else: +                plugins[plugin] = [url] + + +        # filter out container plugins +        for name in self.m.core.pluginManager.containerPlugins: +            if name in plugins: +                container.extend([(name, url) for url in plugins[name]]) + +                del plugins[name] + +        #directly write to database +        if self.pid > -1: +            for pluginname, urls in plugins.iteritems(): +                plugin = self.m.core.pluginManager.getPlugin(pluginname, True) +                if hasattr(plugin, "getInfo"): +                    self.fetchForPlugin(pluginname, plugin, urls, self.updateDB) +                    self.m.core.files.save() + +        elif self.add: +            for pluginname, urls in plugins.iteritems(): +                plugin = self.m.core.pluginManager.getPlugin(pluginname, True) +                if hasattr(plugin, "getInfo"): +                    self.fetchForPlugin(pluginname, plugin, urls, self.updateCache, True) + +                else: +                    #generate default result +                    result = [(url, 0, 3, url) for url in urls] + +                    self.updateCache(pluginname, result) + +            packs = parseNames([(name, url) for name, x, y, url in self.cache]) + +            self.m.log.debug("Fetched and generated %d packages" % len(packs)) + +            for k, v in packs: +                self.m.core.api.addPackage(k, v) + +            #empty cache +            del self.cache[:] + +        else: #post the results + + +            for name, url in container: +                #attach container content +                try: +                    data = self.decryptContainer(name, url) +                except: +                    print_exc() +                    self.m.log.error("Could not decrypt container.") +                    data = [] + +                for url, plugin in data: +                    if plugin in plugins: +                        plugins[plugin].append(url) +                    else: +                        plugins[plugin] = [url] + +            self.m.infoResults[self.rid] = {} + +            for pluginname, urls in plugins.iteritems(): +                plugin = self.m.core.pluginManager.getPlugin(pluginname, True) +                if hasattr(plugin, "getInfo"): +                    self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) + +                    #force to process cache +                    if self.cache: +                        self.updateResult(pluginname, [], True) + +                else: +                    #generate default result +                    result = [(url, 0, 3, url) for url in urls] + +                    self.updateResult(pluginname, result, True) + +            self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {} + +        self.m.timestamp = time() + 5 * 60 + + +    def updateDB(self, plugin, result): +        self.m.core.files.updateFileInfo(result, self.pid) + +    def updateResult(self, plugin, result, force=False): +        #parse package name and generate result +        #accumulate results + +        self.cache.extend(result) + +        if len(self.cache) >= 20 or force: +            #used for package generating +            tmp = [(name, (url, OnlineStatus(name, plugin, "unknown", status, int(size)))) +            for name, size, status, url in self.cache] + +            data = parseNames(tmp) +            result = {} +            for k, v in data.iteritems(): +                for url, status in v: +                    status.packagename = k +                    result[url] = status + +            self.m.setInfoResults(self.rid, result) + +            self.cache = [] + +    def updateCache(self, plugin, result): +        self.cache.extend(result) + +    def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None): +        try: +            result = [] #result loaded from cache +            process = [] #urls to process +            for url in urls: +                if url in self.m.infoCache: +                    result.append(self.m.infoCache[url]) +                else: +                    process.append(url) + +            if result: +                self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname)) +                cb(pluginname, result) + +            if process: +                self.m.log.debug("Run Info Fetching for %s" % pluginname) +                for result in plugin.getInfo(process): +                    #result = [ .. (name, size, status, url) .. ] +                    if not type(result) == list: result = [result] + +                    for res in result: +                        self.m.infoCache[res[3]] = res + +                    cb(pluginname, result) + +            self.m.log.debug("Finished Info Fetching for %s" % pluginname) +        except Exception, e: +            self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") % +                               {"name": pluginname, "err": str(e)}) +            if self.m.core.debug: +                print_exc() + +            # generate default results +            if err: +                result = [(url, 0, 3, url) for url in urls] +                cb(pluginname, result) + + +    def decryptContainer(self, plugin, url): +        data = [] +        # only works on container plugins + +        self.m.log.debug("Pre decrypting %s with %s" % (url, plugin)) + +        # dummy pyfile +        pyfile = PyFile(self.m.core.files, -1, url, url, 0, 0, "", plugin, -1, -1) + +        pyfile.initPlugin() + +        # little plugin lifecycle +        try: +            pyfile.plugin.setup() +            pyfile.plugin.loadToDisk() +            pyfile.plugin.decrypt(pyfile) +            pyfile.plugin.deleteTmp() + +            for pack in pyfile.plugin.packages: +                pyfile.plugin.urls.extend(pack[1]) + +            data = self.m.core.pluginManager.parseUrls(pyfile.plugin.urls) + +            self.m.log.debug("Got %d links." % len(data)) + +        except Exception, e: +            self.m.log.debug("Pre decrypting error: %s" % str(e)) +        finally: +            pyfile.release() + +        return data diff --git a/pyload/manager/thread/ServerThread.py b/pyload/manager/thread/ServerThread.py new file mode 100644 index 000000000..7de3b1ca1 --- /dev/null +++ b/pyload/manager/thread/ServerThread.py @@ -0,0 +1,108 @@ +from __future__ import with_statement +from os.path import exists + +import os +import threading +import logging + +core = None +setup = None +log = logging.getLogger("log") + +class WebServer(threading.Thread): +    def __init__(self, pycore): +        global core +        threading.Thread.__init__(self) +        self.core = pycore +        core = pycore +        self.running = True +        self.server = pycore.config['webinterface']['server'] +        self.https = pycore.config['webinterface']['https'] +        self.cert = pycore.config["ssl"]["cert"] +        self.key = pycore.config["ssl"]["key"] +        self.host = pycore.config['webinterface']['host'] +        self.port = pycore.config['webinterface']['port'] + +        self.setDaemon(True) + +    def run(self): +        import pyload.webui as webinterface +        global webinterface + +        reset = False + +        if self.https and (not exists(self.cert) or not exists(self.key)): +            log.warning(_("SSL certificates not found.")) +            self.https = False + +        if self.server in ("lighttpd", "nginx"): +            log.warning(_("Sorry, we dropped support for starting %s directly within pyLoad") % self.server) +            log.warning(_("You can use the threaded server which offers good performance and ssl,")) +            log.warning(_("of course you can still use your existing %s with pyLoads fastcgi server") % self.server) +            log.warning(_("sample configs are located in the pyload/web/servers directory")) +            reset = True +        elif self.server == "fastcgi": +            try: +                import flup +            except: +                log.warning(_("Can't use %(server)s, python-flup is not installed!") % { +                    "server": self.server}) +                reset = True + +        if reset or self.server == "lightweight": +            if os.name != "nt": +                try: +                    import bjoern +                except Exception, e: +                    log.error(_("Error importing lightweight server: %s") % e) +                    log.warning(_("You need to download and compile bjoern, https://github.com/jonashaag/bjoern")) +                    log.warning(_("Copy the boern.so to pyload/lib folder or use setup.py install")) +                    log.warning(_("Of course you need to be familiar with linux and know how to compile software")) +                    self.server = "builtin" +            else: +                self.core.log.info(_("Server set to threaded, due to known performance problems on windows.")) +                self.core.config['webinterface']['server'] = "threaded" +                self.server = "threaded" + +        if self.server == "threaded": +            self.start_threaded() +        elif self.server == "fastcgi": +            self.start_fcgi() +        elif self.server == "lightweight": +            self.start_lightweight() +        else: +            self.start_builtin() + +    def start_builtin(self): + +        if self.https: +            log.warning(_("This server offers no SSL, please consider using threaded instead")) + +        self.core.log.info(_("Starting builtin webserver: %(host)s:%(port)d") % {"host": self.host, "port": self.port}) +        webinterface.run_simple(host=self.host, port=self.port) + +    def start_threaded(self): +        if self.https: +            self.core.log.info(_("Starting threaded SSL webserver: %(host)s:%(port)d") % {"host": self.host, "port": self.port}) +        else: +            self.cert = "" +            self.key = "" +            self.core.log.info(_("Starting threaded webserver: %(host)s:%(port)d") % {"host": self.host, "port": self.port}) + +        webinterface.run_threaded(host=self.host, port=self.port, cert=self.cert, key=self.key) + +    def start_fcgi(self): + +        self.core.log.info(_("Starting fastcgi server: %(host)s:%(port)d") % {"host": self.host, "port": self.port}) +        webinterface.run_fcgi(host=self.host, port=self.port) + + +    def start_lightweight(self): +        if self.https: +            log.warning(_("This server offers no SSL, please consider using threaded instead")) + +        self.core.log.info(_("Starting lightweight webserver (bjoern): %(host)s:%(port)d") % {"host": self.host, "port": self.port}) +        webinterface.run_lightweight(host=self.host, port=self.port) + +    def quit(self): +        self.running = False diff --git a/pyload/manager/thread/__init__.py b/pyload/manager/thread/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/pyload/manager/thread/__init__.py | 
