diff options
author | Michele Calgaro <michele.calgaro@yahoo.it> | 2020-12-06 19:28:06 +0900 |
---|---|---|
committer | Michele Calgaro <michele.calgaro@yahoo.it> | 2020-12-06 19:28:49 +0900 |
commit | 247750abcbf6760bbc52aa5d64fc375d6fbee8a3 (patch) | |
tree | 86e029a960ddd599edbeee8dddf70e87ee314e23 /mcop/dispatcher.cc | |
parent | 595ad58e25c5d0f0c512194f66708f99e5bc1527 (diff) | |
download | arts-247750abcbf6760bbc52aa5d64fc375d6fbee8a3.tar.gz arts-247750abcbf6760bbc52aa5d64fc375d6fbee8a3.zip |
Renaming of files in preparation for code style tools.
Signed-off-by: Michele Calgaro <michele.calgaro@yahoo.it>
(cherry picked from commit 00d4f92b717fbcbed6f9eee361975d6ee5380d59)
Diffstat (limited to 'mcop/dispatcher.cc')
-rw-r--r-- | mcop/dispatcher.cc | 1090 |
1 files changed, 0 insertions, 1090 deletions
diff --git a/mcop/dispatcher.cc b/mcop/dispatcher.cc deleted file mode 100644 index 8250a02..0000000 --- a/mcop/dispatcher.cc +++ /dev/null @@ -1,1090 +0,0 @@ - /* - - Copyright (C) 2000-2001 Stefan Westerfeld - stefan@space.twc.de - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Library General Public - License as published by the Free Software Foundation; either - version 2 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Library General Public License for more details. - - You should have received a copy of the GNU Library General Public License - along with this library; see the file COPYING.LIB. If not, write to - the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - Boston, MA 02110-1301, USA. - - */ - -#include <config.h> -#include "dispatcher.h" -#include "delayedreturn.h" -#include "startupmanager.h" -#include "unixconnection.h" -#include "tcpconnection.h" -#include "referenceclean.h" -#include "core.h" -#include "md5auth.h" -#include "mcoputils.h" -#include "loopback.h" -#include "debug.h" -#include "ifacerepo_impl.h" -#include "thread.h" - -#include <sys/stat.h> -#include <stdio.h> -#include <signal.h> -#include <cstring> -#include <cstdlib> -#include <errno.h> -#include <iostream> - -#if TIME_WITH_SYS_TIME -# include <sys/time.h> -# include <time.h> -#elif HAVE_SYS_TIME_H -# include <sys/time.h> -#else -# include <time.h> -#endif - -/* Dispatcher private data class (to ensure binary compatibility) */ - -using namespace std; -using namespace Arts; - -namespace Arts { - -class DispatcherWakeUpHandler; - -class DispatcherPrivate { -public: - GlobalComm globalComm; - InterfaceRepo interfaceRepo; - AuthAccept *accept; - LoopbackConnection *loopbackConnection; - DelayedReturn *delayedReturn; - bool allowNoAuthentication; - Mutex mutex; - - /* - * Thread condition that gets signalled whenever something relevant for - * waitForResult happens. Note that broken connections are also relevant - * for waitForResult. - */ - ThreadCondition requestResultCondition; - - /* - * Thread condition that gets signalled whenever something relevant for - * the server connection process happens. This is either: - * - authentication fails - * - authentication succeeds - * - a connection breaks - */ - ThreadCondition serverConnectCondition; - - DispatcherWakeUpHandler *wakeUpHandler; -}; - -/** - * Class that performs dispatcher wakeup. - * - * The sending thread (requesting wakeup) writes a byte to a pipe. The - * main thread watches the pipe, and as soon as the byte arrives, gets - * woken by the IOManager. This should work, no matter what type of IOManager - * is used (i.e. StdIOManager/GIOManager/QIOManager). - */ -class DispatcherWakeUpHandler : public IONotify { -private: - enum { wReceive = 0, wSend = 1 }; - int wakeUpPipe[2]; - -public: - DispatcherWakeUpHandler() - { - if(pipe(wakeUpPipe) != 0) - arts_fatal("can't initialize wakeUp pipe (%s)",strerror(errno)); - - Dispatcher::the()->ioManager()->watchFD(wakeUpPipe[wReceive], - IOType::read | IOType::reentrant, this); - } - virtual ~DispatcherWakeUpHandler() - { - Dispatcher::the()->ioManager()->remove(this, IOType::all); - - close(wakeUpPipe[wSend]); - close(wakeUpPipe[wReceive]); - } - void notifyIO(int fd, int type) - { - arts_return_if_fail(fd == wakeUpPipe[wReceive]); - arts_return_if_fail(type == IOType::read); - - mcopbyte one; - int result; - do - result = read(wakeUpPipe[wReceive],&one,1); - while(result < 0 && errno == EINTR); - } - void wakeUp() - { - mcopbyte one = 1; - - int result; - do - result = write(wakeUpPipe[wSend],&one,1); - while(result < 0 && errno == EINTR); - } -}; - -} - -Dispatcher *Dispatcher::_instance = 0; - -Dispatcher::Dispatcher(IOManager *ioManager, StartServer startServer) -{ - assert(!_instance); - _instance = this; - - /* private data pointer */ - d = new DispatcherPrivate(); - - lock(); - - /* makes arts_debug/arts_message/arts_return_if_fail/... threadsafe */ - Debug::initMutex(); - - generateServerID(); - - if(ioManager) - { - _ioManager = ioManager; - deleteIOManagerOnExit = false; - } - else - { - _ioManager = new StdIOManager; - deleteIOManagerOnExit = true; - } - - d->wakeUpHandler = new DispatcherWakeUpHandler; - - objectManager = new ObjectManager; - - notificationManager = new NotificationManager; - - if(startServer & startUnixServer) - { - unixServer = new UnixServer(this,serverID); - if(!unixServer->running()) - { - delete unixServer; - arts_warning("[mcop dispatcher] Couldn't start UnixServer"); - unixServer = 0; - } - } - else unixServer = 0; - - if(startServer & startTCPServer) - { - tcpServer = new TCPServer(this); - if(!tcpServer->running()) - { - delete tcpServer; - arts_warning("[mcop dispatcher] Couldn't start TCPServer"); - tcpServer = 0; - } - } - else tcpServer = 0; - - d->allowNoAuthentication = startServer & noAuthentication; - d->accept = 0; - d->loopbackConnection = new LoopbackConnection(serverID); - d->interfaceRepo = InterfaceRepo::_from_base(new InterfaceRepo_impl()); - d->delayedReturn = 0; - - _flowSystem = 0; - referenceClean = new ReferenceClean(objectPool); - - /* - * setup signal handler for SIGPIPE - */ - orig_sigpipe = signal(SIGPIPE,SIG_IGN); - if((orig_sigpipe != SIG_DFL) && (orig_sigpipe != SIG_IGN)) - { - cerr << "[mcop dispatcher] warning: user defined signal handler found for" - " SIG_PIPE, overriding" << endl; - } - - StartupManager::startup(); - - /* - * this is required for publishing global references - might be a good - * reason for startup priorities as since this is required for cookie&co, - * no communication is possible without that - */ - - - char *env = getenv("ARTS_SERVER"); - bool envOk = false; - if(env) - { - string url = "tcp:"; url += env; - Connection *conn = connectUrl(url); - arts_debug("connection to %s for globalComm", url.c_str()); - if(conn) - { - arts_debug("hint %s", conn->findHint("GlobalComm").c_str()); - d->globalComm = Reference(conn->findHint("GlobalComm")); - envOk = true; - arts_debug("using globalcomm from env variable"); - } - } - - if(!envOk) - { - string globalCommName - = MCOPUtils::readConfigEntry("GlobalComm","Arts::TmpGlobalComm"); - d->globalComm = GlobalComm(SubClass(globalCommName)); - } - - // --- initialize MD5auth --- - /* - * Path for random seed: better to store it in home, because some - * installations wipe /tmp on reboot. - */ - string seedpath = MCOPUtils::createFilePath("random-seed"); - string mcopdir = MCOPUtils::mcopDirectory(); - if(!mcopdir.empty()) seedpath = mcopdir + "/random-seed"; - arts_md5_auth_init_seed(seedpath.c_str()); - - /* - * first generate a new random cookie and try to set secret-cookie to it - * as put will not overwrite, this has no effect if there is already a - * secret cookie - */ - char *cookie = arts_md5_auth_mkcookie(); - globalComm().put("secret-cookie",cookie); - - /* - * Then get the secret cookie from globalComm. As we've just set one, - * and as it is never removed, this always works. - */ - string secretCookie = globalComm().get("secret-cookie"); - if(!arts_md5_auth_set_cookie(secretCookie.c_str())) - { - /* - * Handle the case where the cookie obtained from GlobalComm is not - * a valid cookie (i.e. too short) - this should practically never - * happen. In this case, we will remove the cookie and overwrite it - * with our previously generated cookie. - */ - arts_warning("[mcop dispatcher] Bad md5 secret-cookie obtained from %s - replacing it", - globalComm()._interfaceName().c_str()); - - globalComm().erase("secret-cookie"); - globalComm().put("secret-cookie",cookie); - - if(!arts_md5_auth_set_cookie(cookie)) - arts_fatal("error initializing md5 secret cookie " - "(generated cookie invalid)"); - } - memset(cookie,0,strlen(cookie)); // try to keep memory clean - free(cookie); - - string::iterator i; // try to keep memory clean from secret cookie - for(i=secretCookie.begin();i != secretCookie.end();i++) *i = 'y'; - - unlock(); -} - -Dispatcher::~Dispatcher() -{ - lock(); - - /* no interaction possible now anymore - remove our global references */ - if(objectManager) - objectManager->removeGlobalReferences(); - - /* remove everything that might have been tagged for remote copying */ - referenceClean->forceClean(); - delete referenceClean; - - d->globalComm = GlobalComm::null(); - - /* shutdown all extensions we loaded */ - if(objectManager) - objectManager->shutdownExtensions(); - - StartupManager::shutdown(); - - /* drop all open connections */ - list<Connection *>::iterator ci; - for(ci=connections.begin(); ci != connections.end();ci++) - { - Connection *conn = *ci; - conn->drop(); - } - d->requestResultCondition.wakeAll(); - d->serverConnectCondition.wakeAll(); - - /* - * remove signal handler for SIGPIPE - */ - signal(SIGPIPE,orig_sigpipe); - - - d->interfaceRepo = InterfaceRepo::null(); - - if(d->accept) - { - delete d->accept; - d->accept = 0; - } - - if(d->loopbackConnection) - { - d->loopbackConnection->_release(); - d->loopbackConnection = 0; - } - if(unixServer) - { - delete unixServer; - unixServer = 0; - } - - if(tcpServer) - { - delete tcpServer; - tcpServer = 0; - } - - if(notificationManager) - { - delete notificationManager; - notificationManager = 0; - } - - if(objectManager && Object_base::_objectCount() == 0) - { - objectManager->removeExtensions(); - delete objectManager; - objectManager = 0; - } - - if(d->wakeUpHandler) - { - delete d->wakeUpHandler; - d->wakeUpHandler = 0; - } - - if(deleteIOManagerOnExit) - { - delete _ioManager; - _ioManager = 0; - } - - if(Object_base::_objectCount()) - { - cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still " - << Object_base::_objectCount() << " object references alive." << endl; - list<Object_skel *> which = objectPool.enumerate(); - list<Object_skel *>::iterator i; - for(i = which.begin(); i != which.end();i++) - cerr << " - " << (*i)->_interfaceName() << endl; - } - - if(Type::_typeCount()) - { - cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still " - << Type::_typeCount() << " types alive." << endl; - } - - if(GenericDataPacket::_dataPacketCount()) - { - cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still " - << GenericDataPacket::_dataPacketCount() - << " data packets alive." << endl; - } - - Debug::freeMutex(); - - unlock(); - - /* private data pointer */ - assert(d); - delete d; - d = 0; - - assert(_instance); - _instance = 0; -} - -InterfaceRepo Dispatcher::interfaceRepo() -{ - return d->interfaceRepo; -} - -FlowSystem_impl *Dispatcher::flowSystem() -{ - assert(_flowSystem); - return _flowSystem; -} - -GlobalComm Dispatcher::globalComm() -{ - assert(!d->globalComm.isNull()); - return d->globalComm; -} - -void Dispatcher::setFlowSystem(FlowSystem_impl *fs) -{ - assert(!_flowSystem); - _flowSystem = fs; -} - -Dispatcher *Dispatcher::the() -{ - return _instance; -} - -Buffer *Dispatcher::waitForResult(long requestID, Connection *connection) -{ - bool isMainThread = SystemThreads::the()->isMainThread(); - Buffer *b = requestResultPool[requestID]; - - connection->_copy(); // Keep extra ref - - while(!b && !connection->broken()) { - if(isMainThread) - _ioManager->processOneEvent(true); - else - d->requestResultCondition.wait(d->mutex); - - b = requestResultPool[requestID]; - } - - requestResultPool.releaseSlot(requestID); - - if(connection->broken()) // connection went away before we got some result - b = 0; - - connection->_release(); // Give up extra ref - - return b; -} - -Buffer *Dispatcher::createRequest(long& requestID, long objectID, long methodID) -{ - Buffer *buffer = new Buffer; - - // write mcop header record - buffer->writeLong(MCOP_MAGIC); - buffer->writeLong(0); // message length - to be patched later - buffer->writeLong(mcopInvocation); - - // generate a request ID - requestID = requestResultPool.allocSlot(); - - // write invocation record - buffer->writeLong(objectID); - buffer->writeLong(methodID); - buffer->writeLong(requestID); - - return buffer; -} - -Buffer *Dispatcher::createOnewayRequest(long objectID, long methodID) -{ - Buffer *buffer = new Buffer; - - // write mcop header record - buffer->writeLong(MCOP_MAGIC); - buffer->writeLong(0); // message length - to be patched later - buffer->writeLong(mcopOnewayInvocation); - - // write oneway invocation record - buffer->writeLong(objectID); - buffer->writeLong(methodID); - - return buffer; -} - -void Dispatcher::handle(Connection *conn, Buffer *buffer, long messageType) -{ - _activeConnection = conn; - -#ifdef DEBUG_IO - printf("got a message %ld, %ld bytes in body\n", - messageType,buffer->remaining()); - if(conn->connState() == Connection::unknown) - cout << "connectionState = unknown" << endl; - if(conn->connState() == Connection::expectClientHello) - cout << "connectionState = expectClientHello" << endl; - if(conn->connState() == Connection::expectServerHello) - cout << "connectionState = expectServerHello" << endl; - if(conn->connState() == Connection::expectAuthAccept) - cout << "connectionState = expectAuthAccept" << endl; - if(conn->connState() == Connection::established) - cout << "connectionState = established" << endl; -#endif - switch(conn->connState()) - { - case Connection::established: - /* - * we're connected to a trusted server, so we can accept - * invocations - */ - if(messageType == mcopInvocation) { -#ifdef DEBUG_MESSAGES - printf("[got Invocation]\n"); -#endif - long objectID = buffer->readLong(); - long methodID = buffer->readLong(); - long requestID = buffer->readLong(); - - Buffer *result = new Buffer; - // write mcop header record - result->writeLong(MCOP_MAGIC); - result->writeLong(0); // message length - to be patched later - result->writeLong(mcopReturn); - - // write result record (returnCode is written by dispatch) - result->writeLong(requestID); - - // perform the request - Object_skel *object = objectPool[objectID]; - object->_copy(); - object->_dispatch(buffer,result,methodID); - object->_release(); - - assert(!buffer->readError() && !buffer->remaining()); - delete buffer; - - if(d->delayedReturn) - { - delete result; - - result = new Buffer; - result->writeLong(MCOP_MAGIC); - result->writeLong(0); // to be patched later - result->writeLong(mcopReturn); - result->writeLong(requestID); - - d->delayedReturn->initialize(conn,result); - d->delayedReturn = 0; - } - else /* return normally */ - { - result->patchLength(); - conn->qSendBuffer(result); - } - return; /* everything ok - leave here */ - } - - if(messageType == mcopReturn) - { -#ifdef DEBUG_MESSAGES - printf("[got Return]\n"); -#endif - long requestID = buffer->readLong(); - requestResultPool[requestID] = buffer; - d->requestResultCondition.wakeAll(); - - return; /* everything ok - leave here */ - } - - if(messageType == mcopOnewayInvocation) { -#ifdef DEBUG_MESSAGES - printf("[got OnewayInvocation]\n"); -#endif - long objectID = buffer->readLong(); - long methodID = buffer->readLong(); - - // perform the request - Object_skel *object = objectPool[objectID]; - object->_copy(); - object->_dispatch(buffer,methodID); - object->_release(); - - assert(!buffer->readError() && !buffer->remaining()); - delete buffer; - - return; /* everything ok - leave here */ - } - break; - - case Connection::expectServerHello: - if(messageType == mcopServerHello) - { -#ifdef DEBUG_MESSAGES - printf("[got ServerHello]\n"); -#endif - /* - * if we get a server hello, answer with a client hello - */ - ServerHello h; - h.readType(*buffer); - bool valid = (!buffer->readError() && !buffer->remaining()); - delete buffer; - - if(!valid) break; // invalid hello received -> forget it - - conn->setServerID(h.serverID); - - /* - * check if md5auth or noauth is offered by the server - */ - bool md5authSupported = false; - bool noauthSupported = false; - vector<string>::iterator ai; - for(ai = h.authProtocols.begin(); ai != h.authProtocols.end(); ai++) - { - if(*ai == "md5auth") md5authSupported = true; - if(*ai == "noauth") noauthSupported = true; - } - - if(noauthSupported) // noauth is usually easier to pass ;) - { - Buffer *helloBuffer = new Buffer; - - Header header(MCOP_MAGIC,0,mcopClientHello); - header.writeType(*helloBuffer); - ClientHello clientHello(serverID,"noauth",""); - clientHello.writeType(*helloBuffer); - - helloBuffer->patchLength(); - - conn->qSendBuffer(helloBuffer); - conn->setConnState(Connection::expectAuthAccept); - return; /* everything ok - leave here */ - } - else if(md5authSupported) - { - Buffer *helloBuffer = new Buffer; - - Header header(MCOP_MAGIC,0,mcopClientHello); - header.writeType(*helloBuffer); - ClientHello clientHello(serverID,"md5auth",""); - - const char *random_cookie = h.authSeed.c_str(); - if(strlen(random_cookie) == 32) - { - char *response = arts_md5_auth_mangle(random_cookie); - clientHello.authData = response; -#ifdef DEBUG_AUTH - printf(" got random_cookie = %s\n",random_cookie); - printf("reply with authData = %s\n",response); -#endif - free(response); - } - clientHello.writeType(*helloBuffer); - - helloBuffer->patchLength(); - - conn->qSendBuffer(helloBuffer); - conn->setConnState(Connection::expectAuthAccept); - return; /* everything ok - leave here */ - } - else - { - cerr << "[mcop dispatcher] error: don't know authentication protocol" << endl; - cerr << " server offered: "; - for(ai = h.authProtocols.begin(); ai != h.authProtocols.end(); ai++) - cerr << *ai << " "; - cerr << endl; - } - } - break; - - case Connection::expectClientHello: - if(messageType == mcopClientHello) - { -#ifdef DEBUG_MESSAGES - printf("[got ClientHello]\n"); -#endif - ClientHello c; - c.readType(*buffer); - bool valid = (!buffer->readError() && !buffer->remaining()); - delete buffer; - - if(valid && ( - (c.authProtocol == "md5auth" && c.authData == conn->cookie()) - || (c.authProtocol == "noauth" && d->allowNoAuthentication) )) - { - conn->setServerID(c.serverID); - - /* build hints only for the first connection */ - if(!d->accept) - { - d->accept = new AuthAccept(); - - d->accept->hints.push_back( - "GlobalComm="+d->globalComm.toString()); - d->accept->hints.push_back( - "InterfaceRepo="+d->interfaceRepo.toString()); - } - - Buffer *helloBuffer = new Buffer; - Header header(MCOP_MAGIC,0,mcopAuthAccept); - header.writeType(*helloBuffer); - d->accept->writeType(*helloBuffer); - - helloBuffer->patchLength(); - conn->qSendBuffer(helloBuffer); - conn->setConnState(Connection::established); - - return; /* everything ok - leave here */ - } - } - break; - - case Connection::expectAuthAccept: - if(messageType == mcopAuthAccept) - { -#ifdef DEBUG_MESSAGES - printf("[got AuthAccept]\n"); -#endif - AuthAccept a; - a.readType(*buffer); - delete buffer; -#ifdef DEBUG_MESSAGES - - vector<string>::iterator hi; - for(hi = a.hints.begin(); hi != a.hints.end(); hi++) - cout << "[got ConnectionHint] " << *hi << endl; - -#endif - - conn->setConnState(Connection::established); - conn->setHints(a.hints); - d->serverConnectCondition.wakeAll(); - return; /* everything ok - leave here */ - } - break; - - case Connection::unknown: - assert(false); - break; - } - - /* - * We shouldn't reach this point if everything went all right - */ - cerr << "[mcop dispatcher] Fatal communication error with a client" << endl; - if(conn->connState() != Connection::established) - { - cerr << "[mcop dispatcher] Authentication of this client was not successful" << endl; - cerr << "[mcop dispatcher] Connection dropped" << endl; - conn->drop(); - } -} - -long Dispatcher::addObject(Object_skel *object) -{ - long objectID = objectPool.allocSlot(); - - objectPool[objectID] = object; - return objectID; -} - -void Dispatcher::removeObject(long objectID) -{ - assert(objectPool[objectID]); - objectPool.releaseSlot(objectID); -} - -void Dispatcher::generateServerID() -{ - char *buffer; - buffer = arts_strdup_printf("%s-%04x-%08lx", - MCOPUtils::getFullHostname().c_str(), - getpid(),time(0)); - serverID = buffer; - free(buffer); -} - -string Dispatcher::objectToString(long objectID) -{ - Buffer b; - ObjectReference oref; - - oref.serverID = serverID; - oref.objectID = objectID; - - // prefer a unix domainsocket connection over a plain tcp connection - if(unixServer) oref.urls.push_back(unixServer->url()); - if(tcpServer) oref.urls.push_back(tcpServer->url()); - - oref.writeType(b); - - return b.toString("MCOP-Object"); -} - -bool Dispatcher::stringToObjectReference(ObjectReference& r, const string& s) -{ - if(strncmp(s.c_str(),"global:",7) == 0) - { - // if the object reference starts with "global:", it refers to - // a global object which can be found with the objectManager - - string lookup = objectManager->getGlobalReference(&s.c_str()[7]); - return stringToObjectReference(r,lookup); - } - - - Buffer b; - if(!b.fromString(s,"MCOP-Object")) return false; - - r.readType(b); - if(b.readError() || b.remaining()) return false; - - return true; -} - -void *Dispatcher::connectObjectLocal(ObjectReference& reference, - const string& interface) -{ - if(reference.serverID == serverID) - { - void *result = objectPool[reference.objectID]->_cast(interface); - - if(result) - { - objectPool[reference.objectID]->_copy(); - return result; - } - } - - return 0; -} - -Connection *Dispatcher::connectObjectRemote(ObjectReference& reference) -{ - if(reference.serverID == "null") // null reference? - return 0; - - if(reference.serverID == serverID) - return loopbackConnection(); - - list<Connection *>::iterator i; - - for(i=connections.begin(); i != connections.end();i++) - { - Connection *conn = *i; - - if(conn->isConnected(reference.serverID)) - { - // fixme: we should check for the existence of the object - // and increment a reference count or something like that - return conn; - } - } - - /* try to connect the server */ - vector<string>::iterator ui; - for(ui = reference.urls.begin(); ui != reference.urls.end(); ui++) - { - Connection *conn = connectUrl(*ui); - if(conn) - { - if(conn->isConnected(reference.serverID)) - { - return conn; - } - else - { - /* we connected somewhere, but not the right server ;) */ - connections.remove(conn); - conn->_release(); - } - } - } - return 0; -} - -Connection *Dispatcher::connectUrl(const string& url) -{ - Connection *conn = 0; - bool isMainThread = SystemThreads::the()->isMainThread(); - - if(strncmp(url.c_str(),"tcp:",4) == 0) - { - conn = new TCPConnection(url); - } - else if(strncmp(url.c_str(),"unix:",5) == 0) - { - conn = new UnixConnection(url); - } - - if(conn) - { - conn->_copy(); // Keep extra ref for when the connection breaks - conn->setConnState(Connection::expectServerHello); - - while((conn->connState() != Connection::established) - && !conn->broken()) - { - if(isMainThread) - _ioManager->processOneEvent(true); - else - d->serverConnectCondition.wait(d->mutex); - } - - if(conn->connState() == Connection::established) - { - connections.push_back(conn); - conn->_release(); // Give up extra ref - return conn; - } - - // well - bad luck (building a connection failed) - - // Give up extra ref - conn->_release(); - } - return 0; -} - -void Dispatcher::run() -{ - assert(SystemThreads::the()->isMainThread()); - - _ioManager->run(); -} - -void Dispatcher::terminate() -{ - _ioManager->terminate(); -} - -void Dispatcher::initiateConnection(Connection *connection) -{ - vector<string> authProtocols; - authProtocols.push_back("md5auth"); - - if(d->allowNoAuthentication) - authProtocols.push_back("noauth"); - - char *authSeed = arts_md5_auth_mkcookie(); - char *authResult = arts_md5_auth_mangle(authSeed); - - Buffer *helloBuffer = new Buffer; - - Header header(MCOP_MAGIC,0,mcopServerHello); - header.writeType(*helloBuffer); - ServerHello serverHello("aRts/MCOP-1.0.0",serverID,authProtocols,authSeed); - serverHello.writeType(*helloBuffer); - - helloBuffer->patchLength(); - - connection->qSendBuffer(helloBuffer); - connection->setConnState(Connection::expectClientHello); - - connection->setCookie(authResult); - free(authSeed); - free(authResult); - - connections.push_back(connection); -} - -void Dispatcher::handleCorrupt(Connection *connection) -{ - if(connection->connState() != Connection::established) - { - cerr << "[mcop dispatcher] Received corrupt message on unauthenticated connection" <<endl; - cerr << "closing connection." << endl; - connection->drop(); - d->serverConnectCondition.wakeAll(); - } - else - { - cerr << "[mcop dispatcher] warning: got corrupt MCOP message !??" << endl; - } -} - -void Dispatcher::handleConnectionClose(Connection *connection) -{ - /* - * we can't use enumerate here, because the "existing objects list" might - * be changing due to the other _disconnectRemote calls we make, so we - * enumerate() the objects manually - */ - unsigned long l; - for(l=0; l<objectPool.max(); l++) - { - Object_skel *skel = objectPool[l]; - if(skel) skel->_disconnectRemote(connection); - } - - d->requestResultCondition.wakeAll(); - d->serverConnectCondition.wakeAll(); - - /* - * FIXME: - * - * there may be error handling to do (e.g., check that the _stub's that - * still refer to that connection don't crash now). - */ - connection->_release(); - - list<Connection *>::iterator i; - for(i=connections.begin(); i != connections.end();i++) - { - if(*i == connection) - { - connections.erase(i); - return; - } - } -} - -Connection *Dispatcher::activeConnection() -{ - return _activeConnection; -} - -Connection *Dispatcher::loopbackConnection() -{ - return d->loopbackConnection; -} - -DelayedReturn *Dispatcher::delayReturn() -{ - assert(!d->delayedReturn); - - return d->delayedReturn = new DelayedReturn(); -} - -Object_skel *Dispatcher::getLocalObject(long objectID) -{ - Object_skel *result = objectPool[objectID]; - - if(result) result->_copy(); - return result; -} - -void Dispatcher::lock() -{ - _instance->d->mutex.lock(); -} - -void Dispatcher::unlock() -{ - _instance->d->mutex.unlock(); -} - -void Dispatcher::wakeUp() -{ - if(SystemThreads::the()->isMainThread()) return; - - _instance->d->wakeUpHandler->wakeUp(); -} - -/* -void Dispatcher::reloadTraderData() is declared in trader_impl.cc -*/ |