#!/usr/bin/python ########################################################################### # kioslave - description # # ------------------------------ # # begin : Mon May 2 2005 # # copyright : (C) 2005 by AUTHOR # # email : your@email.com # # # ########################################################################### # # # This program is free software; you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation; either version 2 of the License, or # # (at your option) any later version. # # # ########################################################################### # Import the required Qt and KDE modules. from qt import * from kio import * from tdecore import * import os, time # For debugging purposes, import the sys and traceback modules. import sys, traceback DEBUG = 1 # Define a class which will be used to create IOSlave instances. ############################################################################ class SlaveClass(KIO.SlaveBase): """SlaveClass(KIO.SlaveBase) See kdelibs/kio/kio/slavebase.h for virtual functions to override. """ ######################################################################## def __init__(self, pool, app): # We must call the initialisation method of the base class. KIO.SlaveBase.__init__(self, "kioslave", pool, app) # Attach the DCOP client object associated with this IOSlave to the # DCOP server. self.dcopClient().attach() self.debug("dcopClient: %i" % self.dcopClient().isRegistered()) self.contents = RAMDir(None,u"/") self.host = "" self.document = None self.file = None self.debug("Exiting __init__ now"); ######################################################################## def __del__(self): pass ######################################################################## # KIO.SlaveBase method def setHost(self, host, port, user, passwd): self.debug( "setHost: %s %s %s %s" % ( repr(unicode(host)), repr(unicode(port)), repr(unicode(user)), repr(unicode(passwd)) ) ) # This IOSlave does not allow a host to be specified as part of # a URL. if unicode(host) != u"": self.closeConnection() self.error(KIO.ERR_MALFORMED_URL, host) return ######################################################################## # KIO.SlaveBase method def openConnection(self): # Don't call self.finished() in this method. self.debug("openConnection") ######################################################################## # KIO.SlaveBase method def closeConnection(self): # Don't call self.finished() in this method. self.debug("closeConnection") ######################################################################## # KIO.SlaveBase method def get(self, url): path = str(url.path()) self.debug("get(): %s" % path) self.openConnection() item = self.contents.resolve(path) if item is None: self.error(KIO.ERR_DOES_NOT_EXIST, path) return if item.isDir(): self.error(KIO.ERR_IS_DIRECTORY, path) self.totalSize(len(item.getData())) self.data(QByteArray(item.getData())) # The end of the data string. self.data(QByteArray()) self.finished() ######################################################################## # KIO.SlaveBase method def put(self, url, permissions, overwrite, resume): self.debug("put") self.openConnection() path = str(url.path()) parts = path.split('/') filename = parts[-1] parent_dir = self.contents.resolveParent(path) if parent_dir is None: parent_path = '/'.join(parts[:-1]) self.error(KIO.ERR_DOES_NOT_EXIST, parent_path) return if parent_dir.contains(filename): if not overwrite: self.error(KIO.ERR_COULD_NOT_WRITE, parent_path) return else: parent_dir.unlink(filename) # Read data from the application. bytearray = QByteArray() bytes = 0 data = "" while True: self.dataReq() result = self.readData(bytearray) if result <= 0: # An error or the end of data was encountered. break # The number of bytes read is given in the result. bytes = bytes + result data = data + str(bytearray) parent_dir.insert(RAMFile(parent_dir,filename,data)) self.finished() ######################################################################## # KIO.SlaveBase method def stat(self, url): self.debug("stat: %s" % url.url(0,0)) self.openConnection() self.debug("path:%s"% url.path()) # Return info the for the root. item = self.contents.resolve(str(url.path())) if item is None: self.error(KIO.ERR_DOES_NOT_EXIST, str(url.path())) return self.statEntry(item.getStatEntry()) self.finished() ######################################################################## # KIO.SlaveBase method def mimetype(self, url): self.debug("mimetype: %s" % unicode(url)) self.openConnection() path = str(url.path()) item = self.contents.resolve(path) if item is None: self.error(KIO.ERR_DOES_NOT_EXIST, path) return self.mimeType(item.getMimetype()) self.finished() ######################################################################## # KIO.SlaveBase method def listDir(self, url): # The "url" argument is a tdecore.KURL object. self.debug("listDir: %s" % str(url.prettyURL(0))) self.openConnection() path = str(url.path()) dir = self.contents.resolve(path) if dir is None: self.error(KIO.ERR_DOES_NOT_EXIST, path) return if not dir.isDir(): self.error(KIO.ERR_IS_FILE, path) return for entry in dir.listDir(): self.listEntry(entry, 0) self.listEntry([], 1) # Signal that the list is finished. self.finished() ######################################################################## # KIO.SlaveBase method def mkdir(self, url, permissions): self.debug("mkdir") self.openConnection() parent_path = str(url.path()) parent_dir = self.contents.resolveParent(parent_path) if parent_dir is None: self.error(KIO.ERR_DOES_NOT_EXIST, parent_path) return new_dir_obj = parent_dir.mkdir(parent_path.split('/')[-1]) if new_dir_obj is None: self.error(KIO.ERR_COULD_NOT_MKDIR, parent_path) return self.finished() ######################################################################## # KIO.SlaveBase method def rename(self, src, dest, overwrite): self.debug("rename: %s %s" % (src.path(), dest.path())) self.openConnection() src_path = str(src.path()) src_obj = self.contents.resolve(src_path) if src_obj is None: self.error(KIO.ERR_DOES_NOT_EXIST, src_path) return # See if the destination path already exists. dest_path = str(dest.path()) dest_obj = self.contents.resolve(dest_path) if dest_obj is not None: if dest_obj is src_obj: self.finished() # Done already. return if not overwrite: # Can't overwrite. not bad. self.error(KIO.ERR_CANNOT_RENAME, dest_path) return else: # Over write, just remove the object. dest_obj.getParent().unlink(dest_obj.getName()) dest_dir = self.contents.resolveParent(dest_path) if dest_dir is None: self.error(KIO.ERR_DOES_NOT_EXIST, dest_path) return src_obj.getParent().unlink(src_obj) src_obj.setName(dest_path.split('/')[-1]) dest_dir.insert(src_obj) self.finished() # Other possible file operations are represented by the following # methods which are not implemented. #def symlink(self, target, dest, overwrite): # debug("symlink") # ... # self.finished() #def chmod(self, url, permissions): # debug("chmod") # ... # self.finished() ######################################################################## # KIO.SlaveBase method def copy(self, src, dest, permissions, overwrite): self.debug("copy") self.openConnection() src_path = str(src.path()) src_obj = self.contents.resolve(src_path) if src_obj is None: self.error(KIO.ERR_DOES_NOT_EXIST, src_path) return # See if the destination path already exists. dest_path = str(dest.path()) dest_obj = self.contents.resolve(dest_path) if dest_obj is not None: if dest_obj is src_obj: self.finished() # Done already. return if not overwrite: # Can't overwrite. not bad. self.error(KIO.ERR_COULD_NOT_WRITE, dest_path) return else: # Over write, just remove the object. dest_obj.getParent().unlink(dest_obj.getName()) dest_dir = self.contents.resolveParent(dest_path) if dest_dir is None: self.error(KIO.ERR_DOES_NOT_EXIST, dest_path) return new_obj = src_obj.copy() new_obj.setName(dest_path.split('/')[-1]) dest_dir.insert(new_obj) self.finished() ######################################################################## # KIO.SlaveBase method def del_(self, url, isfile): self.debug("del_") self.openConnection() path = str(url.path()) item = self.contents.resolve(path) if item is None: self.error(KIO.ERR_DOES_NOT_EXIST, path) return item.getParent().unlink(item.getName()) self.finished() ######################################################################## # KIO.SlaveBase method def disconnectSlave(self): self.debug("disconnectSlave") return ######################################################################## # KIO.SlaveBase method def dispatchLoop(self): self.debug("dispatchLoop") KIO.SlaveBase.dispatchLoop(self) ######################################################################## # KIO.SlaveBase method def error(self,errid,text): self.debug("error: %i, %s" % (errid,text) ) KIO.SlaveBase.error(self,errid,text) ############################################################################ def debug(self,msg): if DEBUG == 0: return print "kioslave:"+str(msg)+"\n" sys.stdout.flush() ############################################################################ class RAMDir(object): ############################################################################ def __init__(self,parent,name): self.contents = {} self.parent = parent self.name = str(name) ############################################################################ def getParent(self): return self.parent ############################################################################ def setParent(self,parent): self.parent = parent ############################################################################ def getName(self): return self.name ############################################################################ def setName(self,name): self.name = str(name) ############################################################################ def resolve(self,path): while path.endswith('/'): path = path[:-1] while path.startswith('/'): path = path[1:] if path=='': return self parts = path.split('/') self.debug(path) for item in self.contents.keys(): self.debug("keys:"+item) if parts[0] in self.contents: return self.contents[parts[0]].resolve('/'.join(parts[1:])) self.debug("CHECKPOINT 1") return None ############################################################################ def resolveParent(self,path): while path.endswith('/'): path = path[:-1] while path.startswith('/'): path = path[1:] if path=="": return None parts = path.split('/') return self.resolve('/'.join(parts[:-1])) ############################################################################ def mkdir(self,name): if name in self.contents: return None new_dir = RAMDir(self,name) self.contents[name] = new_dir return new_dir ############################################################################ def getStatEntry(self): # Return info the for the root. length = 0 entry = [] atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_NAME atom.m_str = self.name #debug("name: %s" % name) entry.append(atom) atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_SIZE atom.m_long = length #debug("length: %i" % length) entry.append(atom) atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_MODIFICATION_TIME # Number of seconds since the epoch. atom.m_long = int(time.time()) entry.append(atom) atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_ACCESS # The usual octal permission information (rw-r--r-- in this case). atom.m_long = 0644 entry.append(atom) # If the stat method is implemented then entries _must_ include # the UDE_FILE_TYPE atom or the whole system may not work at all. atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_FILE_TYPE #atom.m_long = os.path.stat.S_IFREG atom.m_long = os.path.stat.S_IFDIR entry.append(atom) atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_MIME_TYPE atom.m_str = self.getMimetype() entry.append(atom) return entry ############################################################################ def listDir(self): list = [] for item in self.contents.values(): list.append(item.getStatEntry()) return list ############################################################################ def isDir(self): return True ############################################################################ def insert(self,item): self.contents[item.getName()] = item ############################################################################ def contains(self,name): return name in self.contents ############################################################################ def unlink(self,name): if str(name) in self.contents: del self.contents[str(name)] ############################################################################ def debug(self,msg): if DEBUG == 0: return print "kioslave:"+str(msg)+"\n" sys.stdout.flush() ############################################################################ def getMimetype(self): return "inode/directory" ############################################################################ def copy(self): new_dir = RAMDir(None,self.name) for item in self.contents.values(): new_item = item.copy() new_item.setParent(new_dir) new_dir.insert(new_item) return new_dir ############################################################################ class RAMFile(object): ############################################################################ def __init__(self,parent,name,data=None): self.parent = parent self.name = str(name) self.data = data ############################################################################ def getParent(self): return self.parent ############################################################################ def setParent(self,parent): self.parent = parent ############################################################################ def getName(self): return self.name ############################################################################ def setName(self,name): self.name = str(name) ############################################################################ def resolve(self,path): if path!="": return None return self ############################################################################ def getData(self): return self.data ############################################################################ def resolveParent(self,path): return None ############################################################################ def getStatEntry(self): # Return info the for the root. length = 0 entry = [] atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_NAME atom.m_str = self.name #debug("name: %s" % name) entry.append(atom) length = 0 if self.data is not None: length = len(self.data) atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_SIZE atom.m_long = length #debug("length: %i" % length) entry.append(atom) atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_MODIFICATION_TIME # Number of seconds since the epoch. atom.m_long = int(time.time()) entry.append(atom) atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_ACCESS # The usual octal permission information (rw-r--r-- in this case). atom.m_long = 0644 entry.append(atom) # If the stat method is implemented then entries _must_ include # the UDE_FILE_TYPE atom or the whole system may not work at all. atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_FILE_TYPE atom.m_long = os.path.stat.S_IFREG entry.append(atom) atom = KIO.UDSAtom() atom.m_uds = KIO.UDS_MIME_TYPE atom.m_str = self.getMimetype() entry.append(atom) return entry ############################################################################ def isDir(self): return False ############################################################################ def getMimetype(self): return "text/html" ############################################################################ def copy(self): return RAMFile(None,self.name,self.data) ############################################################################ def SlaveFactory(pool, app): slave = SlaveClass(pool, app) slave.dispatchLoop()