/*

    Copyright (C) 2000-2002 Stefan Westerfeld
                            stefan@space.twc.de

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Library General Public
    License as published by the Free Software Foundation; either
    version 2 of the License, or (at your option) any later version.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Library General Public License for more details.

    You should have received a copy of the GNU Library General Public License
    along with this library; see the file COPYING.LIB.  If not, write to
    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
    Boston, MA 02110-1301, USA.

    */

#include "config.h"

#include "virtualports.h"
#include "startupmanager.h"
#include "gslschedule.h"
#include "debug.h"
#include "asyncschedule.h"
#include "audiosubsys.h"
#include <gsl/gslcommon.h>
#include <gsl/gslengine.h>
#include <algorithm>
#include <stdio.h>
#include <iostream>
#include <stack>

/* HACK */
class GslMainLoop {
protected:
	std::list<GslClass *> freeClassList;

public:
	GslEngineLoop loop;

	static bool waitOnTransNeedData;
	static bool gslDataCalculated;

	/* static check function */
	static gboolean gslCheck(gpointer /* data */, guint /* n_values */,
							 glong*  /* timeout_p */,
                         	 guint /* n_fds */, const GPollFD* /* fds */,
						 	 gboolean /* revents_filled */)
	{
		return waitOnTransNeedData;
	}
	/* mainloop integration: initialize (called to get initial loop setup) */
	void initialize()
	{
		gsl_transact(gsl_job_add_poll (gslCheck, 0, 0, 0, 0), 0);
		gsl_engine_prepare(&loop);

		for(unsigned int i = 0; i != loop.n_fds; i++)
		{
			printf("TODO: engine fd %d\n",i);
		}
	}
	/* mainloop integration: process (TODO - should be called by IOManager) */
	void process()
	{
		printf("TODO: mainloop wrapper for fd watches\n");
		if(gsl_engine_check(&loop))
			gsl_engine_dispatch();
	}
	/* wait for a transaction */
	void waitOnTrans()
	{
		arts_return_if_fail(waitOnTransNeedData == false);
		gsl_engine_wait_on_trans();
	}
	/* make the engine calculate something */
	void run()
	{
		waitOnTransNeedData = true;
		gslDataCalculated = false;

		while(!gslDataCalculated && gsl_engine_check(&loop))
			gsl_engine_dispatch();

		gslDataCalculated = false;
		waitOnTransNeedData = false;

		if(!freeClassList.empty())
		{
			/*
			 * make sure that all transactions that are still pending
			 * get finished (especially important in threaded case,
			 * since an entry in the free list doesn't necessarily
			 * mean that the module has entierly been freed)
			 */
			waitOnTrans();

			std::list<GslClass *>::iterator fi;
			for(fi = freeClassList.begin(); fi != freeClassList.end(); fi++)
				free(*fi);

			freeClassList.clear();
		}
	}
	void freeGslClass(GslClass *klass)
	{
		freeClassList.push_back(klass);
	}
} gslMainLoop;

bool GslMainLoop::waitOnTransNeedData = false;
bool GslMainLoop::gslDataCalculated = false;
namespace Arts { extern void *gslGlobalMutexTable; }


using namespace std;
using namespace Arts;

// ----------- Port -----------

Port::Port(const string& name, void *ptr, long flags, StdScheduleNode* parent)
	: _name(name), _ptr(ptr), _flags((AttributeType)flags),
	  parent(parent), _dynamicPort(false)
{
	_vport = new VPort(this);
}

Port::~Port()
{
	if(_vport)
		delete _vport;
}

AttributeType Port::flags()
{
	return _flags;
}

string Port::name()
{
	return _name;
}

ASyncPort *Port::asyncPort()
{
	return 0;
}

AudioPort *Port::audioPort()
{
	return 0;
}

void Port::addAutoDisconnect(Port *source)
{
	autoDisconnect.push_back(source);
	source->autoDisconnect.push_back(this);
}

void Port::removeAutoDisconnect(Port *source)
{
	std::list<Port *>::iterator adi;

	// remove our autodisconnection entry for source port
	adi = find(autoDisconnect.begin(),autoDisconnect.end(),source);
	assert(adi != autoDisconnect.end());
	autoDisconnect.erase(adi);

	// remove the source port autodisconnection entry to us
	adi=find(source->autoDisconnect.begin(),source->autoDisconnect.end(),this);
	assert(adi != source->autoDisconnect.end());
	source->autoDisconnect.erase(adi);
}

void Port::disconnectAll()
{
	if(_vport)
		delete _vport;
	_vport = 0;
	assert(autoDisconnect.empty());
	while(!autoDisconnect.empty())
	{
		Port *other = *autoDisconnect.begin();

		// syntax is disconnect(source)
		if(_flags & streamIn)
			// if we're incoming, other port is source
			vport()->disconnect(other->vport());
		else
			// if we're outgoing, we're the source
			other->vport()->disconnect(this->vport());
	}
}

void Port::setPtr(void *ptr)
{
	_ptr = ptr;
}

// ------- AudioPort ---------

AudioPort::AudioPort(const string& name,
		             void *ptr, long flags,StdScheduleNode *parent)
		: Port(name,ptr,flags,parent)
{
	destcount = 0;
	sourcemodule = 0;
	source = 0;
	gslIsConstant = false;
}

AudioPort::~AudioPort()
{
	//
}

AudioPort *AudioPort::audioPort()
{
	return this;
}

void AudioPort::setFloatValue(float f)
{
	gslIsConstant = true;
	gslConstantValue = f;

	parent->_connectionCountChanged = true;
}

void AudioPort::connect(Port *psource)
{
	if (source) return; // Error, should not happen (See BR70028)
	source = psource->audioPort();
	assert(source);
	addAutoDisconnect(psource);

	source->parent->_connectionCountChanged = parent->_connectionCountChanged = true;
	source->destcount++;
	sourcemodule = source->parent;

	// GSL connect
	GslTrans *trans = gsl_trans_open();
	gsl_trans_add(trans, gsl_job_connect(source->parent->gslModule,
										 source->gslEngineChannel,
										 parent->gslModule,
										 gslEngineChannel));
	gsl_trans_commit(trans);
}

void AudioPort::disconnect(Port *psource)
{
	if (!source || source != psource->audioPort()) return; // Error, should not happen (See BR70028)
	assert(source);
	assert(source == psource->audioPort());
	removeAutoDisconnect(psource);

	assert(sourcemodule == source->parent);
	sourcemodule = 0;

	source->parent->_connectionCountChanged = parent->_connectionCountChanged = true;
	source->destcount--;
	source = 0;

	// GSL disconnect
	GslTrans *trans = gsl_trans_open();
	gsl_trans_add(trans, gsl_job_disconnect(parent->gslModule,
										    gslEngineChannel));
	gsl_trans_commit(trans);
}

// --------- MultiPort ----------

MultiPort::MultiPort(const string& name,
		             void *ptr, long flags,StdScheduleNode *parent)
	: Port(name,ptr,flags,parent)
{
	conns = 0;
	nextID = 0;
	initConns();
}

MultiPort::~MultiPort()
{
	if(conns)
	{
		delete[] conns;
		conns = 0;
	}
}

void MultiPort::initConns()
{
	if(conns != 0) delete[] conns;
	conns = new float_ptr[parts.size() + 1];
	conns[parts.size()] = (float *)0;

	*(float ***)_ptr = conns;

	long n = 0;
	std::list<Part>::iterator i;
	for(i = parts.begin();i != parts.end(); i++)
	{
		AudioPort *p = i->dest;
		p->setPtr((void *)&conns[n++]);
	}
}

void MultiPort::connect(Port *port)
{
	AudioPort *dport;
	char sid[20];
	sprintf(sid,"%ld",nextID++);

	addAutoDisconnect(port);

	dport = new AudioPort("_"+_name+string(sid),0,streamIn,parent);

	Part part;
	part.src = (AudioPort *)port;
	part.dest = dport;

	parts.push_back(part);
	initConns();

	parent->addDynamicPort(dport);
	dport->vport()->connect(port->vport());
}

void MultiPort::disconnect(Port *sport)
{
	AudioPort *port = (AudioPort *)sport;
	removeAutoDisconnect(sport);

	std::list<Part>::iterator i;
	for(i = parts.begin(); i != parts.end(); i++)
	{
		if(i->src == port)
		{
			AudioPort *dport = i->dest;
			parts.erase(i);
			initConns();

			dport->vport()->disconnect(port->vport());
			parent->removeDynamicPort(dport);

			delete dport;
			return;
		}
	}
}

// -------- StdScheduleNode ---------

void StdScheduleNode::freeConn()
{
	if(inConn)
	{
		delete[] inConn;
		inConn = 0;
	}
	if(outConn)
	{
		delete[] outConn;
		outConn = 0;
	}
	inConnCount = outConnCount = 0;

	if(gslModule)
	{
		gsl_transact(gsl_job_discard(gslModule),0);

		gslModule = 0;
		gslRunning = false;
	}
}

void StdScheduleNode::gslProcess(GslModule *module, guint n_values)
{
	StdScheduleNode *node = (StdScheduleNode *)module->user_data;
	if(!node->running)		/* FIXME: need reasonable suspend in the engine */
		return;

	arts_return_if_fail(node->module != 0);

	GslMainLoop::gslDataCalculated = true;

	unsigned long j;
	for(j=0;j<node->inConnCount;j++)
	{
		if(node->inConn[j]->gslIsConstant)
			*((float **)node->inConn[j]->_ptr) =
				gsl_engine_const_values(node->inConn[j]->gslConstantValue);
		else
			*((float **)node->inConn[j]->_ptr) = const_cast<float *>(module->istreams[j].values);
	}

	for(j=0;j<node->outConnCount;j++)
		*((float **)node->outConn[j]->_ptr) = module->ostreams[j].values;

	node->module->calculateBlock(n_values);
}

static void gslModuleFree(gpointer /* data */, const GslClass *klass)
{
	gslMainLoop.freeGslClass(const_cast<GslClass *>(klass));
}

void StdScheduleNode::rebuildConn()
{
	std::list<Port *>::iterator i;

	freeConn();

	inConnCount = outConnCount = 0;
	inConn = new AudioPort_ptr[ports.size()];
	outConn = new AudioPort_ptr[ports.size()];

	for(i=ports.begin();i != ports.end();i++)
	{
		AudioPort *p = (*i)->audioPort();
		if(p)
		{
			if(p->flags() & streamIn)
			{
				p->gslEngineChannel = inConnCount;
				inConn[inConnCount++] = p;
			}
			if(p->flags() & streamOut)
			{
				p->gslEngineChannel = outConnCount;
				outConn[outConnCount++] = p;
			}
		}
	}

	/* create GSL node */
	GslClass *gslClass = (GslClass *)calloc(sizeof(GslClass),1);
	gslClass->n_istreams = inConnCount;
	gslClass->n_ostreams = outConnCount;
	gslClass->process = gslProcess;
	gslClass->free = gslModuleFree;

	gslModule = gsl_module_new (gslClass, (StdScheduleNode *)this);

	GslTrans *trans = gsl_trans_open();
	gsl_trans_add(trans,gsl_job_integrate(gslModule));
	gsl_trans_add(trans,gsl_job_set_consumer(gslModule, running));
	gslRunning = running;

	/* since destroying the old module and creating a new one will destroy
	 * all the connections, we need to restore them here
	 */
	unsigned int c;
	for(c = 0; c < inConnCount; c++)
	{
		if(inConn[c]->source)
		{
			gsl_trans_add(trans,
				gsl_job_connect(inConn[c]->source->parent->gslModule,
						 	 	inConn[c]->source->gslEngineChannel,
								inConn[c]->parent->gslModule,
								inConn[c]->gslEngineChannel));
		}
	}
	for(c = 0; c < outConnCount; c++)
	{
		std::list<Port *>::iterator ci;

		for(ci = outConn[c]->autoDisconnect.begin();
			ci != outConn[c]->autoDisconnect.end(); ci++)
		{
			AudioPort *dest = (*ci)->audioPort();
			if( dest )
			{
				gsl_trans_add(trans,
					gsl_job_connect(outConn[c]->parent->gslModule,
									outConn[c]->gslEngineChannel,
									dest->parent->gslModule,
									dest->gslEngineChannel));
			}
			else
			{
				arts_debug( "no audio port: %s for %s", ( *ci )->name().c_str(), _object->_interfaceName().c_str() );
			}
		}
	}
	gsl_trans_commit(trans);
}

Object_skel *StdScheduleNode::object()
{
	return _object;
}

void *StdScheduleNode::cast(const string &target)
{
	if(target == "StdScheduleNode") return (StdScheduleNode *)this;
	return 0;
}


void StdScheduleNode::accessModule()
{
	if(module) return;

	module = (SynthModule_base *)_object->_cast(Arts::SynthModule_base::_IID);
	if(!module)
		arts_warning("Error using interface %s in the flowsystem: only objects"
					 " implementing Arts::SynthModule should carry streams.",
					 _object->_interfaceName().c_str());
}

StdScheduleNode::StdScheduleNode(Object_skel *object, StdFlowSystem *flowSystem) : ScheduleNode(object)
{
	_object = object;
	this->flowSystem = flowSystem;
	running = false;
	suspended = false;
	module = 0;
	gslModule = 0;
	gslRunning = false;
	queryInitStreamFunc = 0;
	inConn = outConn = 0;
	inConnCount = outConnCount = 0;
}

StdScheduleNode::~StdScheduleNode()
{
	/* stop module if still running */
	if(running) stop();
	/* disconnect all ports */
	stack<Port *> disconnect_stack;

	/*
	 * we must be a bit careful here, as dynamic ports (which are created
	 * for connections by MultiPorts) will suddenly start disappearing, so
	 * we better make a copy of those ports that will stay, and disconnect
	 * them then
	 */
	std::list<Port *>::iterator i;
	for(i=ports.begin();i != ports.end();i++)
	{
		if(!(*i)->dynamicPort()) disconnect_stack.push(*i);
	}

	while(!disconnect_stack.empty())
	{
		disconnect_stack.top()->disconnectAll();
		disconnect_stack.pop();
	}
	/* free them */
	for(i=ports.begin();i != ports.end();i++)
		delete (*i);
	ports.clear();

	freeConn();
}

void StdScheduleNode::initStream(const string& name, void *ptr, long flags)
{
	if(flags == -1)
	{
		queryInitStreamFunc = (QueryInitStreamFunc)ptr;
	}
	else if(flags & streamAsync)
	{
		ports.push_back(new ASyncPort(name,ptr,flags,this));
	}
	else if(flags & streamMulti)
	{
		ports.push_back(new MultiPort(name,ptr,flags,this));
	}
	else
	{
		ports.push_back(new AudioPort(name,ptr,flags,this));
	}

	// TODO: maybe initialize a bit later
	rebuildConn();
}

void StdScheduleNode::addDynamicPort(Port *port)
{
	port->setDynamicPort();
	ports.push_back(port);
	rebuildConn();
}

void StdScheduleNode::removeDynamicPort(Port *port)
{
	std::list<Port *>::iterator i;
	for(i=ports.begin();i!=ports.end();i++)
	{
		Port *p = *i;
		if(p->name() == port->name())
		{
			ports.erase(i);
			rebuildConn();
			return;
		}
	}
}

void StdScheduleNode::start()
{
	assert(!running);
	running = true;

	//cout << "start" << endl;
	accessModule();
	module->streamInit();
	module->streamStart();
	flowSystem->startedChanged();
}

void StdScheduleNode::stop()
{
	assert(running);
	running = false;

	accessModule();
	module->streamEnd();
	flowSystem->startedChanged();
}

void StdScheduleNode::requireFlow()
{
	// cout << "rf" << module->_interfaceName() << endl;
	flowSystem->updateStarted();
	gslMainLoop.run();
}

AutoSuspendState StdScheduleNode::suspendable()
{
	if(running) {
		accessModule();
		return module->autoSuspend();
	}
	// if its not running, who cares?
	return asSuspend;
}

void StdScheduleNode::suspend()
{
	if(running) {
		accessModule();
		suspended = true;
		if((module->autoSuspend() & asSuspendMask) == asSuspendStop) stop();
	}
}

void StdScheduleNode::restart()
{
	if(suspended) {
		accessModule();
		suspended = false;
		if(!running && (module->autoSuspend() & asSuspendMask) == asSuspendStop) start();
	}
}

Port *StdScheduleNode::findPort(const string& name)
{
	std::list<Port *>::iterator i;
	for(i=ports.begin();i!=ports.end();i++)
	{
		Port *p = *i;
		if(p->name() == name) return p;
	}
	if(queryInitStreamFunc)
	{
		if(queryInitStreamFunc(_object,name))
		{
			for(i=ports.begin();i!=ports.end();i++)
			{
				Port *p = *i;
				if(p->name() == name) return p;
			}
		}
	}
	return 0;
}

void StdScheduleNode::virtualize(const std::string& port,
		                         ScheduleNode *implNode,
								 const std::string& implPort)
{
	StdScheduleNode *impl=(StdScheduleNode *)implNode->cast("StdScheduleNode");
	if(impl)
	{
		Port *p1 = findPort(port);
		Port *p2 = impl->findPort(implPort);

		assert(p1);
		assert(p2);
		p1->vport()->virtualize(p2->vport());
	}
}

void StdScheduleNode::devirtualize(const std::string& port,
		                           ScheduleNode *implNode,
								   const std::string& implPort)
{
	StdScheduleNode *impl=(StdScheduleNode *)implNode->cast("StdScheduleNode");
	if(impl)
	{
		Port *p1 = findPort(port);
		Port *p2 = impl->findPort(implPort);

		p1->vport()->devirtualize(p2->vport());
	}
}

void StdScheduleNode::connect(const string& port, ScheduleNode *dest,
		                                          const string& destport)
{
	RemoteScheduleNode *rsn = dest->remoteScheduleNode();
	if(rsn)
	{
		// RemoteScheduleNodes know better how to connect remotely
		rsn->connect(destport,this,port);
		return;
	}

	flowSystem->restart();

	Port *p1 = findPort(port);
	Port *p2 = ((StdScheduleNode *)dest)->findPort(destport);

	if(p1 && p2)
	{
		if((p1->flags() & streamIn) && (p2->flags() & streamOut))
		{
			p1->vport()->connect(p2->vport());
		}
		else if((p2->flags() & streamIn) && (p1->flags() & streamOut))
		{
			p2->vport()->connect(p1->vport());
		}
	}
}

void StdScheduleNode::disconnect(const string& port, ScheduleNode *dest,
		                                             const string& destport)
{
	RemoteScheduleNode *rsn = dest->remoteScheduleNode();
	if(rsn)
	{
		// RemoteScheduleNodes know better how to disconnect remotely
		rsn->disconnect(destport,this,port);
		return;
	}

	flowSystem->restart();

	Port *p1 = findPort(port);
	Port *p2 = ((StdScheduleNode *)dest)->findPort(destport);

	if(p1 && p2)
	{
		if((p1->flags() & streamIn) && (p2->flags() & streamOut))
		{
			p1->vport()->disconnect(p2->vport());
		}
		else if((p2->flags() & streamIn) && (p1->flags() & streamOut))
		{
			p2->vport()->disconnect(p1->vport());
		}
	}
}

AttributeType StdScheduleNode::queryFlags(const std::string& port)
{
	arts_debug("findPort(%s)", port.c_str());
	arts_debug("have %ld ports", ports.size());
	Port *p1 = findPort(port);
	arts_debug("done");

	if(p1)
	{
		arts_debug("result %d",(long)p1->flags());
		return p1->flags();
	}
	arts_debug("failed");
	return (AttributeType)0;
}

void StdScheduleNode::setFloatValue(const string& port, float value)
{
	AudioPort *p = findPort(port)->audioPort();

	if(p) {
		p->vport()->setFloatValue(value);
	} else {
		assert(false);
	}
}

unsigned long StdScheduleNode::inputConnectionCount(const string& port)
{
	unsigned long result = 0;

	unsigned int c;
	for(c = 0; c < inConnCount; c++)
	{
		if(inConn[c]->name() == port)
		{
			if(inConn[c]->source || inConn[c]->gslIsConstant)
				result++;
		}
	}

	return result;
}

unsigned long StdScheduleNode::outputConnectionCount(const string& port)
{
	unsigned long result = 0;

	unsigned int c;
	for(c = 0; c < outConnCount; c++)
	{
		if(outConn[c]->name() == port)
			result += outConn[c]->destcount;
	}

	return result;
}

StdFlowSystem::StdFlowSystem()
{
	_suspended = false;
	needUpdateStarted = false;

	/* TODO: correct parameters */
	static bool gsl_is_initialized = false;
	if(!gsl_is_initialized)
	{
		GslConfigValue values[3] = {
			{ "wave_chunk_padding", 8 },
			{ "dcache_block_size",  4000, },
			{ 0, 0 }
		};
		gsl_is_initialized = true;

		if (!g_thread_supported ())
			g_thread_init(0);
		gsl_init(values, (GslMutexTable *)gslGlobalMutexTable);

		/*
		 * FIXME: both of these are really supposed to be tunable
		 * - the 512 because of low-latency apps, where calculating smaller
		 *   block sizes might be necessary
		 * - the 44100 because of the obvious reason, that not every artsd
		 *   is running at that rate
		 */
		gsl_engine_init(false, 512, 44100, /* subsamplemask */ 63);
		if(gslGlobalMutexTable)
			arts_debug("gsl: using Unix98 pthreads directly for mutexes and conditions");
		/*gsl_engine_debug_enable(GslEngineDebugLevel(GSL_ENGINE_DEBUG_JOBS | GSL_ENGINE_DEBUG_SCHED));*/
	}
	gslMainLoop.initialize();
}

ScheduleNode *StdFlowSystem::addObject(Object_skel *object)
{
	// do not add new modules when being in suspended state
	restart();

	StdScheduleNode *node = new StdScheduleNode(object,this);
	nodes.push_back(node);
	return node;
}

void StdFlowSystem::removeObject(ScheduleNode *node)
{
	StdScheduleNode *xnode = (StdScheduleNode *)node->cast("StdScheduleNode");
	assert(xnode);
	nodes.remove(xnode);
	delete xnode;
}

bool StdFlowSystem::suspended()
{
	return _suspended;
}

bool StdFlowSystem::suspendable()
{
	/*
	 * What it does:
	 * -------------
	 *
	 * The suspension algorithm will first divide the graph of modules into
	 * subgraphs of interconnected modules. A subgraph is suspendable if
	 * all of its modules are suspendable and the subgraph does not contain
	 * producer(s) and consumer(s) at the same time.
	 *
	 * Finally, our module graph is suspendable if all its subgraphs are.
	 *
	 * How it is implemented:
	 * ----------------------
	 *
	 * For efficiency reasons, both steps are merged together. First all
	 * modules will be marked as unseen. Then a module is picked and
	 * all modules that it connects to are recursively added to the
	 * subgraph.
	 */

	/*
	 * initialization: no nodes are seen
	 */
	std::list<StdScheduleNode *>::iterator i;
	for (i = nodes.begin(); i != nodes.end(); i++)
	{
		StdScheduleNode *node = *i;
		node->suspendTag = false;
	}

	stack<StdScheduleNode*> todo;
	for(i = nodes.begin(); i != nodes.end(); i++)
	{
		bool haveConsumer = false;
		bool haveProducer = false;

		/*
		 * examine the subgraph consisting of all nodes connected to (*i)
		 * (only will do anything if suspendTag is not already set)
		 */

		todo.push(*i);
		do
		{
			StdScheduleNode *node = todo.top();
			todo.pop();

			if(!node->suspendTag)
			{
				node->suspendTag = true;	// never examine this node again

				switch (node->suspendable())
				{
					case asNoSuspend|asProducer:
					case asNoSuspend|asConsumer:
					case asNoSuspend:
							return false;
						break;
					case asSuspend:
					case asSuspendStop:	
							/* nothing */
						break;
					case asSuspend|asProducer:
					case asSuspendStop|asProducer:
							if(haveConsumer)
								return false;
							else
								haveProducer = true;
						break;
					case asSuspend|asConsumer:
					case asSuspendStop|asConsumer:
							if(haveProducer)
								return false;
							else
								haveConsumer = true;
						break;
					default:
						arts_fatal("bad suspend value %d", node->suspendable());
				}

				unsigned int c;
				for(c = 0; c < node->inConnCount; c++)
				{
					if(node->inConn[c]->source)
						todo.push(node->inConn[c]->source->parent);
				}
	
				for(c = 0; c < node->outConnCount; c++)
				{
					std::list<Port *>::iterator ci;

					for(ci = node->outConn[c]->autoDisconnect.begin();
						ci != node->outConn[c]->autoDisconnect.end(); ci++)
					{
						AudioPort *dest = (*ci)->audioPort();
						if(dest)
							todo.push(dest->parent);
					}
				}
			}
		} while(!todo.empty());
	}
	return true;
}

void StdFlowSystem::suspend()
{
	if(!_suspended)
	{
		std::list<StdScheduleNode *>::iterator i;
		for(i = nodes.begin();i != nodes.end();i++)
		{
			StdScheduleNode *node = *i;
			node->suspend();
		}
		_suspended = true;
	}
}

void StdFlowSystem::restart()
{
	if(_suspended)
	{
		std::list<StdScheduleNode *>::iterator i;
		for(i = nodes.begin();i != nodes.end();i++)
		{
			StdScheduleNode *node = *i;
			node->restart();
		}
		_suspended = false;
	}
}

/* remote accessibility */

void StdFlowSystem::startObject(Object node)
{
	StdScheduleNode *sn =
		(StdScheduleNode *)node._node()->cast("StdScheduleNode");
	sn->start();
}

void StdFlowSystem::stopObject(Object node)
{
	StdScheduleNode *sn =
		(StdScheduleNode *)node._node()->cast("StdScheduleNode");
	sn->stop();
}

void StdFlowSystem::connectObject(Object sourceObject,const string& sourcePort,
					Object destObject, const std::string& destPort)
{
	arts_debug("connect port %s to %s", sourcePort.c_str(), destPort.c_str());
	StdScheduleNode *sn =
		(StdScheduleNode *)sourceObject._node()->cast("StdScheduleNode");
	assert(sn);

	Port *port = sn->findPort(sourcePort);
	assert(port);

	StdScheduleNode *destsn =
		(StdScheduleNode *)destObject._node()->cast("StdScheduleNode");
	if(destsn)
	{
		sn->connect(sourcePort,destsn,destPort);
		return;
	}

	ASyncPort *ap = port->asyncPort();

	if(ap)
	{
		FlowSystemSender sender;
		FlowSystemReceiver receiver;
		FlowSystem remoteFs;

		string dest = destObject.toString() + ":" + destPort;
		ASyncNetSend *netsend = new ASyncNetSend(ap, dest);

		sender = FlowSystemSender::_from_base(netsend); // don't release netsend
		remoteFs = destObject._flowSystem();
		receiver = remoteFs.createReceiver(destObject, destPort, sender);
		netsend->setReceiver(receiver);
		arts_debug("connected an asyncnetsend");
	}
}

void StdFlowSystem::disconnectObject(Object sourceObject,
  	const string& sourcePort, Object destObject, const std::string& destPort)
{
	arts_debug("disconnect port %s and %s",sourcePort.c_str(),destPort.c_str());
	StdScheduleNode *sn =
		(StdScheduleNode *)sourceObject._node()->cast("StdScheduleNode");
	assert(sn);

	Port *port = sn->findPort(sourcePort);
	assert(port);

	StdScheduleNode *destsn =
		(StdScheduleNode *)destObject._node()->cast("StdScheduleNode");
	if(destsn)
	{
		sn->disconnect(sourcePort,destsn,destPort);
		return;
	}

	ASyncPort *ap = port->asyncPort();
	if(ap)
	{
		string dest = destObject.toString() + ":" + destPort;
		ap->disconnectRemote(dest);
		arts_debug("disconnected an asyncnetsend");
	}
}

AttributeType StdFlowSystem::queryFlags(Object node, const std::string& port)
{
	StdScheduleNode *sn =
		(StdScheduleNode *)node._node()->cast("StdScheduleNode");
	assert(sn);
	return sn->queryFlags(port);
}

void StdFlowSystem::setFloatValue(Object node, const std::string& port,
							float value)
{
	StdScheduleNode *sn =
		(StdScheduleNode *)node._node()->cast("StdScheduleNode");
	assert(sn);
	sn->setFloatValue(port,value);
}

FlowSystemReceiver StdFlowSystem::createReceiver(Object object,
							const string &port, FlowSystemSender sender)
{
	StdScheduleNode *sn =
		(StdScheduleNode *)object._node()->cast("StdScheduleNode");

	Port *p = sn->findPort(port);
	assert(p);

	ASyncPort *ap = p->asyncPort();

	if(ap)
	{
		arts_debug("creating packet receiver");
		return FlowSystemReceiver::_from_base(new ASyncNetReceive(ap, sender));
	}
	return FlowSystemReceiver::null();
}

void StdFlowSystem::updateStarted()
{
	if(!needUpdateStarted)
		return;

	needUpdateStarted = false;

	std::list<StdScheduleNode*>::iterator ni;
	GslTrans *trans = 0;

	for(ni = nodes.begin(); ni != nodes.end(); ni++)
	{
		StdScheduleNode *node = *ni;

		if(node->running != node->gslRunning)
		{
			if(!trans)
				trans = gsl_trans_open();
			gsl_trans_add(trans, gsl_job_set_consumer(node->gslModule, node->running));
			node->gslRunning = node->running;
		}
	}
	if(trans)
		gsl_trans_commit(trans);
}

void StdFlowSystem::startedChanged()
{
	needUpdateStarted = true;
}

// hacked initialization of Dispatcher::the()->flowSystem ;)

namespace Arts {

static class SetFlowSystem : public StartupClass {
	FlowSystem_impl *fs;
public:
	void startup()
	{
		fs = new StdFlowSystem;
		Dispatcher::the()->setFlowSystem(fs);
	}
	void shutdown()
	{
		fs->_release();
	}
} sfs;

}