summaryrefslogtreecommitdiffstats
path: root/mcop/dispatcher.cc
diff options
context:
space:
mode:
authorMichele Calgaro <michele.calgaro@yahoo.it>2020-12-06 19:28:06 +0900
committerMichele Calgaro <michele.calgaro@yahoo.it>2020-12-06 19:28:49 +0900
commit247750abcbf6760bbc52aa5d64fc375d6fbee8a3 (patch)
tree86e029a960ddd599edbeee8dddf70e87ee314e23 /mcop/dispatcher.cc
parent595ad58e25c5d0f0c512194f66708f99e5bc1527 (diff)
downloadarts-247750abcbf6760bbc52aa5d64fc375d6fbee8a3.tar.gz
arts-247750abcbf6760bbc52aa5d64fc375d6fbee8a3.zip
Renaming of files in preparation for code style tools.
Signed-off-by: Michele Calgaro <michele.calgaro@yahoo.it> (cherry picked from commit 00d4f92b717fbcbed6f9eee361975d6ee5380d59)
Diffstat (limited to 'mcop/dispatcher.cc')
-rw-r--r--mcop/dispatcher.cc1090
1 files changed, 0 insertions, 1090 deletions
diff --git a/mcop/dispatcher.cc b/mcop/dispatcher.cc
deleted file mode 100644
index 8250a02..0000000
--- a/mcop/dispatcher.cc
+++ /dev/null
@@ -1,1090 +0,0 @@
- /*
-
- Copyright (C) 2000-2001 Stefan Westerfeld
- stefan@space.twc.de
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Library General Public
- License as published by the Free Software Foundation; either
- version 2 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Library General Public License for more details.
-
- You should have received a copy of the GNU Library General Public License
- along with this library; see the file COPYING.LIB. If not, write to
- the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
-
- */
-
-#include <config.h>
-#include "dispatcher.h"
-#include "delayedreturn.h"
-#include "startupmanager.h"
-#include "unixconnection.h"
-#include "tcpconnection.h"
-#include "referenceclean.h"
-#include "core.h"
-#include "md5auth.h"
-#include "mcoputils.h"
-#include "loopback.h"
-#include "debug.h"
-#include "ifacerepo_impl.h"
-#include "thread.h"
-
-#include <sys/stat.h>
-#include <stdio.h>
-#include <signal.h>
-#include <cstring>
-#include <cstdlib>
-#include <errno.h>
-#include <iostream>
-
-#if TIME_WITH_SYS_TIME
-# include <sys/time.h>
-# include <time.h>
-#elif HAVE_SYS_TIME_H
-# include <sys/time.h>
-#else
-# include <time.h>
-#endif
-
-/* Dispatcher private data class (to ensure binary compatibility) */
-
-using namespace std;
-using namespace Arts;
-
-namespace Arts {
-
-class DispatcherWakeUpHandler;
-
-class DispatcherPrivate {
-public:
- GlobalComm globalComm;
- InterfaceRepo interfaceRepo;
- AuthAccept *accept;
- LoopbackConnection *loopbackConnection;
- DelayedReturn *delayedReturn;
- bool allowNoAuthentication;
- Mutex mutex;
-
- /*
- * Thread condition that gets signalled whenever something relevant for
- * waitForResult happens. Note that broken connections are also relevant
- * for waitForResult.
- */
- ThreadCondition requestResultCondition;
-
- /*
- * Thread condition that gets signalled whenever something relevant for
- * the server connection process happens. This is either:
- * - authentication fails
- * - authentication succeeds
- * - a connection breaks
- */
- ThreadCondition serverConnectCondition;
-
- DispatcherWakeUpHandler *wakeUpHandler;
-};
-
-/**
- * Class that performs dispatcher wakeup.
- *
- * The sending thread (requesting wakeup) writes a byte to a pipe. The
- * main thread watches the pipe, and as soon as the byte arrives, gets
- * woken by the IOManager. This should work, no matter what type of IOManager
- * is used (i.e. StdIOManager/GIOManager/QIOManager).
- */
-class DispatcherWakeUpHandler : public IONotify {
-private:
- enum { wReceive = 0, wSend = 1 };
- int wakeUpPipe[2];
-
-public:
- DispatcherWakeUpHandler()
- {
- if(pipe(wakeUpPipe) != 0)
- arts_fatal("can't initialize wakeUp pipe (%s)",strerror(errno));
-
- Dispatcher::the()->ioManager()->watchFD(wakeUpPipe[wReceive],
- IOType::read | IOType::reentrant, this);
- }
- virtual ~DispatcherWakeUpHandler()
- {
- Dispatcher::the()->ioManager()->remove(this, IOType::all);
-
- close(wakeUpPipe[wSend]);
- close(wakeUpPipe[wReceive]);
- }
- void notifyIO(int fd, int type)
- {
- arts_return_if_fail(fd == wakeUpPipe[wReceive]);
- arts_return_if_fail(type == IOType::read);
-
- mcopbyte one;
- int result;
- do
- result = read(wakeUpPipe[wReceive],&one,1);
- while(result < 0 && errno == EINTR);
- }
- void wakeUp()
- {
- mcopbyte one = 1;
-
- int result;
- do
- result = write(wakeUpPipe[wSend],&one,1);
- while(result < 0 && errno == EINTR);
- }
-};
-
-}
-
-Dispatcher *Dispatcher::_instance = 0;
-
-Dispatcher::Dispatcher(IOManager *ioManager, StartServer startServer)
-{
- assert(!_instance);
- _instance = this;
-
- /* private data pointer */
- d = new DispatcherPrivate();
-
- lock();
-
- /* makes arts_debug/arts_message/arts_return_if_fail/... threadsafe */
- Debug::initMutex();
-
- generateServerID();
-
- if(ioManager)
- {
- _ioManager = ioManager;
- deleteIOManagerOnExit = false;
- }
- else
- {
- _ioManager = new StdIOManager;
- deleteIOManagerOnExit = true;
- }
-
- d->wakeUpHandler = new DispatcherWakeUpHandler;
-
- objectManager = new ObjectManager;
-
- notificationManager = new NotificationManager;
-
- if(startServer & startUnixServer)
- {
- unixServer = new UnixServer(this,serverID);
- if(!unixServer->running())
- {
- delete unixServer;
- arts_warning("[mcop dispatcher] Couldn't start UnixServer");
- unixServer = 0;
- }
- }
- else unixServer = 0;
-
- if(startServer & startTCPServer)
- {
- tcpServer = new TCPServer(this);
- if(!tcpServer->running())
- {
- delete tcpServer;
- arts_warning("[mcop dispatcher] Couldn't start TCPServer");
- tcpServer = 0;
- }
- }
- else tcpServer = 0;
-
- d->allowNoAuthentication = startServer & noAuthentication;
- d->accept = 0;
- d->loopbackConnection = new LoopbackConnection(serverID);
- d->interfaceRepo = InterfaceRepo::_from_base(new InterfaceRepo_impl());
- d->delayedReturn = 0;
-
- _flowSystem = 0;
- referenceClean = new ReferenceClean(objectPool);
-
- /*
- * setup signal handler for SIGPIPE
- */
- orig_sigpipe = signal(SIGPIPE,SIG_IGN);
- if((orig_sigpipe != SIG_DFL) && (orig_sigpipe != SIG_IGN))
- {
- cerr << "[mcop dispatcher] warning: user defined signal handler found for"
- " SIG_PIPE, overriding" << endl;
- }
-
- StartupManager::startup();
-
- /*
- * this is required for publishing global references - might be a good
- * reason for startup priorities as since this is required for cookie&co,
- * no communication is possible without that
- */
-
-
- char *env = getenv("ARTS_SERVER");
- bool envOk = false;
- if(env)
- {
- string url = "tcp:"; url += env;
- Connection *conn = connectUrl(url);
- arts_debug("connection to %s for globalComm", url.c_str());
- if(conn)
- {
- arts_debug("hint %s", conn->findHint("GlobalComm").c_str());
- d->globalComm = Reference(conn->findHint("GlobalComm"));
- envOk = true;
- arts_debug("using globalcomm from env variable");
- }
- }
-
- if(!envOk)
- {
- string globalCommName
- = MCOPUtils::readConfigEntry("GlobalComm","Arts::TmpGlobalComm");
- d->globalComm = GlobalComm(SubClass(globalCommName));
- }
-
- // --- initialize MD5auth ---
- /*
- * Path for random seed: better to store it in home, because some
- * installations wipe /tmp on reboot.
- */
- string seedpath = MCOPUtils::createFilePath("random-seed");
- string mcopdir = MCOPUtils::mcopDirectory();
- if(!mcopdir.empty()) seedpath = mcopdir + "/random-seed";
- arts_md5_auth_init_seed(seedpath.c_str());
-
- /*
- * first generate a new random cookie and try to set secret-cookie to it
- * as put will not overwrite, this has no effect if there is already a
- * secret cookie
- */
- char *cookie = arts_md5_auth_mkcookie();
- globalComm().put("secret-cookie",cookie);
-
- /*
- * Then get the secret cookie from globalComm. As we've just set one,
- * and as it is never removed, this always works.
- */
- string secretCookie = globalComm().get("secret-cookie");
- if(!arts_md5_auth_set_cookie(secretCookie.c_str()))
- {
- /*
- * Handle the case where the cookie obtained from GlobalComm is not
- * a valid cookie (i.e. too short) - this should practically never
- * happen. In this case, we will remove the cookie and overwrite it
- * with our previously generated cookie.
- */
- arts_warning("[mcop dispatcher] Bad md5 secret-cookie obtained from %s - replacing it",
- globalComm()._interfaceName().c_str());
-
- globalComm().erase("secret-cookie");
- globalComm().put("secret-cookie",cookie);
-
- if(!arts_md5_auth_set_cookie(cookie))
- arts_fatal("error initializing md5 secret cookie "
- "(generated cookie invalid)");
- }
- memset(cookie,0,strlen(cookie)); // try to keep memory clean
- free(cookie);
-
- string::iterator i; // try to keep memory clean from secret cookie
- for(i=secretCookie.begin();i != secretCookie.end();i++) *i = 'y';
-
- unlock();
-}
-
-Dispatcher::~Dispatcher()
-{
- lock();
-
- /* no interaction possible now anymore - remove our global references */
- if(objectManager)
- objectManager->removeGlobalReferences();
-
- /* remove everything that might have been tagged for remote copying */
- referenceClean->forceClean();
- delete referenceClean;
-
- d->globalComm = GlobalComm::null();
-
- /* shutdown all extensions we loaded */
- if(objectManager)
- objectManager->shutdownExtensions();
-
- StartupManager::shutdown();
-
- /* drop all open connections */
- list<Connection *>::iterator ci;
- for(ci=connections.begin(); ci != connections.end();ci++)
- {
- Connection *conn = *ci;
- conn->drop();
- }
- d->requestResultCondition.wakeAll();
- d->serverConnectCondition.wakeAll();
-
- /*
- * remove signal handler for SIGPIPE
- */
- signal(SIGPIPE,orig_sigpipe);
-
-
- d->interfaceRepo = InterfaceRepo::null();
-
- if(d->accept)
- {
- delete d->accept;
- d->accept = 0;
- }
-
- if(d->loopbackConnection)
- {
- d->loopbackConnection->_release();
- d->loopbackConnection = 0;
- }
- if(unixServer)
- {
- delete unixServer;
- unixServer = 0;
- }
-
- if(tcpServer)
- {
- delete tcpServer;
- tcpServer = 0;
- }
-
- if(notificationManager)
- {
- delete notificationManager;
- notificationManager = 0;
- }
-
- if(objectManager && Object_base::_objectCount() == 0)
- {
- objectManager->removeExtensions();
- delete objectManager;
- objectManager = 0;
- }
-
- if(d->wakeUpHandler)
- {
- delete d->wakeUpHandler;
- d->wakeUpHandler = 0;
- }
-
- if(deleteIOManagerOnExit)
- {
- delete _ioManager;
- _ioManager = 0;
- }
-
- if(Object_base::_objectCount())
- {
- cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still "
- << Object_base::_objectCount() << " object references alive." << endl;
- list<Object_skel *> which = objectPool.enumerate();
- list<Object_skel *>::iterator i;
- for(i = which.begin(); i != which.end();i++)
- cerr << " - " << (*i)->_interfaceName() << endl;
- }
-
- if(Type::_typeCount())
- {
- cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still "
- << Type::_typeCount() << " types alive." << endl;
- }
-
- if(GenericDataPacket::_dataPacketCount())
- {
- cerr << "[mcop dispatcher] warning: leaving MCOP Dispatcher and still "
- << GenericDataPacket::_dataPacketCount()
- << " data packets alive." << endl;
- }
-
- Debug::freeMutex();
-
- unlock();
-
- /* private data pointer */
- assert(d);
- delete d;
- d = 0;
-
- assert(_instance);
- _instance = 0;
-}
-
-InterfaceRepo Dispatcher::interfaceRepo()
-{
- return d->interfaceRepo;
-}
-
-FlowSystem_impl *Dispatcher::flowSystem()
-{
- assert(_flowSystem);
- return _flowSystem;
-}
-
-GlobalComm Dispatcher::globalComm()
-{
- assert(!d->globalComm.isNull());
- return d->globalComm;
-}
-
-void Dispatcher::setFlowSystem(FlowSystem_impl *fs)
-{
- assert(!_flowSystem);
- _flowSystem = fs;
-}
-
-Dispatcher *Dispatcher::the()
-{
- return _instance;
-}
-
-Buffer *Dispatcher::waitForResult(long requestID, Connection *connection)
-{
- bool isMainThread = SystemThreads::the()->isMainThread();
- Buffer *b = requestResultPool[requestID];
-
- connection->_copy(); // Keep extra ref
-
- while(!b && !connection->broken()) {
- if(isMainThread)
- _ioManager->processOneEvent(true);
- else
- d->requestResultCondition.wait(d->mutex);
-
- b = requestResultPool[requestID];
- }
-
- requestResultPool.releaseSlot(requestID);
-
- if(connection->broken()) // connection went away before we got some result
- b = 0;
-
- connection->_release(); // Give up extra ref
-
- return b;
-}
-
-Buffer *Dispatcher::createRequest(long& requestID, long objectID, long methodID)
-{
- Buffer *buffer = new Buffer;
-
- // write mcop header record
- buffer->writeLong(MCOP_MAGIC);
- buffer->writeLong(0); // message length - to be patched later
- buffer->writeLong(mcopInvocation);
-
- // generate a request ID
- requestID = requestResultPool.allocSlot();
-
- // write invocation record
- buffer->writeLong(objectID);
- buffer->writeLong(methodID);
- buffer->writeLong(requestID);
-
- return buffer;
-}
-
-Buffer *Dispatcher::createOnewayRequest(long objectID, long methodID)
-{
- Buffer *buffer = new Buffer;
-
- // write mcop header record
- buffer->writeLong(MCOP_MAGIC);
- buffer->writeLong(0); // message length - to be patched later
- buffer->writeLong(mcopOnewayInvocation);
-
- // write oneway invocation record
- buffer->writeLong(objectID);
- buffer->writeLong(methodID);
-
- return buffer;
-}
-
-void Dispatcher::handle(Connection *conn, Buffer *buffer, long messageType)
-{
- _activeConnection = conn;
-
-#ifdef DEBUG_IO
- printf("got a message %ld, %ld bytes in body\n",
- messageType,buffer->remaining());
- if(conn->connState() == Connection::unknown)
- cout << "connectionState = unknown" << endl;
- if(conn->connState() == Connection::expectClientHello)
- cout << "connectionState = expectClientHello" << endl;
- if(conn->connState() == Connection::expectServerHello)
- cout << "connectionState = expectServerHello" << endl;
- if(conn->connState() == Connection::expectAuthAccept)
- cout << "connectionState = expectAuthAccept" << endl;
- if(conn->connState() == Connection::established)
- cout << "connectionState = established" << endl;
-#endif
- switch(conn->connState())
- {
- case Connection::established:
- /*
- * we're connected to a trusted server, so we can accept
- * invocations
- */
- if(messageType == mcopInvocation) {
-#ifdef DEBUG_MESSAGES
- printf("[got Invocation]\n");
-#endif
- long objectID = buffer->readLong();
- long methodID = buffer->readLong();
- long requestID = buffer->readLong();
-
- Buffer *result = new Buffer;
- // write mcop header record
- result->writeLong(MCOP_MAGIC);
- result->writeLong(0); // message length - to be patched later
- result->writeLong(mcopReturn);
-
- // write result record (returnCode is written by dispatch)
- result->writeLong(requestID);
-
- // perform the request
- Object_skel *object = objectPool[objectID];
- object->_copy();
- object->_dispatch(buffer,result,methodID);
- object->_release();
-
- assert(!buffer->readError() && !buffer->remaining());
- delete buffer;
-
- if(d->delayedReturn)
- {
- delete result;
-
- result = new Buffer;
- result->writeLong(MCOP_MAGIC);
- result->writeLong(0); // to be patched later
- result->writeLong(mcopReturn);
- result->writeLong(requestID);
-
- d->delayedReturn->initialize(conn,result);
- d->delayedReturn = 0;
- }
- else /* return normally */
- {
- result->patchLength();
- conn->qSendBuffer(result);
- }
- return; /* everything ok - leave here */
- }
-
- if(messageType == mcopReturn)
- {
-#ifdef DEBUG_MESSAGES
- printf("[got Return]\n");
-#endif
- long requestID = buffer->readLong();
- requestResultPool[requestID] = buffer;
- d->requestResultCondition.wakeAll();
-
- return; /* everything ok - leave here */
- }
-
- if(messageType == mcopOnewayInvocation) {
-#ifdef DEBUG_MESSAGES
- printf("[got OnewayInvocation]\n");
-#endif
- long objectID = buffer->readLong();
- long methodID = buffer->readLong();
-
- // perform the request
- Object_skel *object = objectPool[objectID];
- object->_copy();
- object->_dispatch(buffer,methodID);
- object->_release();
-
- assert(!buffer->readError() && !buffer->remaining());
- delete buffer;
-
- return; /* everything ok - leave here */
- }
- break;
-
- case Connection::expectServerHello:
- if(messageType == mcopServerHello)
- {
-#ifdef DEBUG_MESSAGES
- printf("[got ServerHello]\n");
-#endif
- /*
- * if we get a server hello, answer with a client hello
- */
- ServerHello h;
- h.readType(*buffer);
- bool valid = (!buffer->readError() && !buffer->remaining());
- delete buffer;
-
- if(!valid) break; // invalid hello received -> forget it
-
- conn->setServerID(h.serverID);
-
- /*
- * check if md5auth or noauth is offered by the server
- */
- bool md5authSupported = false;
- bool noauthSupported = false;
- vector<string>::iterator ai;
- for(ai = h.authProtocols.begin(); ai != h.authProtocols.end(); ai++)
- {
- if(*ai == "md5auth") md5authSupported = true;
- if(*ai == "noauth") noauthSupported = true;
- }
-
- if(noauthSupported) // noauth is usually easier to pass ;)
- {
- Buffer *helloBuffer = new Buffer;
-
- Header header(MCOP_MAGIC,0,mcopClientHello);
- header.writeType(*helloBuffer);
- ClientHello clientHello(serverID,"noauth","");
- clientHello.writeType(*helloBuffer);
-
- helloBuffer->patchLength();
-
- conn->qSendBuffer(helloBuffer);
- conn->setConnState(Connection::expectAuthAccept);
- return; /* everything ok - leave here */
- }
- else if(md5authSupported)
- {
- Buffer *helloBuffer = new Buffer;
-
- Header header(MCOP_MAGIC,0,mcopClientHello);
- header.writeType(*helloBuffer);
- ClientHello clientHello(serverID,"md5auth","");
-
- const char *random_cookie = h.authSeed.c_str();
- if(strlen(random_cookie) == 32)
- {
- char *response = arts_md5_auth_mangle(random_cookie);
- clientHello.authData = response;
-#ifdef DEBUG_AUTH
- printf(" got random_cookie = %s\n",random_cookie);
- printf("reply with authData = %s\n",response);
-#endif
- free(response);
- }
- clientHello.writeType(*helloBuffer);
-
- helloBuffer->patchLength();
-
- conn->qSendBuffer(helloBuffer);
- conn->setConnState(Connection::expectAuthAccept);
- return; /* everything ok - leave here */
- }
- else
- {
- cerr << "[mcop dispatcher] error: don't know authentication protocol" << endl;
- cerr << " server offered: ";
- for(ai = h.authProtocols.begin(); ai != h.authProtocols.end(); ai++)
- cerr << *ai << " ";
- cerr << endl;
- }
- }
- break;
-
- case Connection::expectClientHello:
- if(messageType == mcopClientHello)
- {
-#ifdef DEBUG_MESSAGES
- printf("[got ClientHello]\n");
-#endif
- ClientHello c;
- c.readType(*buffer);
- bool valid = (!buffer->readError() && !buffer->remaining());
- delete buffer;
-
- if(valid && (
- (c.authProtocol == "md5auth" && c.authData == conn->cookie())
- || (c.authProtocol == "noauth" && d->allowNoAuthentication) ))
- {
- conn->setServerID(c.serverID);
-
- /* build hints only for the first connection */
- if(!d->accept)
- {
- d->accept = new AuthAccept();
-
- d->accept->hints.push_back(
- "GlobalComm="+d->globalComm.toString());
- d->accept->hints.push_back(
- "InterfaceRepo="+d->interfaceRepo.toString());
- }
-
- Buffer *helloBuffer = new Buffer;
- Header header(MCOP_MAGIC,0,mcopAuthAccept);
- header.writeType(*helloBuffer);
- d->accept->writeType(*helloBuffer);
-
- helloBuffer->patchLength();
- conn->qSendBuffer(helloBuffer);
- conn->setConnState(Connection::established);
-
- return; /* everything ok - leave here */
- }
- }
- break;
-
- case Connection::expectAuthAccept:
- if(messageType == mcopAuthAccept)
- {
-#ifdef DEBUG_MESSAGES
- printf("[got AuthAccept]\n");
-#endif
- AuthAccept a;
- a.readType(*buffer);
- delete buffer;
-#ifdef DEBUG_MESSAGES
-
- vector<string>::iterator hi;
- for(hi = a.hints.begin(); hi != a.hints.end(); hi++)
- cout << "[got ConnectionHint] " << *hi << endl;
-
-#endif
-
- conn->setConnState(Connection::established);
- conn->setHints(a.hints);
- d->serverConnectCondition.wakeAll();
- return; /* everything ok - leave here */
- }
- break;
-
- case Connection::unknown:
- assert(false);
- break;
- }
-
- /*
- * We shouldn't reach this point if everything went all right
- */
- cerr << "[mcop dispatcher] Fatal communication error with a client" << endl;
- if(conn->connState() != Connection::established)
- {
- cerr << "[mcop dispatcher] Authentication of this client was not successful" << endl;
- cerr << "[mcop dispatcher] Connection dropped" << endl;
- conn->drop();
- }
-}
-
-long Dispatcher::addObject(Object_skel *object)
-{
- long objectID = objectPool.allocSlot();
-
- objectPool[objectID] = object;
- return objectID;
-}
-
-void Dispatcher::removeObject(long objectID)
-{
- assert(objectPool[objectID]);
- objectPool.releaseSlot(objectID);
-}
-
-void Dispatcher::generateServerID()
-{
- char *buffer;
- buffer = arts_strdup_printf("%s-%04x-%08lx",
- MCOPUtils::getFullHostname().c_str(),
- getpid(),time(0));
- serverID = buffer;
- free(buffer);
-}
-
-string Dispatcher::objectToString(long objectID)
-{
- Buffer b;
- ObjectReference oref;
-
- oref.serverID = serverID;
- oref.objectID = objectID;
-
- // prefer a unix domainsocket connection over a plain tcp connection
- if(unixServer) oref.urls.push_back(unixServer->url());
- if(tcpServer) oref.urls.push_back(tcpServer->url());
-
- oref.writeType(b);
-
- return b.toString("MCOP-Object");
-}
-
-bool Dispatcher::stringToObjectReference(ObjectReference& r, const string& s)
-{
- if(strncmp(s.c_str(),"global:",7) == 0)
- {
- // if the object reference starts with "global:", it refers to
- // a global object which can be found with the objectManager
-
- string lookup = objectManager->getGlobalReference(&s.c_str()[7]);
- return stringToObjectReference(r,lookup);
- }
-
-
- Buffer b;
- if(!b.fromString(s,"MCOP-Object")) return false;
-
- r.readType(b);
- if(b.readError() || b.remaining()) return false;
-
- return true;
-}
-
-void *Dispatcher::connectObjectLocal(ObjectReference& reference,
- const string& interface)
-{
- if(reference.serverID == serverID)
- {
- void *result = objectPool[reference.objectID]->_cast(interface);
-
- if(result)
- {
- objectPool[reference.objectID]->_copy();
- return result;
- }
- }
-
- return 0;
-}
-
-Connection *Dispatcher::connectObjectRemote(ObjectReference& reference)
-{
- if(reference.serverID == "null") // null reference?
- return 0;
-
- if(reference.serverID == serverID)
- return loopbackConnection();
-
- list<Connection *>::iterator i;
-
- for(i=connections.begin(); i != connections.end();i++)
- {
- Connection *conn = *i;
-
- if(conn->isConnected(reference.serverID))
- {
- // fixme: we should check for the existence of the object
- // and increment a reference count or something like that
- return conn;
- }
- }
-
- /* try to connect the server */
- vector<string>::iterator ui;
- for(ui = reference.urls.begin(); ui != reference.urls.end(); ui++)
- {
- Connection *conn = connectUrl(*ui);
- if(conn)
- {
- if(conn->isConnected(reference.serverID))
- {
- return conn;
- }
- else
- {
- /* we connected somewhere, but not the right server ;) */
- connections.remove(conn);
- conn->_release();
- }
- }
- }
- return 0;
-}
-
-Connection *Dispatcher::connectUrl(const string& url)
-{
- Connection *conn = 0;
- bool isMainThread = SystemThreads::the()->isMainThread();
-
- if(strncmp(url.c_str(),"tcp:",4) == 0)
- {
- conn = new TCPConnection(url);
- }
- else if(strncmp(url.c_str(),"unix:",5) == 0)
- {
- conn = new UnixConnection(url);
- }
-
- if(conn)
- {
- conn->_copy(); // Keep extra ref for when the connection breaks
- conn->setConnState(Connection::expectServerHello);
-
- while((conn->connState() != Connection::established)
- && !conn->broken())
- {
- if(isMainThread)
- _ioManager->processOneEvent(true);
- else
- d->serverConnectCondition.wait(d->mutex);
- }
-
- if(conn->connState() == Connection::established)
- {
- connections.push_back(conn);
- conn->_release(); // Give up extra ref
- return conn;
- }
-
- // well - bad luck (building a connection failed)
-
- // Give up extra ref
- conn->_release();
- }
- return 0;
-}
-
-void Dispatcher::run()
-{
- assert(SystemThreads::the()->isMainThread());
-
- _ioManager->run();
-}
-
-void Dispatcher::terminate()
-{
- _ioManager->terminate();
-}
-
-void Dispatcher::initiateConnection(Connection *connection)
-{
- vector<string> authProtocols;
- authProtocols.push_back("md5auth");
-
- if(d->allowNoAuthentication)
- authProtocols.push_back("noauth");
-
- char *authSeed = arts_md5_auth_mkcookie();
- char *authResult = arts_md5_auth_mangle(authSeed);
-
- Buffer *helloBuffer = new Buffer;
-
- Header header(MCOP_MAGIC,0,mcopServerHello);
- header.writeType(*helloBuffer);
- ServerHello serverHello("aRts/MCOP-1.0.0",serverID,authProtocols,authSeed);
- serverHello.writeType(*helloBuffer);
-
- helloBuffer->patchLength();
-
- connection->qSendBuffer(helloBuffer);
- connection->setConnState(Connection::expectClientHello);
-
- connection->setCookie(authResult);
- free(authSeed);
- free(authResult);
-
- connections.push_back(connection);
-}
-
-void Dispatcher::handleCorrupt(Connection *connection)
-{
- if(connection->connState() != Connection::established)
- {
- cerr << "[mcop dispatcher] Received corrupt message on unauthenticated connection" <<endl;
- cerr << "closing connection." << endl;
- connection->drop();
- d->serverConnectCondition.wakeAll();
- }
- else
- {
- cerr << "[mcop dispatcher] warning: got corrupt MCOP message !??" << endl;
- }
-}
-
-void Dispatcher::handleConnectionClose(Connection *connection)
-{
- /*
- * we can't use enumerate here, because the "existing objects list" might
- * be changing due to the other _disconnectRemote calls we make, so we
- * enumerate() the objects manually
- */
- unsigned long l;
- for(l=0; l<objectPool.max(); l++)
- {
- Object_skel *skel = objectPool[l];
- if(skel) skel->_disconnectRemote(connection);
- }
-
- d->requestResultCondition.wakeAll();
- d->serverConnectCondition.wakeAll();
-
- /*
- * FIXME:
- *
- * there may be error handling to do (e.g., check that the _stub's that
- * still refer to that connection don't crash now).
- */
- connection->_release();
-
- list<Connection *>::iterator i;
- for(i=connections.begin(); i != connections.end();i++)
- {
- if(*i == connection)
- {
- connections.erase(i);
- return;
- }
- }
-}
-
-Connection *Dispatcher::activeConnection()
-{
- return _activeConnection;
-}
-
-Connection *Dispatcher::loopbackConnection()
-{
- return d->loopbackConnection;
-}
-
-DelayedReturn *Dispatcher::delayReturn()
-{
- assert(!d->delayedReturn);
-
- return d->delayedReturn = new DelayedReturn();
-}
-
-Object_skel *Dispatcher::getLocalObject(long objectID)
-{
- Object_skel *result = objectPool[objectID];
-
- if(result) result->_copy();
- return result;
-}
-
-void Dispatcher::lock()
-{
- _instance->d->mutex.lock();
-}
-
-void Dispatcher::unlock()
-{
- _instance->d->mutex.unlock();
-}
-
-void Dispatcher::wakeUp()
-{
- if(SystemThreads::the()->isMainThread()) return;
-
- _instance->d->wakeUpHandler->wakeUp();
-}
-
-/*
-void Dispatcher::reloadTraderData() is declared in trader_impl.cc
-*/