diff options
Diffstat (limited to 'module')
| -rw-r--r-- | module/FileDatabase.py | 1579 | ||||
| -rw-r--r-- | module/PullEvents.py | 10 | 
2 files changed, 810 insertions, 779 deletions
| diff --git a/module/FileDatabase.py b/module/FileDatabase.py index 7e8d043d2..dde151cb3 100644 --- a/module/FileDatabase.py +++ b/module/FileDatabase.py @@ -14,6 +14,7 @@      along with this program; if not, see <http://www.gnu.org/licenses/>.      @author: RaNaN +    @author: mkaay  """  from Queue import Queue  import sqlite3 @@ -23,6 +24,8 @@ from time import sleep  from time import time  import traceback +from module.PullEvents import UpdateEvent, RemoveEvent, InsertEvent +  statusMap = {      "finished":    0,      "offline":     1, @@ -41,814 +44,842 @@ statusMap = {  }  def formatSize(size): -	"""formats size of bytes""" -	size = int(size) -	steps = 0 -	sizes = ["B", "KB", "MB", "GB" , "TB"] -	 -	while size > 1000: -		size /= 1024.0 -		steps += 1 -	 -	return "%.2f %s" % (size, sizes[steps]) +    """formats size of bytes""" +    size = int(size) +    steps = 0 +    sizes = ["B", "KB", "MB", "GB" , "TB"] +     +    while size > 1000: +        size /= 1024.0 +        steps += 1 +     +    return "%.2f %s" % (size, sizes[steps]) +  ########################################################################  class FileHandler: -	"""Handles all request made to obtain information,  -	modify status or other request for links or packages""" - -	 -	#---------------------------------------------------------------------- -	def __init__(self, core): -		"""Constructor""" -		self.core = core - -		# translations -		self.statusMsg = [_("finished"), _("offline"), _("online"), _("queued"), _("checking"), _("waiting"), _("reconnected"), _("starting"),_("failed"), _("aborted"), _("decrypting"), _("custom"),_("downloading"), _("processing")] -		 -		self.cache = {} #holds instances for files -		self.packageCache = {}  # same for packages -		#@TODO: purge the cache - -		self.jobCache = {} -		 -		self.lock = RLock() -		 -		self.filecount = -1 # if an invalid value is set get current value from db		 -		self.unchanged = False #determines if any changes was made since last call - -		self.db = FileDatabaseBackend(self) # the backend - - -	def change(func): -		def new(*args): -			args[0].unchanged = False -			args[0].filecount = -1 -			args[0].jobCache = {} -			return func(*args) -		return new -	 -	#---------------------------------------------------------------------- -	def save(self): -		"""saves all data to backend""" -		self.db.commit() - -	#---------------------------------------------------------------------- -	def syncSave(self): -		"""saves all data to backend and waits until all data are written""" -		self.db.syncSave() -		 -	#---------------------------------------------------------------------- -	def getCompleteData(self, queue=1): -		"""gets a complete data representation""" - -		data = self.db.getAllLinks(queue) -		packs = self.db.getAllPackages(queue) -		 -		data.update( [ (str(x.id), x.toDbDict()[x.id]) for x in self.cache.itervalues() ] ) -		packs.update( [ (str(x.id), x.toDict()[x.id]) for x in self.packageCache.itervalues() if x.queue == queue] ) - -		for key, value in data.iteritems(): -			if packs.has_key(str(value["package"])): -				packs[str(value["package"])]["links"][key] = value - -		return packs - -	#---------------------------------------------------------------------- -	@change -	def addLinks(self, urls, package): -		"""adds links""" -	 -		# tuple of (url, name, plugin, package) -		links = [ (x[0], x[0], x[1], package) for x in self.core.pluginManager.parseUrls(urls) ] -		 -		self.db.addLinks(links, package) -		 - -	#---------------------------------------------------------------------- -	@change -	def addPackage(self, name, folder, queue=0): -		"""adds a package, default to link collector""" -		return self.db.addPackage(name, folder, queue) - -	#---------------------------------------------------------------------- -	@change -	def deletePackage(self, id): -		"""delete package and all contained links""" -		 -		self.lock.acquire() - -		if self.packageCache.has_key(id): -			del self.packageCache[id] - -		pyfiles = self.cache.values() -		 -		for pyfile in pyfiles: -			if pyfile.packageid == id: -				pyfile.abortDownload() -				pyfile.release() -		 - -		self.db.deletePackage(id) -		 -		 -		self.lock.release() - -	#---------------------------------------------------------------------- -	@change -	def deleteLink(self, id): -		"""deletes links""" -		 -		self.lock.acquire() -		 -		if self.cache.has_key(id): -			if id in self.core.threadManager.processingIds(): -				self.cache[id].abortDownload() -	 -			 -		self.lock.release() -		 -		self.db.deleteLink(id) - -	#---------------------------------------------------------------------- -	def releaseLink(self, id): -		"""removes pyfile from cache""" -		if self.cache.has_key(id): -			del self.cache[id] - -	#---------------------------------------------------------------------- -	def releasePackage(self, id): -		"""removes package from cache""" -		if self.packageCache.has_key(id): -			del self.packageCache[id] - -	#---------------------------------------------------------------------- -	def updateLink(self, pyfile): -		"""updates link""" -		self.db.updateLink(pyfile) - -	#---------------------------------------------------------------------- -	def updatePackage(self, pypack): -		"""updates a package""" -		self.db.updatePackage(pypack) - -	#---------------------------------------------------------------------- -	def getPackage(self, id): -		"""return package instance""" -		 -		if self.packageCache.has_key(id): -			return self.packageCache[id] -		else: -			return self.db.getPackage(id) -	 -	#---------------------------------------------------------------------- -	def getPackageData(self, id): -		"""returns dict with package information""" -		pack = self.getPackage(id) -		pack = pack.toDict()[id] -		 -		data = self.db.getPackageData(id) -		 -		data.update( [ (str(x.id), x.toDbDict()[x.id]) for x in self.cache.itervalues() ] ) -		 -		pack["links"] = data -		 -		return pack -	 -	#---------------------------------------------------------------------- -	def getFile(self, id): -		"""returns pyfile instance""" -		if self.cache.has_key(id): -			return self.cache[id] -		else: -			return self.db.getFile(id) - -	#---------------------------------------------------------------------- -	def getJob(self, occ): -		"""get suitable job""" -		 -		self.lock.acquire() -		 -		#@TODO clean mess -		 -		if self.jobCache.has_key(occ): -			if self.jobCache[occ]: -				id = self.jobCache[occ].pop() -				if id == "empty": -					pyfile = None -				else: -					pyfile = self.getFile(id) -			else: -				jobs = self.db.getJob(occ) -				jobs.reverse() -				if not jobs: -					self.jobCache[occ].append("empty") -					pyfile = None -				else: -					self.jobCache[occ].extend(jobs) -					pyfile = self.getFile(self.jobCache[occ].pop()) -				 -		else: -			self.jobCache = {} #better not caching to much -			jobs = self.db.getJob(occ) -			jobs.reverse() -			self.jobCache[occ] = jobs -			 -			if not jobs: -				self.jobCache[occ].append("empty") -				pyfile = None -		 -			pyfile = self.getFile(self.jobCache[occ].pop()) -			#@TODO: maybe the new job has to be approved... -					 -		 -		#pyfile = self.getFile(self.jobCache[occ].pop()) -		 -		self.lock.release() -		return pyfile -	 -	 -	#---------------------------------------------------------------------- -	def getFileCount(self): -		"""returns number of files""" - -		if self.filecount == -1: -			self.filecount = self.db.filecount(1) -		 -		return self.filecount -	 -	#---------------------------------------------------------------------- -	def getQueueCount(self): -		"""number of files that have to be processed""" -		pass -	 -	#---------------------------------------------------------------------- -	def restartPackage(self, id): -		"""restart package""" -		for pyfile in self.cache.itervalues(): -			if pyfile.packageid == id: -				self.restartFile(pyfile.id) -		 -		self.db.restartPackage(id) -	 -	def restartFile(self, id): -		""" restart file""" -		if self.cache.has_key(id): -			self.cache[id].abortDownload() -			self.cache[id].status = 3 -			self.cache[id].name = self.cache[id].url -			self.cache[id].sync() -		else: -			self.db.restartFile(id) -		 +    """Handles all request made to obtain information,  +    modify status or other request for links or packages""" + +     +    #---------------------------------------------------------------------- +    def __init__(self, core): +        """Constructor""" +        self.core = core + +        # translations +        self.statusMsg = [_("finished"), _("offline"), _("online"), _("queued"), _("checking"), _("waiting"), _("reconnected"), _("starting"),_("failed"), _("aborted"), _("decrypting"), _("custom"),_("downloading"), _("processing")] +         +        self.cache = {} #holds instances for files +        self.packageCache = {}  # same for packages +        #@TODO: purge the cache + +        self.jobCache = {} +         +        self.lock = RLock() +         +        self.filecount = -1 # if an invalid value is set get current value from db         +        self.unchanged = False #determines if any changes was made since last call + +        self.db = FileDatabaseBackend(self) # the backend + + +    def change(func): +        def new(*args): +            args[0].unchanged = False +            args[0].filecount = -1 +            args[0].jobCache = {} +            return func(*args) +        return new +     +    #---------------------------------------------------------------------- +    def save(self): +        """saves all data to backend""" +        self.db.commit() + +    #---------------------------------------------------------------------- +    def syncSave(self): +        """saves all data to backend and waits until all data are written""" +        self.db.syncSave() +         +    #---------------------------------------------------------------------- +    def getCompleteData(self, queue=1): +        """gets a complete data representation""" + +        data = self.db.getAllLinks(queue) +        packs = self.db.getAllPackages(queue) +         +        data.update( [ (str(x.id), x.toDbDict()[x.id]) for x in self.cache.itervalues() ] ) +        packs.update( [ (str(x.id), x.toDict()[x.id]) for x in self.packageCache.itervalues() if x.queue == queue] ) + +        for key, value in data.iteritems(): +            if packs.has_key(str(value["package"])): +                packs[str(value["package"])]["links"][key] = value + +        return packs + +    #---------------------------------------------------------------------- +    @change +    def addLinks(self, urls, package): +        """adds links""" +     +        for x in self.core.pluginManager.parseUrls(urls): +            # tuple of (url, name, plugin, package) +            lastID = self.db.addLink(x[0], x[0], x[1], package) +            e = InsertEvent("file", lastID, -1, "collector" if not self.getPackage(package).queue else "queue") +            self.core.pullManager.addEvent(e) +         + +    #---------------------------------------------------------------------- +    @change +    def addPackage(self, name, folder, queue=0): +        """adds a package, default to link collector""" +        lastID = self.db.addPackage(name, folder, queue) +        e = InsertEvent("pack", lastID, -1, "collector" if not queue else "queue") +        self.core.pullManager.addEvent(e) +        return lastID + +    #---------------------------------------------------------------------- +    @change +    def deletePackage(self, id): +        """delete package and all contained links""" +         +        self.lock.acquire() +         +        if self.packageCache.has_key(id): +            del self.packageCache[id] + +        pyfiles = self.cache.values() +         +        for pyfile in pyfiles: +            if pyfile.packageid == id: +                pyfile.abortDownload() +                pyfile.release() + +        self.db.deletePackage(id) +         +        e = RemoveEvent("pack", id, "collector" if not self.getPackage(id).queue else "queue") +        self.core.pullManager.addEvent(e) +         +        self.lock.release() + +    #---------------------------------------------------------------------- +    @change +    def deleteLink(self, id): +        """deletes links""" +         +        self.lock.acquire() +         +        e = RemoveEvent("file", id, "collector" if not self.getFile(id).package().queue else "queue") +         +        if self.cache.has_key(id): +            if id in self.core.threadManager.processingIds(): +                self.cache[id].abortDownload() +             +        self.lock.release() +         +        self.db.deleteLink(id) +         +        self.core.pullManager.addEvent(e) + +    #---------------------------------------------------------------------- +    def releaseLink(self, id): +        """removes pyfile from cache""" +        if self.cache.has_key(id): +            del self.cache[id] + +    #---------------------------------------------------------------------- +    def releasePackage(self, id): +        """removes package from cache""" +        if self.packageCache.has_key(id): +            del self.packageCache[id] + +    #---------------------------------------------------------------------- +    def updateLink(self, pyfile): +        """updates link""" +        self.db.updateLink(pyfile) +         +        e = UpdateEvent("file", pyfile.id, "collector" if not pyfile.package().queue else "queue") +        self.core.pullManager.addEvent(e) + +    #---------------------------------------------------------------------- +    def updatePackage(self, pypack): +        """updates a package""" +        self.db.updatePackage(pypack) +         +        e = UpdateEvent("pack", pypack.id, "collector" if not pypack.queue else "queue") +        self.core.pullManager.addEvent(e) + +    #---------------------------------------------------------------------- +    def getPackage(self, id): +        """return package instance""" +         +        if self.packageCache.has_key(id): +            return self.packageCache[id] +        else: +            return self.db.getPackage(id) +     +    #---------------------------------------------------------------------- +    def getPackageData(self, id): +        """returns dict with package information""" +        pack = self.getPackage(id) +        pack = pack.toDict()[id] +         +        data = self.db.getPackageData(id) +         +        data.update( [ (str(x.id), x.toDbDict()[x.id]) for x in self.cache.itervalues() ] ) +         +        pack["links"] = data +         +        return pack +     +    #---------------------------------------------------------------------- +    def getFileData(self, id): +        """returns dict with package information""" +        pyfile = self.getFile(id) +         +        return pyfile.toDbDict() +     +    #---------------------------------------------------------------------- +    def getFile(self, id): +        """returns pyfile instance""" +        if self.cache.has_key(id): +            return self.cache[id] +        else: +            return self.db.getFile(id) + +    #---------------------------------------------------------------------- +    def getJob(self, occ): +        """get suitable job""" +         +        self.lock.acquire() +         +        #@TODO clean mess +         +        if self.jobCache.has_key(occ): +            if self.jobCache[occ]: +                id = self.jobCache[occ].pop() +                if id == "empty": +                    pyfile = None +                else: +                    pyfile = self.getFile(id) +            else: +                jobs = self.db.getJob(occ) +                jobs.reverse() +                if not jobs: +                    self.jobCache[occ].append("empty") +                    pyfile = None +                else: +                    self.jobCache[occ].extend(jobs) +                    pyfile = self.getFile(self.jobCache[occ].pop()) +                 +        else: +            self.jobCache = {} #better not caching to much +            jobs = self.db.getJob(occ) +            jobs.reverse() +            self.jobCache[occ] = jobs +             +            if not jobs: +                self.jobCache[occ].append("empty") +                pyfile = None +         +            pyfile = self.getFile(self.jobCache[occ].pop()) +            #@TODO: maybe the new job has to be approved... +                     +         +        #pyfile = self.getFile(self.jobCache[occ].pop()) +         +        self.lock.release() +        return pyfile +     +     +    #---------------------------------------------------------------------- +    def getFileCount(self): +        """returns number of files""" + +        if self.filecount == -1: +            self.filecount = self.db.filecount(1) +         +        return self.filecount +     +    #---------------------------------------------------------------------- +    def getQueueCount(self): +        """number of files that have to be processed""" +        pass +     +    #---------------------------------------------------------------------- +    def restartPackage(self, id): +        """restart package""" +        for pyfile in self.cache.itervalues(): +            if pyfile.packageid == id: +                self.restartFile(pyfile.id) +         +        self.db.restartPackage(id) +         +        e = UpdateEvent("pack", id, "collector" if not self.getPackage(id).queue else "queue") +        self.core.pullManager.addEvent(e) +     +    def restartFile(self, id): +        """ restart file""" +        if self.cache.has_key(id): +            self.cache[id].abortDownload() +            self.cache[id].status = 3 +            self.cache[id].name = self.cache[id].url +            self.cache[id].sync() +        else: +            self.db.restartFile(id) +         +        e = UpdateEvent("file", id, "collector" if not self.getFile(id).package().queue else "queue") +        self.core.pullManager.addEvent(e) +          #########################################################################  class FileDatabaseBackend(Thread): -	"""underlying backend for the filehandler to save the data""" - -	def __init__(self, manager): -		Thread.__init__(self) - -		self.setDaemon(True) +    """underlying backend for the filehandler to save the data""" + +    def __init__(self, manager): +        Thread.__init__(self) + +        self.setDaemon(True) -		self.manager = manager - -		self.jobs = Queue() # queues for jobs -		self.res = Queue() - -		self.start() - - -	def queue(func): -		"""use as decorator when fuction directly executes sql commands""" -		def new(*args): -			args[0].jobs.put((func, args, 0)) -			return args[0].res.get() -		return new - -	def async(func): -		"""use as decorator when function does not return anything and asynchron execution is wanted""" -		def new(*args): -			args[0].jobs.put((func, args, 1)) -			return True -		return new - -	def run(self): -		"""main loop, which executes commands""" - -		self.conn = sqlite3.connect("files.db") -		self.c = self.conn.cursor() -		#self.c.execute("PRAGMA synchronous = OFF") -		self._createTables() - -		while True: -			try: -				f, args, async = self.jobs.get() -				if f == "quit": return True -				res = f(*args) -				if not async: self.res.put(res) -			except Exception, e: -				#@TODO log etc -				print "Database Error @", f.__name__, args[1:], e -				traceback.print_exc() -				if not async: self.res.put(None) - -	def shutdown(self): -		self.save() -		self.jobs.put(("quit", "", 0)) - -	def _createTables(self): -		"""create tables for database""" - -		self.c.execute('CREATE TABLE IF NOT EXISTS "packages" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "name" TEXT NOT NULL, "folder" TEXT, "password" TEXT, "site" TEXT, "queue" INTEGER DEFAULT 0 NOT NULL)') -		self.c.execute('CREATE TABLE IF NOT EXISTS "links" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "url" TEXT NOT NULL, "name" TEXT, "size" INTEGER DEFAULT 0 NOT NULL, "status" INTEGER DEFAULT 3 NOT NULL, "plugin" TEXT DEFAULT "BasePlugin" NOT NULL, "error" TEXT DEFAULT "", "package" INTEGER DEFAULT 0 NOT NULL, FOREIGN KEY(package) REFERENCES packages(id))') -		self.c.execute('CREATE INDEX IF NOT EXISTS "pIdIndex" ON links(package)') -		self.c.execute('VACUUM') -		 -	#---------------------------------------------------------------------- -	@queue -	def filecount(self, queue): -		"""returns number of files in queue""" -		self.c.execute("SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id", (queue,)) -		r = self.c.fetchall() -		return len(r) -		 -	@queue -	def addLink(self, url, name, plugin, package): -		self.c.execute('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', (url, name, plugin, package)) -		return self.c.lastrowid - -	@queue -	def addLinks(self, links, package): -		""" links is a list of tupels (url,name,plugin)""" -		self.c.executemany('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', links) - -	@queue -	def addPackage(self, name, folder, queue): - -		self.c.execute('INSERT INTO packages(name, folder, queue) VALUES(?,?,?)', (name, folder, queue)) -		return self.c.lastrowid - -	@queue -	def deletePackage(self, id): - -		self.c.execute('DELETE FROM links WHERE package=?', (str(id), )) -		self.c.execute('DELETE FROM packages WHERE id=?', (str(id), )) - -	@queue -	def deleteLink(self, id): - -		self.c.execute('DELETE FROM links WHERE id=?', (str(id), )) - - -	@queue -	def getAllLinks(self, q): -		"""return information about all links in queue q - -		q0 queue -		q1 collector - -		format: - -		{ -		    id: {'name': name, ... 'package': id }, ... -		} - -		""" -		self.c.execute('SELECT l.id,l.url,l.name,l.size,l.status,l.error,l.plugin,l.package FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id', (q, )) -		data = {} -		for r in self.c: -			data[str(r[0])] = { -				'url': r[1], -				'name': r[2], -				'size': r[3], -			    'format_size': formatSize(r[3]), -				'status': r[4], -			    'statusmsg': self.manager.statusMsg[r[4]], -				'error': r[5], -				'plugin': r[6], -				'package': r[7] -			} - -		return data - -	@queue -	def getAllPackages(self, q): -		"""return information about packages in queue q -		(only useful in get all data) - -		q0 queue -		q1 collector - -		format: - -		{ -		    id: {'name': name ... 'links': {} }, ... -		} -		""" -		self.c.execute('SELECT id,name,folder,site,password,queue FROM packages WHERE queue=? ORDER BY id', str(q)) - -		data = {} -		for r in self.c: -			data[str(r[0])] = { -				'name': r[1], -				'folder': r[2], -				'site': r[3], -				'password': r[4], -				'queue': r[5], -				'links': {} -			} - -		return data - - -	def getLinkData(self, id): -		"""get link information""" -		pass - -	@queue -	def getPackageData(self, id): -		"""get package data""" -		self.c.execute('SELECT id,url,name,size,status,error,plugin,package FROM links WHERE package=? ORDER BY id', (str(id),)) - -		data = {} -		for r in self.c: -			data[str(r[0])] = { -				'url': r[1], -				'name': r[2], -				'size': r[3], -			    'format_size': formatSize(r[3]), -				'status': r[4], -			    'statusmsg': self.manager.statusMsg[r[4]], -				'error': r[5], -				'plugin': r[6], -				'package': r[7] -			} - -		return data - - -	@async -	def updateLink(self, f): -		self.c.execute('UPDATE links SET url=?,name=?,size=?,status=?,error=?,package=? WHERE id=?', (f.url, f.name, f.size, f.status, f.error, str(f.packageid), str(f.id))) - -	@async -	def updatePackage(self, p): -		self.c.execute('UPDATE packages SET name=?,folder=?,site=?,password=?,queue=? WHERE id=?', (p.name, p.folder, p.site, p.password, p.queue, str(p.id))) - -	@async -	def restartFile(self, id): -		self.c.execute('UPDATE links SET status=3 WHERE id=?', ( str(id), ) ) - -	@async -	def restartPackage(self, id): -		self.c.execute('UPDATE links SET status=3 WHERE package=?', ( str(id), ) ) -		 -	@async -	def commit(self): -		self.conn.commit() -		 -	@queue -	def syncSave(self): -		self.conn.commit() - -	@queue -	def getPackage(self, id): -		"""return package instance from id""" -		self.c.execute("SELECT name,folder,site,password,queue FROM packages WHERE id=?", (str(id),)) -		r = self.c.fetchone() -		if not r: return None -		return PyPackage(self.manager, id, *r) - -	#---------------------------------------------------------------------- -	@queue -	def getFile(self, id): -		"""return link instance from id""" -		self.c.execute("SELECT url, name, size, status, error, plugin, package FROM links WHERE id=?", (str(id),)) -		r = self.c.fetchone() -		if not r: return None -		return PyFile(self.manager, id, *r) - - -	@queue -	def getJob(self, occ): -		"""return pyfile instance, which is suitable for download and dont use a occupied plugin""" -		 -		cmd = "(" -		i = 0 -		for item in occ: -			if i != 0: cmd += ", " -			cmd += "'%s'" % item -		 -		cmd += ")" -		 -		cmd = "SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=1 AND l.plugin NOT IN %s AND l.status IN (2,3,6) LIMIT 5" % cmd -			 -		self.c.execute(cmd) # very bad! - -		return [x[0] for x in self.c ] +        self.manager = manager + +        self.jobs = Queue() # queues for jobs +        self.res = Queue() + +        self.start() + + +    def queue(func): +        """use as decorator when fuction directly executes sql commands""" +        def new(*args): +            args[0].jobs.put((func, args, 0)) +            return args[0].res.get() +        return new + +    def async(func): +        """use as decorator when function does not return anything and asynchron execution is wanted""" +        def new(*args): +            args[0].jobs.put((func, args, 1)) +            return True +        return new + +    def run(self): +        """main loop, which executes commands""" + +        self.conn = sqlite3.connect("files.db") +        self.c = self.conn.cursor() +        #self.c.execute("PRAGMA synchronous = OFF") +        self._createTables() + +        while True: +            try: +                f, args, async = self.jobs.get() +                if f == "quit": return True +                res = f(*args) +                if not async: self.res.put(res) +            except Exception, e: +                #@TODO log etc +                print "Database Error @", f.__name__, args[1:], e +                traceback.print_exc() +                if not async: self.res.put(None) + +    def shutdown(self): +        self.save() +        self.jobs.put(("quit", "", 0)) + +    def _createTables(self): +        """create tables for database""" + +        self.c.execute('CREATE TABLE IF NOT EXISTS "packages" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "name" TEXT NOT NULL, "folder" TEXT, "password" TEXT, "site" TEXT, "queue" INTEGER DEFAULT 0 NOT NULL)') +        self.c.execute('CREATE TABLE IF NOT EXISTS "links" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "url" TEXT NOT NULL, "name" TEXT, "size" INTEGER DEFAULT 0 NOT NULL, "status" INTEGER DEFAULT 3 NOT NULL, "plugin" TEXT DEFAULT "BasePlugin" NOT NULL, "error" TEXT DEFAULT "", "package" INTEGER DEFAULT 0 NOT NULL, FOREIGN KEY(package) REFERENCES packages(id))') +        self.c.execute('CREATE INDEX IF NOT EXISTS "pIdIndex" ON links(package)') +        self.c.execute('VACUUM') +         +    #---------------------------------------------------------------------- +    @queue +    def filecount(self, queue): +        """returns number of files in queue""" +        self.c.execute("SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id", (queue,)) +        r = self.c.fetchall() +        return len(r) +         +    @queue +    def addLink(self, url, name, plugin, package): +        self.c.execute('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', (url, name, plugin, package)) +        return self.c.lastrowid + +    @queue +    def addLinks(self, links, package): +        """ links is a list of tupels (url,name,plugin)""" +        self.c.executemany('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', links) + +    @queue +    def addPackage(self, name, folder, queue): + +        self.c.execute('INSERT INTO packages(name, folder, queue) VALUES(?,?,?)', (name, folder, queue)) +        return self.c.lastrowid + +    @queue +    def deletePackage(self, id): + +        self.c.execute('DELETE FROM links WHERE package=?', (str(id), )) +        self.c.execute('DELETE FROM packages WHERE id=?', (str(id), )) + +    @queue +    def deleteLink(self, id): + +        self.c.execute('DELETE FROM links WHERE id=?', (str(id), )) + + +    @queue +    def getAllLinks(self, q): +        """return information about all links in queue q + +        q0 queue +        q1 collector + +        format: + +        { +            id: {'name': name, ... 'package': id }, ... +        } + +        """ +        self.c.execute('SELECT l.id,l.url,l.name,l.size,l.status,l.error,l.plugin,l.package FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id', (q, )) +        data = {} +        for r in self.c: +            data[str(r[0])] = { +                'url': r[1], +                'name': r[2], +                'size': r[3], +                'format_size': formatSize(r[3]), +                'status': r[4], +                'statusmsg': self.manager.statusMsg[r[4]], +                'error': r[5], +                'plugin': r[6], +                'package': r[7] +            } + +        return data + +    @queue +    def getAllPackages(self, q): +        """return information about packages in queue q +        (only useful in get all data) + +        q0 queue +        q1 collector + +        format: + +        { +            id: {'name': name ... 'links': {} }, ... +        } +        """ +        self.c.execute('SELECT id,name,folder,site,password,queue FROM packages WHERE queue=? ORDER BY id', str(q)) + +        data = {} +        for r in self.c: +            data[str(r[0])] = { +                'name': r[1], +                'folder': r[2], +                'site': r[3], +                'password': r[4], +                'queue': r[5], +                'links': {} +            } + +        return data + + +    def getLinkData(self, id): +        """get link information""" +        pass + +    @queue +    def getPackageData(self, id): +        """get package data""" +        self.c.execute('SELECT id,url,name,size,status,error,plugin,package FROM links WHERE package=? ORDER BY id', (str(id),)) + +        data = {} +        for r in self.c: +            data[str(r[0])] = { +                'url': r[1], +                'name': r[2], +                'size': r[3], +                'format_size': formatSize(r[3]), +                'status': r[4], +                'statusmsg': self.manager.statusMsg[r[4]], +                'error': r[5], +                'plugin': r[6], +                'package': r[7] +            } + +        return data + + +    @async +    def updateLink(self, f): +        self.c.execute('UPDATE links SET url=?,name=?,size=?,status=?,error=?,package=? WHERE id=?', (f.url, f.name, f.size, f.status, f.error, str(f.packageid), str(f.id))) + +    @async +    def updatePackage(self, p): +        self.c.execute('UPDATE packages SET name=?,folder=?,site=?,password=?,queue=? WHERE id=?', (p.name, p.folder, p.site, p.password, p.queue, str(p.id))) + +    @async +    def restartFile(self, id): +        self.c.execute('UPDATE links SET status=3 WHERE id=?', ( str(id), ) ) + +    @async +    def restartPackage(self, id): +        self.c.execute('UPDATE links SET status=3 WHERE package=?', ( str(id), ) ) +         +    @async +    def commit(self): +        self.conn.commit() +         +    @queue +    def syncSave(self): +        self.conn.commit() + +    @queue +    def getPackage(self, id): +        """return package instance from id""" +        self.c.execute("SELECT name,folder,site,password,queue FROM packages WHERE id=?", (str(id),)) +        r = self.c.fetchone() +        if not r: return None +        return PyPackage(self.manager, id, *r) + +    #---------------------------------------------------------------------- +    @queue +    def getFile(self, id): +        """return link instance from id""" +        self.c.execute("SELECT url, name, size, status, error, plugin, package FROM links WHERE id=?", (str(id),)) +        r = self.c.fetchone() +        if not r: return None +        return PyFile(self.manager, id, *r) + + +    @queue +    def getJob(self, occ): +        """return pyfile instance, which is suitable for download and dont use a occupied plugin""" +         +        cmd = "(" +        i = 0 +        for item in occ: +            if i != 0: cmd += ", " +            cmd += "'%s'" % item +         +        cmd += ")" +         +        cmd = "SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=1 AND l.plugin NOT IN %s AND l.status IN (2,3,6) LIMIT 5" % cmd +             +        self.c.execute(cmd) # very bad! + +        return [x[0] for x in self.c ]  class PyFile(): -	def __init__(self, manager, id, url, name, size, status, error, pluginname, package): -		self.m = manager -		self.m.cache[int(id)] = self -		 -		self.id = int(id) -		self.url = url -		self.name = name -		self.size = size -		self.status = status -		self.pluginname = pluginname -		self.packageid = package #should not be used, use package() instead -		self.error = error -		# database information ends here -		 -		self.plugin = None -			 -		self.waitUntil = 0 # time() + time to wait -		 -		# status attributes -		self.active = False #obsolete? -		self.abort = False -		self.reconnected = False - -		 -	def __repr__(self): -		return "PyFile %s: %s@%s" % (self.id, self.name, self.pluginname) -		 -	def initPlugin(self): -		""" inits plugin instance """ -		self.pluginmodule = self.m.core.pluginManager.getPlugin(self.pluginname) -		self.pluginclass = getattr(self.pluginmodule, self.pluginname) -		self.plugin = self.pluginclass(self) -	 -	 -	def package(self): -		""" return package instance""" -		return self.m.getPackage(self.packageid) - -	def setStatus(self, status): -		self.status = statusMap[status] -		self.sync() #@TODO needed aslong no better job approving exists - -	def hasStatus(self, status): -		return statusMap[status] == self.status -	 -	def sync(self): -		"""sync PyFile instance with database""" -		self.m.updateLink(self) - -	def release(self): -		"""sync and remove from cache""" -		self.sync() -		self.m.releaseLink(self.id) - -	def delete(self): -		"""delete pyfile from database""" -		self.m.deleteLink(self.id) - -	def toDict(self): -		"""return dict with all information for interface""" -		return self.toDbDict() - -	def toDbDict(self): -		"""return data as dict for databse - -		format: - -		{ -		    id: {'url': url, 'name': name ... } -		} - -		""" -		return { -			self.id: { -				'url': self.url, -				'name': self.name, -		        'plugin' : self.pluginname, -				'size': self.getSize(), -		        'format_size': self.formatSize(), -				'status': self.status, -		        'statusmsg': self.m.statusMsg[self.status], -				'package': self.packageid, -				'error': self.error -			} -		} -	 -	def abortDownload(self): -		"""abort pyfile if possible""" -		while self.id in self.m.core.threadManager.processingIds(): -			self.abort = True -			if self.plugin: self.plugin.req.abort = True -			sleep(0.1) -		 -		abort = False  -		self.plugin.req.abort = False -		 -	def finishIfDone(self): -		"""set status to finish and release file if every thread is finished with it""" -		 -		if self.id in self.m.core.threadManager.processingIds(): -			return False -		 -		self.setStatus("finished") -		self.release() -		return True -	 -	def formatWait(self): -		""" formats and return wait time in humanreadable format """ -		seconds = self.waitUntil - time() -		 -		if seconds < 0 : return "00:00:00" -				 -		hours, seconds = divmod(seconds, 3600) -		minutes, seconds = divmod(seconds, 60) -		return "%.2i:%.2i:%.2i" % (hours, minutes, seconds) -	 -	def formatSize(self): -		""" formats size to readable format """ -		return formatSize(self.getSize()) - -	def formatETA(self): -		""" formats eta to readable format """ -		seconds = self.getETA() -		 -		if seconds < 0 : return "00:00:00" -				 -		hours, seconds = divmod(seconds, 3600) -		minutes, seconds = divmod(seconds, 60) -		return "%.2i:%.2i:%.2i" % (hours, minutes, seconds) -		 -	def getSpeed(self): -		""" calculates speed """ -		try: -			return self.plugin.req.get_speed() -		except: -			return 0 -		 -	def getETA(self): -		""" gets established time of arrival""" -		try: -			return self.plugin.req.get_ETA() -		except: -			return 0 -	 -	def getBytesLeft(self): -		""" gets bytes left """ -		try: -			return self.plugin.req.bytes_left() -		except: -			return 0 -	 -	def getPercent(self): -		""" get % of download """ -		try: -			return int((float(self.plugin.req.dl_arrived)  / self.plugin.req.dl_size) * 100) -		except: -			return 0 -		 -	def getSize(self): -		""" get size of download """ -		if self.size: return self.size -		else: -			try: -				return self.plugin.req.dl_size -			except: -				return 0 +    def __init__(self, manager, id, url, name, size, status, error, pluginname, package): +        self.m = manager +        self.m.cache[int(id)] = self +         +        self.id = int(id) +        self.url = url +        self.name = name +        self.size = size +        self.status = status +        self.pluginname = pluginname +        self.packageid = package #should not be used, use package() instead +        self.error = error +        # database information ends here +         +        self.plugin = None +             +        self.waitUntil = 0 # time() + time to wait +         +        # status attributes +        self.active = False #obsolete? +        self.abort = False +        self.reconnected = False + +         +    def __repr__(self): +        return "PyFile %s: %s@%s" % (self.id, self.name, self.pluginname) +         +    def initPlugin(self): +        """ inits plugin instance """ +        self.pluginmodule = self.m.core.pluginManager.getPlugin(self.pluginname) +        self.pluginclass = getattr(self.pluginmodule, self.pluginname) +        self.plugin = self.pluginclass(self) +     +     +    def package(self): +        """ return package instance""" +        return self.m.getPackage(self.packageid) + +    def setStatus(self, status): +        self.status = statusMap[status] +        self.sync() #@TODO needed aslong no better job approving exists + +    def hasStatus(self, status): +        return statusMap[status] == self.status +     +    def sync(self): +        """sync PyFile instance with database""" +        self.m.updateLink(self) + +    def release(self): +        """sync and remove from cache""" +        self.sync() +        self.m.releaseLink(self.id) + +    def delete(self): +        """delete pyfile from database""" +        self.m.deleteLink(self.id) + +    def toDict(self): +        """return dict with all information for interface""" +        return self.toDbDict() + +    def toDbDict(self): +        """return data as dict for databse + +        format: + +        { +            id: {'url': url, 'name': name ... } +        } + +        """ +        return { +            self.id: { +                'url': self.url, +                'name': self.name, +                'plugin' : self.pluginname, +                'size': self.getSize(), +                'format_size': self.formatSize(), +                'status': self.status, +                'statusmsg': self.m.statusMsg[self.status], +                'package': self.packageid, +                'error': self.error +            } +        } +     +    def abortDownload(self): +        """abort pyfile if possible""" +        while self.id in self.m.core.threadManager.processingIds(): +            self.abort = True +            if self.plugin and self.plugin.req: self.plugin.req.abort = True +            sleep(0.1) +         +        abort = False  +        self.plugin.req.abort = False +         +    def finishIfDone(self): +        """set status to finish and release file if every thread is finished with it""" +         +        if self.id in self.m.core.threadManager.processingIds(): +            return False +         +        self.setStatus("finished") +        self.release() +        return True +     +    def formatWait(self): +        """ formats and return wait time in humanreadable format """ +        seconds = self.waitUntil - time() +         +        if seconds < 0 : return "00:00:00" +                 +        hours, seconds = divmod(seconds, 3600) +        minutes, seconds = divmod(seconds, 60) +        return "%.2i:%.2i:%.2i" % (hours, minutes, seconds) +     +    def formatSize(self): +        """ formats size to readable format """ +        return formatSize(self.getSize()) + +    def formatETA(self): +        """ formats eta to readable format """ +        seconds = self.getETA() +         +        if seconds < 0 : return "00:00:00" +                 +        hours, seconds = divmod(seconds, 3600) +        minutes, seconds = divmod(seconds, 60) +        return "%.2i:%.2i:%.2i" % (hours, minutes, seconds) +     +    def getSpeed(self): +        """ calculates speed """ +        try: +            return self.plugin.req.get_speed() +        except: +            return 0 +         +    def getETA(self): +        """ gets established time of arrival""" +        try: +            return self.plugin.req.get_ETA() +        except: +            return 0 +     +    def getBytesLeft(self): +        """ gets bytes left """ +        try: +            return self.plugin.req.bytes_left() +        except: +            return 0 +     +    def getPercent(self): +        """ get % of download """ +        try: +            return int((float(self.plugin.req.dl_arrived)  / self.plugin.req.dl_size) * 100) +        except: +            return 0 +         +    def getSize(self): +        """ get size of download """ +        if self.size: return self.size +        else: +            try: +                return self.plugin.req.dl_size +            except: +                return 0  class PyPackage(): -	def __init__(self, manager, id, name, folder, site, password, queue): -		self.m = manager -		self.m.packageCache[int(id)] = self - -		self.id = int(id) -		self.name = name -		self.folder = folder -		self.site = site -		self.password = password -		self.queue = queue - -	def toDict(self): -		"""return data as dict - -		format: - -		{ -		    id: {'name': name ... 'links': {} } } -		} - -		""" -		return { -			self.id: { -				'name': self.name, -				'folder': self.folder, -				'site': self.site, -				'password': self.password, -				'queue': self.queue, -				'links': {} -			} -		} - -	def getChildren(self): -		"""get information about contained links""" -		raise NotImplementedError - -	def sync(self): -		"""sync with db""" -		self.m.updatePackage(self) - -	def release(self): -		"""sync and delete from cache""" -		self.sync() -		self.m.releasePackage(self.id) - -	def delete(self): -		self.m.deletePackage(self.id) +    def __init__(self, manager, id, name, folder, site, password, queue): +        self.m = manager +        self.m.packageCache[int(id)] = self + +        self.id = int(id) +        self.name = name +        self.folder = folder +        self.site = site +        self.password = password +        self.queue = queue + +    def toDict(self): +        """return data as dict + +        format: + +        { +            id: {'name': name ... 'links': {} } } +        } + +        """ +        return { +            self.id: { +                'name': self.name, +                'folder': self.folder, +                'site': self.site, +                'password': self.password, +                'queue': self.queue, +                'links': {} +            } +        } + +    def getChildren(self): +        """get information about contained links""" +        raise NotImplementedError + +    def sync(self): +        """sync with db""" +        self.m.updatePackage(self) + +    def release(self): +        """sync and delete from cache""" +        self.sync() +        self.m.releasePackage(self.id) + +    def delete(self): +        self.m.deletePackage(self.id)  if __name__ == "__main__": -	pypath = "." +    pypath = "." -	db = FileHandler(None) +    db = FileHandler(None) -	#p = PyFile(db, 5) -	#sleep(0.1) +    #p = PyFile(db, 5) +    #sleep(0.1) -	a = time() +    a = time() -	#print db.addPackage("package", "folder" , 1) -	 -	#print db.addPackage("package", "folder",  1) +    #print db.addPackage("package", "folder" , 1) +     +    #print db.addPackage("package", "folder",  1) -	#db.addLinks([x for x in range(0,200)], 5) +    #db.addLinks([x for x in range(0,200)], 5) -	db.save() +    db.save() -	b = time() -	print "adding 200 links, single sql execs, no commit", b-a +    b = time() +    print "adding 200 links, single sql execs, no commit", b-a -	res = db.getCompleteData(1) -	#print res -	r = [ len(x["links"]) for x in res.itervalues() ] -	print r -	c = time() -	print "get all links", c-b +    res = db.getCompleteData(1) +    #print res +    r = [ len(x["links"]) for x in res.itervalues() ] +    print r +    c = time() +    print "get all links", c-b -	#i = 2 -	#db.updateLink(i, "urlupdate%s" % i, "nameupdate%s" % i, i, i, i,i) +    #i = 2 +    #db.updateLink(i, "urlupdate%s" % i, "nameupdate%s" % i, i, i, i,i) -	d = time() -	print "update one link", d-c +    d = time() +    print "update one link", d-c -	#p.sync() -	#p.remove() +    #p.sync() +    #p.remove() -	e = time() -	print "sync and remove link", e-d +    e = time() +    print "sync and remove link", e-d -	db.save() +    db.save() -	db.deletePackage(1) -	#db.commit() +    db.deletePackage(1) +    #db.commit() -	f = time() -	print "commit, remove package/links, commit", f-e +    f = time() +    print "commit, remove package/links, commit", f-e -	#db.commit() -	sleep(0.5) +    #db.commit() +    sleep(0.5) -	g = time() -	print "empty commit", g-f -0.5 +    g = time() +    print "empty commit", g-f -0.5 -	job = db.getJob("") -	print job -	 -	h = time() -	#print db.getFileCount() -	 -	print "get job", h-g +    job = db.getJob("") +    print job +     +    h = time() +    #print db.getFileCount() +     +    print "get job", h-g -	print db.getFileCount() -	 -	i = time() -	 -	print "file count", i-h -	 -	 -	print db.getJob("") -	 -	j = time() -	 -	 -	print "get job 2", j-i -	 -	for i in db.cache.itervalues(): -		i.sync() -	 -	sleep(1) -	 -	
\ No newline at end of file +    print db.getFileCount() +     +    i = time() +     +    print "file count", i-h +     +     +    print db.getJob("") +     +    j = time() +     +     +    print "get job 2", j-i +     +    for i in db.cache.itervalues(): +        i.sync() +     +    sleep(1) +     +     diff --git a/module/PullEvents.py b/module/PullEvents.py index 7b23cf8b8..bbb3f3e6b 100644 --- a/module/PullEvents.py +++ b/module/PullEvents.py @@ -44,7 +44,7 @@ class PullManager():                  break          if not validUuid:              self.newClient(uuid) -            events = [ReloadAllEvent("queue").toList(), ReloadAllEvent("packages").toList(), ReloadAllEvent("collector").toList()] +            events = [ReloadAllEvent("queue").toList(), ReloadAllEvent("collector").toList()]          return events      def addEvent(self, event): @@ -71,7 +71,7 @@ class Client():  class UpdateEvent():      def __init__(self, itype, iid, destination):          assert itype == "pack" or itype == "file" -        assert destination == "queue" or destination == "collector" or destination == "packages" +        assert destination == "queue" or destination == "collector"          self.type = itype          self.id = iid          self.destination = destination @@ -82,7 +82,7 @@ class UpdateEvent():  class RemoveEvent():      def __init__(self, itype, iid, destination):          assert itype == "pack" or itype == "file" -        assert destination == "queue" or destination == "collector" or destination == "packages" +        assert destination == "queue" or destination == "collector"          self.type = itype          self.id = iid          self.destination = destination @@ -93,7 +93,7 @@ class RemoveEvent():  class InsertEvent():      def __init__(self, itype, iid, after, destination):          assert itype == "pack" or itype == "file" -        assert destination == "queue" or destination == "collector" or destination == "packages" +        assert destination == "queue" or destination == "collector"          self.type = itype          self.id = iid          self.after = after @@ -104,7 +104,7 @@ class InsertEvent():  class ReloadAllEvent():      def __init__(self, destination): -        assert destination == "queue" or destination == "collector" or destination == "packages" +        assert destination == "queue" or destination == "collector"          self.destination = destination      def toList(self): | 
