/*

Copyright (C) 2000-2002 Stefan Westerfeld
                        stefan@space.twc.de

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Library General Public License for more details.

You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB.  If not, write to
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.

*/

#include "config.h"

#include "virtualports.h"
#include "startupmanager.h"
#include "gslschedule.h"
#include "debug.h"
#include "asyncschedule.h"
#include "audiosubsys.h"
#include <gsl/gslcommon.h>
#include <gsl/gslengine.h>
#include <algorithm>
#include <stdio.h>
#include <iostream>
#include <stack>

/* HACK */
/*
 * Wrapper around the GSL engine loop. Exactly one instance exists, the
 * file-global gslMainLoop defined together with the class.
 *
 * It drives the engine from StdScheduleNode::requireFlow() (via run()) and
 * collects GslClass structures whose modules the engine has freed, so that
 * they can be released once pending transactions have finished.
 */
class GslMainLoop {
protected:
    /* GslClass structures queued by freeGslClass(), released in run() */
    std::list<GslClass *> freeClassList;

public:
    GslEngineLoop loop;

    /* true only while run() is dispatching the engine */
    static bool waitOnTransNeedData;
    /* set by StdScheduleNode::gslProcess() once a block was calculated */
    static bool gslDataCalculated;

    /* static check function */
    static gboolean gslCheck(gpointer /* data */, guint /* n_values */,
                             glong* /* timeout_p */,
                             guint /* n_fds */, const GPollFD* /* fds */,
                             gboolean /* revents_filled */)
    {
        return waitOnTransNeedData;
    }

    /* mainloop integration: initialize (called to get initial loop setup) */
    void initialize()
    {
        gsl_transact(gsl_job_add_poll (gslCheck, 0, 0, 0, 0), 0);
        gsl_engine_prepare(&loop);

        for(unsigned int i = 0; i != loop.n_fds; i++)
        {
            /* i is unsigned, so it has to be printed with %u */
            printf("TODO: engine fd %u\n",i);
        }
    }

    /* mainloop integration: process (TODO - should be called by IOManager) */
    void process()
    {
        printf("TODO: mainloop wrapper for fd watches\n");
        if(gsl_engine_check(&loop))
            gsl_engine_dispatch();
    }

    /* wait for a transaction */
    void waitOnTrans()
    {
        arts_return_if_fail(waitOnTransNeedData == false);
        gsl_engine_wait_on_trans();
    }

    /* make the engine calculate something */
    void run()
    {
        waitOnTransNeedData = true;
        gslDataCalculated = false;

        /* dispatch until some module has calculated data (see gslProcess) */
        while(!gslDataCalculated && gsl_engine_check(&loop))
            gsl_engine_dispatch();

        gslDataCalculated = false;
        waitOnTransNeedData = false;

        if(!freeClassList.empty())
        {
            /*
             * make sure that all transactions that are still pending
             * get finished (especially important in threaded case,
             * since an entry in the free list doesn't necessarily
             * mean that the module has entirely been freed)
             */
            waitOnTrans();

            /* the classes were allocated with calloc() in rebuildConn() */
            std::list<GslClass *>::iterator fi;
            for(fi = freeClassList.begin(); fi != freeClassList.end(); fi++)
                free(*fi);

            freeClassList.clear();
        }
    }

    /* queue a GslClass for release at the end of the next run() */
    void freeGslClass(GslClass *klass)
    {
        freeClassList.push_back(klass);
    }
} gslMainLoop;

bool GslMainLoop::waitOnTransNeedData = false;
bool GslMainLoop::gslDataCalculated = false;

namespace Arts { extern void *gslGlobalMutexTable; }

using namespace std;
using namespace Arts;

// ----------- Port -----------

/*
 * Creates a port called name belonging to parent; ptr and flags are stored
 * as given. Every port gets its own VPort.
 */
Port::Port(const string& name, void *ptr, long flags, StdScheduleNode* parent)
    : _name(name), _ptr(ptr), _flags((AttributeType)flags),
      parent(parent), _dynamicPort(false)
{
    _vport = new VPort(this);
}

/* Releases the VPort unless disconnectAll() already did so. */
Port::~Port()
{
    if(_vport)
        delete _vport;
}

/* Returns the attribute flags the port was created with. */
AttributeType Port::flags()
{
    return _flags;
}

/* Returns the port name. */
string Port::name()
{
    return _name;
}

/* Base implementation: a plain Port is not an ASyncPort. */
ASyncPort *Port::asyncPort()
{
    return 0;
}

/* Base implementation: a plain Port is not an AudioPort. */
AudioPort *Port::audioPort()
{
    return 0;
}

/*
 * Registers a mutual auto-disconnect relation: source is appended to our
 * list and we are appended to the list of source.
 */
void Port::addAutoDisconnect(Port *source)
{
    autoDisconnect.push_back(source);
    source->autoDisconnect.push_back(this);
}

/*
 * Removes the relation created by addAutoDisconnect() from both lists.
 * Both entries are asserted to exist.
 */
void Port::removeAutoDisconnect(Port *source)
{
    std::list<Port *>::iterator adi;

    // remove our autodisconnection entry for source port
    adi = find(autoDisconnect.begin(),autoDisconnect.end(),source);
    assert(adi != autoDisconnect.end());
    autoDisconnect.erase(adi);

    // remove the source port autodisconnection entry to us
    adi=find(source->autoDisconnect.begin(),source->autoDisconnect.end(),this);
    assert(adi != source->autoDisconnect.end());
    source->autoDisconnect.erase(adi);
}

/*
 * Drops every connection of this port by deleting its VPort; afterwards the
 * autoDisconnect list is asserted to be empty.
 *
 * NOTE(review): the loop below only does anything when that assertion is
 * compiled out, and it then goes through vport() after _vport was deleted
 * and reset - confirm whether this fallback is still wanted.
 */
void Port::disconnectAll()
{
    if(_vport)
        delete _vport;
    _vport = 0;
    assert(autoDisconnect.empty());
    while(!autoDisconnect.empty())
    {
        Port *other = *autoDisconnect.begin();

        // syntax is disconnect(source)
        if(_flags & streamIn)
            // if we're incoming, other port is source
            vport()->disconnect(other->vport());
        else
            // if we're outgoing, we're the source
            other->vport()->disconnect(this->vport());
    }
}

/* Replaces the pointer stored for this port. */
void Port::setPtr(void *ptr)
{
    _ptr = ptr;
}

// ------- AudioPort ---------

/* Creates an unconnected audio port without a constant value. */
AudioPort::AudioPort(const string& name,
                     void *ptr, long flags,StdScheduleNode *parent)
    : Port(name,ptr,flags,parent)
{
    destcount = 0;
    sourcemodule = 0;
    source = 0;
    gslIsConstant = false;
}

AudioPort::~AudioPort()
{
    //
}

/* An AudioPort identifies itself as such. */
AudioPort *AudioPort::audioPort()
{
    return this;
}

/*
 * Marks the port as carrying the constant value f and flags the owning
 * node's connection count as changed.
 */
void AudioPort::setFloatValue(float f)
{
    gslIsConstant = true;
    gslConstantValue = f;

    parent->_connectionCountChanged = true;
}

/*
 * Connects this port to psource, which must be an audio port, and submits
 * the matching connect job to the GSL engine. Does nothing when the port
 * already has a source.
 */
void AudioPort::connect(Port *psource)
{
    if (source) return; // Error, should not happen (See BR70028)
    source = psource->audioPort();
    assert(source);
    addAutoDisconnect(psource);

    source->parent->_connectionCountChanged =
        parent->_connectionCountChanged = true;
    source->destcount++;
    sourcemodule = source->parent;

    // GSL connect
    GslTrans *trans = gsl_trans_open();
    gsl_trans_add(trans, gsl_job_connect(source->parent->gslModule,
                                         source->gslEngineChannel,
                                         parent->gslModule,
                                         gslEngineChannel));
    gsl_trans_commit(trans);
}

/*
 * Undoes connect(psource) and submits the disconnect job to the GSL engine.
 * Does nothing when psource is not the current source.
 */
void AudioPort::disconnect(Port *psource)
{
    if (!source || source != psource->audioPort()) return; // Error, should not happen (See BR70028)
    assert(source);
    assert(source == psource->audioPort());
    removeAutoDisconnect(psource);

    assert(sourcemodule == source->parent);
    sourcemodule = 0;

    source->parent->_connectionCountChanged =
        parent->_connectionCountChanged = true;
    source->destcount--;
    source = 0;

    // GSL disconnect
    GslTrans *trans = gsl_trans_open();
    gsl_trans_add(trans, gsl_job_disconnect(parent->gslModule,
                                            gslEngineChannel));
    gsl_trans_commit(trans);
}

// --------- MultiPort ----------

/* Creates a multi port without connections and publishes an empty table. */
MultiPort::MultiPort(const string& name,
                     void *ptr, long flags,StdScheduleNode *parent)
    : Port(name,ptr,flags,parent)
{
    conns = 0;
    nextID = 0;
    initConns();
}

/* Releases the connection table. */
MultiPort::~MultiPort()
{
    if(conns)
    {
        delete[] conns;
        conns = 0;
    }
}

/*
 * Rebuilds the null-terminated table of stream pointers: one slot per part
 * plus the terminator. The table is published through _ptr and each
 * destination port is pointed at its own slot.
 */
void MultiPort::initConns()
{
    if(conns != 0) delete[] conns;
    conns = new float_ptr[parts.size() + 1];
    conns[parts.size()] = (float *)0;

    *(float ***)_ptr = conns;

    long n = 0;
    std::list<Part>::iterator i;
    for(i = parts.begin();i != parts.end(); i++)
    {
        AudioPort *p = i->dest;
        p->setPtr((void *)&conns[n++]);
    }
}

/*
 * Connects port by creating a dynamic input AudioPort for it (named
 * "_" + our name + a running id), registering that port with the owning
 * node and connecting the two virtual ports.
 */
void MultiPort::connect(Port *port)
{
    AudioPort *dport;
    // 32 bytes hold any 64-bit long including sign and terminator
    char sid[32];
    snprintf(sid,sizeof(sid),"%ld",nextID++);

    addAutoDisconnect(port);

    dport = new AudioPort("_"+_name+string(sid),0,streamIn,parent);

    Part part;
    part.src = (AudioPort *)port;
    part.dest = dport;

    parts.push_back(part);
    initConns();

    parent->addDynamicPort(dport);
    dport->vport()->connect(port->vport());
}

/*
 * Disconnects sport: removes the first part whose source it is, rebuilds
 * the table, then disconnects and destroys the dynamic port made for it.
 */
void MultiPort::disconnect(Port *sport)
{
    AudioPort *port = (AudioPort *)sport;
    removeAutoDisconnect(sport);

    std::list<Part>::iterator i;
    for(i = parts.begin(); i != parts.end(); i++)
    {
        if(i->src == port)
        {
            AudioPort *dport = i->dest;
            // erasing invalidates i, which is fine as we return below
            parts.erase(i);
            initConns();

            dport->vport()->disconnect(port->vport());
            parent->removeDynamicPort(dport);

            delete dport;
            return;
        }
    }
}

// -------- StdScheduleNode ---------

/*
 * Releases the input/output port tables and, if a GSL module exists, hands
 * it to the engine for discarding.
 */
void StdScheduleNode::freeConn()
{
    if(inConn)
    {
        delete[] inConn;
        inConn = 0;
    }
    if(outConn)
    {
        delete[] outConn;
        outConn = 0;
    }
    inConnCount = outConnCount = 0;

    if(gslModule)
    {
        gsl_transact(gsl_job_discard(gslModule),0);

        gslModule = 0;
        gslRunning = false;
    }
}

/*
 * Engine process callback (installed as GslClass::process in rebuildConn).
 *
 * For a running node, points every stream pointer of the node at the
 * engine's buffers for this block - or, for constant inputs, at what
 * gsl_engine_const_values() returns - and lets the module calculate
 * n_values samples. Also signals GslMainLoop::run() that data was produced.
 */
void StdScheduleNode::gslProcess(GslModule *module, guint n_values)
{
    StdScheduleNode *node = (StdScheduleNode *)module->user_data;
    if(!node->running) /* FIXME: need reasonable suspend in the engine */
        return;

    arts_return_if_fail(node->module != 0);

    GslMainLoop::gslDataCalculated = true;

    unsigned long j;
    for(j=0;j<node->inConnCount;j++)
    {
        if(node->inConn[j]->gslIsConstant)
            *((float **)node->inConn[j]->_ptr) =
                gsl_engine_const_values(node->inConn[j]->gslConstantValue);
        else
            *((float **)node->inConn[j]->_ptr) =
                const_cast<float *>(module->istreams[j].values);
    }

    for(j=0;j<node->outConnCount;j++)
        *((float **)node->outConn[j]->_ptr) = module->ostreams[j].values;

    node->module->calculateBlock(n_values);
}

/*
 * Engine free callback (installed as GslClass::free in rebuildConn): defers
 * releasing the class to GslMainLoop::run().
 */
static void gslModuleFree(gpointer /* data */, const GslClass *klass)
{
    gslMainLoop.freeGslClass(const_cast<GslClass *>(klass));
}

/*
 * Rebuilds the port tables and the GSL module of this node from the current
 * port list, then re-establishes all audio connections of its ports in one
 * engine transaction.
 */
void StdScheduleNode::rebuildConn()
{
    std::list<Port *>::iterator i;

    freeConn();

    inConnCount = outConnCount = 0;
    // both tables are sized for the worst case of all ports going one way
    inConn = new AudioPort_ptr[ports.size()];
    outConn = new AudioPort_ptr[ports.size()];

    for(i=ports.begin();i != ports.end();i++)
    {
        AudioPort *p = (*i)->audioPort();
        if(p)
        {
            if(p->flags() & streamIn)
            {
                p->gslEngineChannel = inConnCount;
                inConn[inConnCount++] = p;
            }
            if(p->flags() & streamOut)
            {
                p->gslEngineChannel = outConnCount;
                outConn[outConnCount++] = p;
            }
        }
    }

    /* create GSL node */
    /* zero-initialized; released with free() in GslMainLoop::run() */
    GslClass *gslClass = (GslClass *)calloc(1, sizeof(GslClass));
    gslClass->n_istreams = inConnCount;
    gslClass->n_ostreams = outConnCount;
    gslClass->process = gslProcess;
    gslClass->free = gslModuleFree;

    gslModule = gsl_module_new (gslClass, (StdScheduleNode *)this);

    GslTrans *trans = gsl_trans_open();
    gsl_trans_add(trans,gsl_job_integrate(gslModule));
    gsl_trans_add(trans,gsl_job_set_consumer(gslModule, running));
    gslRunning = running;

    /* since destroying the old module and creating a new one will destroy
     * all the connections, we need to restore them here
     */
    unsigned int c;
    for(c = 0; c < inConnCount; c++)
    {
        if(inConn[c]->source)
        {
            gsl_trans_add(trans,
                gsl_job_connect(inConn[c]->source->parent->gslModule,
                                inConn[c]->source->gslEngineChannel,
                                inConn[c]->parent->gslModule,
                                inConn[c]->gslEngineChannel));
        }
    }
    for(c = 0; c < outConnCount; c++)
    {
        std::list<Port *>::iterator ci;

        for(ci = outConn[c]->autoDisconnect.begin();
            ci != outConn[c]->autoDisconnect.end(); ci++)
        {
            AudioPort *dest = (*ci)->audioPort();
            if( dest )
            {
                gsl_trans_add(trans,
                    gsl_job_connect(outConn[c]->parent->gslModule,
                                    outConn[c]->gslEngineChannel,
                                    dest->parent->gslModule,
                                    dest->gslEngineChannel));
            }
            else
            {
                arts_debug( "no audio port: %s for %s", ( *ci )->name().c_str(), _object->_interfaceName().c_str() );
            }
        }
    }
    gsl_trans_commit(trans);
}

/* Returns the object this node was created for. */
Object_skel *StdScheduleNode::object()
{
    return _object;
}

/* Returns this node for the target "StdScheduleNode", 0 otherwise. */
void *StdScheduleNode::cast(const string &target)
{
    if(target == "StdScheduleNode") return (StdScheduleNode *)this;
    return 0;
}

/*
 * Looks up the SynthModule_base interface of the object on first use and
 * caches it in module; warns when the object does not implement it.
 */
void StdScheduleNode::accessModule()
{
    if(module) return;

    module = (SynthModule_base *)_object->_cast(Arts::SynthModule_base::_IID);
    if(!module)
        arts_warning("Error using interface %s in the flowsystem: only objects"
                     " implementing Arts::SynthModule should carry streams.",
                     _object->_interfaceName().c_str());
}

/* Creates a stopped node without ports, module or GSL module. */
StdScheduleNode::StdScheduleNode(Object_skel *object, StdFlowSystem *flowSystem)
    : ScheduleNode(object)
{
    _object = object;
    this->flowSystem = flowSystem;
    running = false;
    suspended = false;
    module = 0;
    gslModule = 0;
    gslRunning = false;
    queryInitStreamFunc = 0;
    inConn = outConn = 0;
    inConnCount = outConnCount = 0;
}

/* Stops the node, disconnects and deletes its ports, frees the GSL module. */
StdScheduleNode::~StdScheduleNode()
{
    /* stop module if still running */
    if(running) stop();

    /* disconnect all ports */
    stack<Port *> disconnect_stack;

    /*
     * we must be a bit careful here, as dynamic ports (which are created
     * for connections by MultiPorts) will suddenly start disappearing, so
     * we better make a copy of those ports that will stay, and disconnect
     * them then
     */
    std::list<Port *>::iterator i;
    for(i=ports.begin();i != ports.end();i++)
    {
        if(!(*i)->dynamicPort()) disconnect_stack.push(*i);
    }

    while(!disconnect_stack.empty())
    {
        disconnect_stack.top()->disconnectAll();
        disconnect_stack.pop();
    }

    /* free them */
    for(i=ports.begin();i != ports.end();i++)
        delete (*i);
    ports.clear();

    freeConn();
}

/*
 * Registers a stream of the module. With flags == -1, ptr is taken as the
 * function findPort() uses to request streams on demand; otherwise a port
 * of the kind selected by flags is created. The GSL module is rebuilt in
 * either case.
 */
void StdScheduleNode::initStream(const string& name, void *ptr, long flags)
{
    if(flags == -1)
    {
        queryInitStreamFunc = (QueryInitStreamFunc)ptr;
    }
    else if(flags & streamAsync)
    {
        ports.push_back(new ASyncPort(name,ptr,flags,this));
    }
    else if(flags & streamMulti)
    {
        ports.push_back(new MultiPort(name,ptr,flags,this));
    }
    else
    {
        ports.push_back(new AudioPort(name,ptr,flags,this));
    }

    // TODO: maybe initialize a bit later
    rebuildConn();
}

/* Adds port as a dynamic port of this node and rebuilds the GSL module. */
void StdScheduleNode::addDynamicPort(Port *port)
{
    port->setDynamicPort();
    ports.push_back(port);
    rebuildConn();
}

/*
 * Removes the first port carrying the same name as port from the port list
 * and rebuilds the GSL module. The port itself is not deleted here.
 */
void StdScheduleNode::removeDynamicPort(Port *port)
{
    std::list<Port *>::iterator i;
    for(i=ports.begin();i!=ports.end();i++)
    {
        Port *p = *i;
        if(p->name() == port->name())
        {
            ports.erase(i);
            rebuildConn();
            return;
        }
    }
}

/*
 * Starts the module; the node must not be running.
 *
 * NOTE(review): module is still 0 here when accessModule() only warned -
 * confirm that callers never start objects lacking SynthModule_base.
 */
void StdScheduleNode::start()
{
    assert(!running);
    running = true;

    //cout << "start" << endl;
    accessModule();
    module->streamInit();
    module->streamStart();
    flowSystem->startedChanged();
}

/*
 * Stops the module; the node must be running.
 *
 * NOTE(review): same null-module caveat as in start().
 */
void StdScheduleNode::stop()
{
    assert(running);
    running = false;

    accessModule();
    module->streamEnd();
    flowSystem->startedChanged();
}

/*
 * Makes the engine calculate a block, after pending running-state changes
 * have been passed on to it.
 */
void StdScheduleNode::requireFlow()
{
    // cout << "rf" << module->_interfaceName() << endl;
    flowSystem->updateStarted();
    gslMainLoop.run();
}

/* Returns the module's auto-suspend state, asSuspend for stopped nodes. */
AutoSuspendState StdScheduleNode::suspendable()
{
    if(running) {
        accessModule();
        return module->autoSuspend();
    }
    // if it's not running, who cares?
    return asSuspend;
}

/*
 * Marks a running node as suspended; it is additionally stopped when its
 * module requests asSuspendStop.
 */
void StdScheduleNode::suspend()
{
    if(running) {
        accessModule();
        suspended = true;
        if((module->autoSuspend() & asSuspendMask) == asSuspendStop) stop();
    }
}

/* Reverts suspend(), restarting the module if suspend() had stopped it. */
void StdScheduleNode::restart()
{
    if(suspended) {
        accessModule();
        suspended = false;
        if(!running && (module->autoSuspend() & asSuspendMask) == asSuspendStop) start();
    }
}

/*
 * Returns the port called name, or 0 if there is none. When a query
 * function was registered through initStream() and reports success for
 * this name, the port list is searched a second time.
 */
Port *StdScheduleNode::findPort(const string& name)
{
    std::list<Port *>::iterator i;
    for(i=ports.begin();i!=ports.end();i++)
    {
        Port *p = *i;
        if(p->name() == name) return p;
    }
    if(queryInitStreamFunc)
    {
        if(queryInitStreamFunc(_object,name))
        {
            for(i=ports.begin();i!=ports.end();i++)
            {
                Port *p = *i;
                if(p->name() == name) return p;
            }
        }
    }
    return 0;
}

/*
 * Virtualizes our port onto implPort of implNode. Ignored when implNode is
 * not a StdScheduleNode; both ports are asserted to exist.
 */
void StdScheduleNode::virtualize(const std::string& port,
                                 ScheduleNode *implNode,
                                 const std::string& implPort)
{
    StdScheduleNode *impl=(StdScheduleNode *)implNode->cast("StdScheduleNode");
    if(impl)
    {
        Port *p1 = findPort(port);
        Port *p2 = impl->findPort(implPort);

        assert(p1);
        assert(p2);
        p1->vport()->virtualize(p2->vport());
    }
}

/*
 * Reverts virtualize() for the same pair of ports. Ignored when implNode is
 * not a StdScheduleNode; both ports are asserted to exist, as in
 * virtualize().
 */
void StdScheduleNode::devirtualize(const std::string& port,
                                   ScheduleNode *implNode,
                                   const std::string& implPort)
{
    StdScheduleNode *impl=(StdScheduleNode *)implNode->cast("StdScheduleNode");
    if(impl)
    {
        Port *p1 = findPort(port);
        Port *p2 = impl->findPort(implPort);

        assert(p1);
        assert(p2);
        p1->vport()->devirtualize(p2->vport());
    }
}

/*
 * Connects our port with destport of dest. Remote nodes handle the request
 * themselves; otherwise the input side is connected to the output side.
 * Missing ports or non-matching directions are silently ignored.
 */
void StdScheduleNode::connect(const string& port, ScheduleNode *dest,
                              const string& destport)
{
    RemoteScheduleNode *rsn = dest->remoteScheduleNode();
    if(rsn)
    {
        // RemoteScheduleNodes know better how to connect remotely
        rsn->connect(destport,this,port);
        return;
    }

    flowSystem->restart();

    Port *p1 = findPort(port);
    Port *p2 = ((StdScheduleNode *)dest)->findPort(destport);

    if(p1 && p2)
    {
        if((p1->flags() & streamIn) && (p2->flags() & streamOut))
        {
            p1->vport()->connect(p2->vport());
        }
        else if((p2->flags() & streamIn) && (p1->flags() & streamOut))
        {
            p2->vport()->connect(p1->vport());
        }
    }
}

/*
 * Counterpart of connect(): disconnects our port from destport of dest,
 * following the same rules.
 */
void StdScheduleNode::disconnect(const string& port, ScheduleNode *dest,
                                 const string& destport)
{
    RemoteScheduleNode *rsn = dest->remoteScheduleNode();
    if(rsn)
    {
        // RemoteScheduleNodes know better how to disconnect remotely
        rsn->disconnect(destport,this,port);
        return;
    }

    flowSystem->restart();

    Port *p1 = findPort(port);
    Port *p2 = ((StdScheduleNode *)dest)->findPort(destport);

    if(p1 && p2)
    {
        if((p1->flags() & streamIn) && (p2->flags() & streamOut))
        {
            p1->vport()->disconnect(p2->vport());
        }
        else if((p2->flags() & streamIn) && (p1->flags() & streamOut))
        {
            p2->vport()->disconnect(p1->vport());
        }
    }
}

/* Returns the flags of the named port, or 0 when there is no such port. */
AttributeType StdScheduleNode::queryFlags(const std::string& port)
{
    arts_debug("findPort(%s)", port.c_str());
    // size() is a size_t, convert explicitly to what %ld expects
    arts_debug("have %ld ports", (long)ports.size());
    Port *p1 = findPort(port);
    arts_debug("done");

    if(p1)
    {
        arts_debug("result %ld",(long)p1->flags());
        return p1->flags();
    }
    arts_debug("failed");
    return (AttributeType)0;
}

/*
 * Sets a constant value on the named audio port. A port that is missing or
 * is not an audio port trips the assertion.
 */
void StdScheduleNode::setFloatValue(const string& port, float value)
{
    // findPort() returns 0 for unknown names, so check before using it
    Port *found = findPort(port);
    AudioPort *p = found ? found->audioPort() : 0;

    if(p) {
        p->vport()->setFloatValue(value);
    } else {
        assert(false);
    }
}

/*
 * Counts the input ports called port that are connected to a source or
 * carry a constant value.
 */
unsigned long StdScheduleNode::inputConnectionCount(const string& port)
{
    unsigned long result = 0;

    unsigned int c;
    for(c = 0; c < inConnCount; c++)
    {
        if(inConn[c]->name() == port)
        {
            if(inConn[c]->source || inConn[c]->gslIsConstant)
                result++;
        }
    }

    return result;
}

/* Sums up destcount over the output ports called port. */
unsigned long StdScheduleNode::outputConnectionCount(const string& port)
{
    unsigned long result = 0;

    unsigned int c;
    for(c = 0; c < outConnCount; c++)
    {
        if(outConn[c]->name() == port)
            result += outConn[c]->destcount;
    }

    return result;
}

/*
 * Creates the flow system. GSL and its engine are initialized on the first
 * construction only; the main loop wrapper is initialized every time.
 */
StdFlowSystem::StdFlowSystem()
{
    _suspended = false;
    needUpdateStarted = false;

    /* TODO: correct parameters */
    static bool gsl_is_initialized = false;
    if(!gsl_is_initialized)
    {
        GslConfigValue values[3] = {
            { "wave_chunk_padding", 8 },
            { "dcache_block_size", 4000, },
            { 0, 0 }
        };

        gsl_is_initialized = true;

        if (!g_thread_supported ())
            g_thread_init(0);
        gsl_init(values, (GslMutexTable *)gslGlobalMutexTable);

        /*
         * FIXME: both of these are really supposed to be tunable
         *  - the 512 because of low-latency apps, where calculating smaller
         *    block sizes might be necessary
         *  - the 44100 because of the obvious reason, that not every artsd
         *    is running at that rate
         */
        gsl_engine_init(false, 512, 44100, /* subsamplemask */ 63);
        if(gslGlobalMutexTable)
            arts_debug("gsl: using Unix98 pthreads directly for mutexes and conditions");
        /*gsl_engine_debug_enable(GslEngineDebugLevel(GSL_ENGINE_DEBUG_JOBS | GSL_ENGINE_DEBUG_SCHED));*/
    }
    gslMainLoop.initialize();
}

/* Creates and registers the schedule node for object. */
ScheduleNode *StdFlowSystem::addObject(Object_skel *object)
{
    // do not add new modules when being in suspended state
    restart();

    StdScheduleNode *node = new StdScheduleNode(object,this);
    nodes.push_back(node);
    return node;
}

/* Unregisters and deletes node, which must be a StdScheduleNode. */
void StdFlowSystem::removeObject(ScheduleNode *node)
{
    StdScheduleNode *xnode = (StdScheduleNode *)node->cast("StdScheduleNode");
    assert(xnode);
    nodes.remove(xnode);
    delete xnode;
}

/* Returns whether the flow system is currently suspended. */
bool StdFlowSystem::suspended()
{
    return _suspended;
}

/* Returns whether the whole module graph may be suspended right now. */
bool StdFlowSystem::suspendable()
{
    /*
     * What it does:
     * -------------
     *
     * The suspension algorithm will first divide the graph of modules into
     * subgraphs of interconnected modules. A subgraph is suspendable if
     * all of its modules are suspendable and the subgraph does not contain
     * producer(s) and consumer(s) at the same time.
     *
     * Finally, our module graph is suspendable if all its subgraphs are.
     *
     * How it is implemented:
     * ----------------------
     *
     * For efficiency reasons, both steps are merged together. First all
     * modules will be marked as unseen. Then a module is picked and
     * all modules that it connects to are recursively added to the
     * subgraph.
     */

    /*
     * initialization: no nodes are seen
     */
    std::list<StdScheduleNode *>::iterator i;
    for (i = nodes.begin(); i != nodes.end(); i++)
    {
        StdScheduleNode *node = *i;
        node->suspendTag = false;
    }

    stack<StdScheduleNode*> todo;
    for(i = nodes.begin(); i != nodes.end(); i++)
    {
        bool haveConsumer = false;
        bool haveProducer = false;

        /*
         * examine the subgraph consisting of all nodes connected to (*i)
         * (only will do anything if suspendTag is not already set)
         */

        todo.push(*i);
        do
        {
            StdScheduleNode *node = todo.top();
            todo.pop();

            if(!node->suspendTag)
            {
                node->suspendTag = true; // never examine this node again

                switch (node->suspendable())
                {
                    case asNoSuspend|asProducer:
                    case asNoSuspend|asConsumer:
                    case asNoSuspend:
                        return false;
                        break;
                    case asSuspend:
                    case asSuspendStop:
                        /* nothing */
                        break;
                    case asSuspend|asProducer:
                    case asSuspendStop|asProducer:
                        if(haveConsumer)
                            return false;
                        else
                            haveProducer = true;
                        break;
                    case asSuspend|asConsumer:
                    case asSuspendStop|asConsumer:
                        if(haveProducer)
                            return false;
                        else
                            haveConsumer = true;
                        break;
                    default:
                        arts_fatal("bad suspend value %d", node->suspendable());
                }

                /* follow the connections in both directions */
                unsigned int c;
                for(c = 0; c < node->inConnCount; c++)
                {
                    if(node->inConn[c]->source)
                        todo.push(node->inConn[c]->source->parent);
                }
                for(c = 0; c < node->outConnCount; c++)
                {
                    std::list<Port *>::iterator ci;

                    for(ci = node->outConn[c]->autoDisconnect.begin();
                        ci != node->outConn[c]->autoDisconnect.end(); ci++)
                    {
                        AudioPort *dest = (*ci)->audioPort();
                        if(dest)
                            todo.push(dest->parent);
                    }
                }
            }
        } while(!todo.empty());
    }
    return true;
}

/* Suspends all nodes unless the flow system is already suspended. */
void StdFlowSystem::suspend()
{
    if(!_suspended)
    {
        std::list<StdScheduleNode *>::iterator i;
        for(i = nodes.begin();i != nodes.end();i++)
        {
            StdScheduleNode *node = *i;
            node->suspend();
        }
        _suspended = true;
    }
}

/* Restarts all nodes if the flow system is suspended. */
void StdFlowSystem::restart()
{
    if(_suspended)
    {
        std::list<StdScheduleNode *>::iterator i;
        for(i = nodes.begin();i != nodes.end();i++)
        {
            StdScheduleNode *node = *i;
            node->restart();
        }
        _suspended = false;
    }
}

/* remote accessibility */

/* Starts the schedule node of node, which must be a StdScheduleNode. */
void StdFlowSystem::startObject(Object node)
{
    StdScheduleNode *sn =
        (StdScheduleNode *)node._node()->cast("StdScheduleNode");
    assert(sn);
    sn->start();
}

/* Stops the schedule node of node, which must be a StdScheduleNode. */
void StdFlowSystem::stopObject(Object node)
{
    StdScheduleNode *sn =
        (StdScheduleNode *)node._node()->cast("StdScheduleNode");
    assert(sn);
    sn->stop();
}

/*
 * Connects sourcePort of sourceObject to destPort of destObject. When the
 * destination has a StdScheduleNode too, the nodes are connected directly;
 * otherwise, for an asynchronous source port, an ASyncNetSend is set up and
 * attached to a receiver created by the destination's flow system.
 */
void StdFlowSystem::connectObject(Object sourceObject,const string& sourcePort,
                                  Object destObject, const std::string& destPort)
{
    arts_debug("connect port %s to %s", sourcePort.c_str(), destPort.c_str());
    StdScheduleNode *sn =
        (StdScheduleNode *)sourceObject._node()->cast("StdScheduleNode");
    assert(sn);

    Port *port = sn->findPort(sourcePort);
    assert(port);

    StdScheduleNode *destsn =
        (StdScheduleNode *)destObject._node()->cast("StdScheduleNode");
    if(destsn)
    {
        sn->connect(sourcePort,destsn,destPort);
        return;
    }

    ASyncPort *ap = port->asyncPort();

    if(ap)
    {
        FlowSystemSender sender;
        FlowSystemReceiver receiver;
        FlowSystem remoteFs;

        string dest = destObject.toString() + ":" + destPort;
        ASyncNetSend *netsend = new ASyncNetSend(ap, dest);

        sender = FlowSystemSender::_from_base(netsend); // don't release netsend
        remoteFs = destObject._flowSystem();
        receiver = remoteFs.createReceiver(destObject, destPort, sender);
        netsend->setReceiver(receiver);
        arts_debug("connected an asyncnetsend");
    }
}

/*
 * Counterpart of connectObject(): disconnects the nodes directly when both
 * have a StdScheduleNode, otherwise asks an asynchronous source port to
 * drop its remote connection to the destination.
 */
void StdFlowSystem::disconnectObject(Object sourceObject,
    const string& sourcePort, Object destObject, const std::string& destPort)
{
    arts_debug("disconnect port %s and %s",sourcePort.c_str(),destPort.c_str());
    StdScheduleNode *sn =
        (StdScheduleNode *)sourceObject._node()->cast("StdScheduleNode");
    assert(sn);

    Port *port = sn->findPort(sourcePort);
    assert(port);

    StdScheduleNode *destsn =
        (StdScheduleNode *)destObject._node()->cast("StdScheduleNode");
    if(destsn)
    {
        sn->disconnect(sourcePort,destsn,destPort);
        return;
    }

    ASyncPort *ap = port->asyncPort();
    if(ap)
    {
        string dest = destObject.toString() + ":" + destPort;
        ap->disconnectRemote(dest);
        arts_debug("disconnected an asyncnetsend");
    }
}

/* Returns the flags of port on the schedule node of node. */
AttributeType StdFlowSystem::queryFlags(Object node, const std::string& port)
{
    StdScheduleNode *sn =
        (StdScheduleNode *)node._node()->cast("StdScheduleNode");
    assert(sn);
    return sn->queryFlags(port);
}

/* Sets a constant value on port of the schedule node of node. */
void StdFlowSystem::setFloatValue(Object node, const std::string& port,
                                  float value)
{
    StdScheduleNode *sn =
        (StdScheduleNode *)node._node()->cast("StdScheduleNode");
    assert(sn);
    sn->setFloatValue(port,value);
}

/*
 * Creates the receiving end for packets sent by sender to the asynchronous
 * port called port of object. Returns a null receiver when the port is not
 * asynchronous.
 */
FlowSystemReceiver StdFlowSystem::createReceiver(Object object,
                            const string &port, FlowSystemSender sender)
{
    StdScheduleNode *sn =
        (StdScheduleNode *)object._node()->cast("StdScheduleNode");
    assert(sn);

    Port *p = sn->findPort(port);
    assert(p);

    ASyncPort *ap = p->asyncPort();

    if(ap)
    {
        arts_debug("creating packet receiver");
        return FlowSystemReceiver::_from_base(new ASyncNetReceive(ap, sender));
    }
    return FlowSystemReceiver::null();
}

/*
 * Passes changed running states on to the engine. Does nothing unless
 * startedChanged() was called since the last update; all changes are
 * submitted in a single transaction.
 */
void StdFlowSystem::updateStarted()
{
    if(!needUpdateStarted)
        return;

    needUpdateStarted = false;

    std::list<StdScheduleNode*>::iterator ni;
    GslTrans *trans = 0;

    for(ni = nodes.begin(); ni != nodes.end(); ni++)
    {
        StdScheduleNode *node = *ni;

        if(node->running != node->gslRunning)
        {
            // open the transaction lazily, only when something changed
            if(!trans)
                trans = gsl_trans_open();
            gsl_trans_add(trans, gsl_job_set_consumer(node->gslModule, node->running));
            node->gslRunning = node->running;
        }
    }
    if(trans)
        gsl_trans_commit(trans);
}

/* Notes that a node changed its running state (see updateStarted()). */
void StdFlowSystem::startedChanged()
{
    needUpdateStarted = true;
}

// hacked initialization of Dispatcher::the()->flowSystem ;)

namespace Arts {

/*
 * Startup hook: creates a StdFlowSystem and hands it to the dispatcher on
 * startup, releases it again on shutdown.
 */
static class SetFlowSystem : public StartupClass
{
    FlowSystem_impl *fs;
public:
    void startup()
    {
        fs = new StdFlowSystem;
        Dispatcher::the()->setFlowSystem(fs);
    }
    void shutdown()
    {
        fs->_release();
    }
} sfs;

}