/*

    Copyright (C) 2000,2001 Stefan Westerfeld
                            stefan@space.twc.de

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Library General Public
    License as published by the Free Software Foundation; either
    version 2 of the License, or (at your option) any later version.
  
    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Library General Public License for more details.
   
    You should have received a copy of the GNU Library General Public License
    along with this library; see the file COPYING.LIB.  If not, write to
    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
    Boston, MA 02110-1301, USA.

    */

#include <iostream>
#include "asyncschedule.h"

using namespace std;
using namespace Arts;

#include "debug.h"
#include <stdio.h>

/* Since this file is a tad bit more complex, here is some basic documentation:

1) ASyncPort: There are asynchronous ports which are parts of the standard-
   flowsystem schedule nodes. Their lifetime starts whenever an asynchronous
   stream gets created by the flow system, and ends when the schedule node
   gets destroyed. Basically, an ASyncPort has two functions:

   * it is a "Port", which means that it gets connect(), disconnect() and
     other calls from the flowsystem

   * it is a "GenericDataChannel", which means that DataPackets can interact
     with it

   Although there will be ASyncPorts which only send data and ASyncPorts which
   only receive data (there are none that do both), there are no distinct
   classes for this.

2) Standard case: a DataPacket that gets transported over a datachannel locally:

   1. the user allocates himself a datapacket "packet"
   2. the user calls "packet->send()", which in turn calls
      ASyncPort::sendPacket(packet)
   3. the ASyncPort sends the DataPacket to every subscriber (incrementing the
      useCount) over the NotificationManager
   4. the NotificationManager delivers the DataPackets to the receiver
   5. eventually, the receiver confirms using "packet->processed()"
   6. the packet informs the ASyncPort::processedPacket()
   7. the packet is freed

variant (pulling):

   1. the user gets told by the ASyncPort: produce some data, here is a "packet"
   2. the user calls "packet->send()", which in turn calls
      ASyncPort::sendPacket(packet)
   3. the ASyncPort sends the DataPacket to every subscriber (incrementing the
      useCount) over the NotificationManager
   4. the NotificationManager delivers the DataPackets to the receiver
   5. eventually, the receiver confirms using "packet->processed()"
   6. the packet informs the ASyncPort::processedPacket()
   7. the ASyncPort restarts with 1.

3) Remote case: the remote case follows from the local case by adding two extra
things: one object that converts packets from their packet form to a message
(ASyncNetSend), and one object that converts packets from the message form
to a packet again. Effectively, the sending of a single packet looks like
this, then:

   1-S. the user allocates himself a datapacket "packet"
   2-S. the user calls "packet->send()", which in turn calls
        ASyncPort::sendPacket(packet)
   3-S. the ASyncPort sends the DataPacket to every subscriber (incrementing the
        useCount) over the NotificationManager
   4-S. the NotificationManager delivers the DataPackets to the ASyncNetSend
   5-S. the ASyncNetSend::notify method gets called, which in turn converts
        the packet to a network message
    
	... network transfer ...
   
   6-R. the ASyncNetReceive::receive method gets called - the method creates
        a new data packet, and sends it using the NotificationManager again
   7-R. the NotificationManager delivers the DataPacket to the receiver
   8-R. eventually, the receiver confirms using "packet->processed()"
   9-R. the packet informs the ASyncNetReceive::processedPacket() which
        frees the packet and tells the (remote) sender that it went all right

	... network transfer ...
 
   10-S. eventually, ASyncNetSend::processed() gets called, and confirms
         the packet using "packet->processed()"
   11-S. the packet informs the ASyncPort::processedPacket()
   12-S. the packet is freed

variant(pulling):

   works the same as in the local case by exchanging steps 1-S and 12-S

4) ownership:

   * ASyncPort: is owned by the Object which it is a part of, if the object
   dies, ASyncPort dies unconditionally

   * DataPacket: is owned by the GenericDataChannel they are propagated over,
   that is, the ASyncPort normally - however if the DataPacket is still in
   use (i.e. in state 5 of the local case), it will take responsibility to
   free itself once all processed() calls have been collected

   * ASyncNetSend, ASyncNetReceive: own each other, so that if the sender dies,
   the connection will die as well, and if the receiver dies, the same happens

*/

#undef DEBUG_ASYNC_TRANSFER

ASyncPort::ASyncPort(const std::string& name, void *ptr, long flags,
		StdScheduleNode* parent) : Port(name, ptr, flags, parent), pull(false)
{
	stream = (GenericAsyncStream *)ptr;
	stream->channel = this;
	stream->_notifyID = notifyID = parent->object()->_mkNotifyID();
}

ASyncPort::~ASyncPort()
{
	/* 
	 * tell all outstanding packets that we don't exist any longer, so that
	 * if they feel like they need to confirm they have been processed now,
	 * they don't talk to an no longer existing object about it
	 */
	while(!sent.empty())
	{
		sent.front()->channel = 0;
		sent.pop_front();
	}

	/* disconnect remote connections (if present): the following things will
	 * need to be ensured here, since we are being deleted:
     *
	 *  - the senders should not talk to us after our destructor
	 *  - all of our connections need to be disconnected
	 *  - every connection needs to be closed exactly once
     *
	 * (closing a connection can cause reentrancy due to mcop communication)
	 */
	while(!netSenders.empty())
		netSenders.front()->disconnect();

	FlowSystemReceiver receiver = netReceiver;
	if(!receiver.isNull())
		receiver.disconnect();
}

//-------------------- GenericDataChannel interface -------------------------

void ASyncPort::setPull(int packets, int capacity)
{
	pullNotification.receiver = parent->object();
	pullNotification.ID = notifyID;
	pullNotification.internal = 0;
	pull = true;

	for(int i=0;i<packets;i++)
	{
		GenericDataPacket *packet = stream->createPacket(capacity);
		packet->useCount = 0;
		pullNotification.data = packet;
		NotificationManager::the()->send(pullNotification);
	}
}

void ASyncPort::endPull()
{
	pull = false;
	// TODO: maybe remove all pending pull packets here
}

void ASyncPort::processedPacket(GenericDataPacket *packet)
{
	int count = 0;
	list<GenericDataPacket *>::iterator i = sent.begin();
	while(i != sent.end())
	{
		if(*i == packet)
		{
			count++;
			i = sent.erase(i);
		}
		else i++;
	}
	assert(count == 1);

#ifdef DEBUG_ASYNC_TRANSFER
	cout << "port::processedPacket" << endl;
#endif
	assert(packet->useCount == 0);
	if(pull)
	{
		pullNotification.data = packet;
		NotificationManager::the()->send(pullNotification);
	}
	else
	{
		stream->freePacket(packet);
	}
}

void ASyncPort::sendPacket(GenericDataPacket *packet)
{
	bool sendOk = false;

#ifdef DEBUG_ASYNC_TRANSFER
	cout << "port::sendPacket" << endl;
#endif

	if(packet->size > 0)
	{
		vector<Notification>::iterator i;
		for(i=subscribers.begin(); i != subscribers.end(); i++)
		{
			Notification n = *i;
			n.data = packet;
			packet->useCount++;
#ifdef DEBUG_ASYNC_TRANSFER
			cout << "sending notification " << n.ID << endl;
#endif
			NotificationManager::the()->send(n);
			sendOk = true;
		}
	}

	if(sendOk)
		sent.push_back(packet);
	else
		stream->freePacket(packet);
}

//----------------------- Port interface ------------------------------------

void ASyncPort::connect(Port *xsource)
{
	arts_debug("port(%s)::connect",_name.c_str());

	ASyncPort *source = xsource->asyncPort();
	assert(source);
	addAutoDisconnect(xsource);

	Notification n;
	n.receiver = parent->object();
	n.ID = notifyID;
	n.internal = 0;
	source->subscribers.push_back(n);
}

void ASyncPort::disconnect(Port *xsource)
{
	arts_debug("port::disconnect");

	ASyncPort *source = xsource->asyncPort();
	assert(source);
	removeAutoDisconnect(xsource);

	// remove our subscription from the source object
	vector<Notification>::iterator si;
	for(si = source->subscribers.begin(); si != source->subscribers.end(); si++)
	{
		if(si->receiver == parent->object())
		{
			source->subscribers.erase(si);
			return;
		}
	}

	// there should have been exactly one, so this shouldn't be reached
	assert(false);
}

ASyncPort *ASyncPort::asyncPort()
{
	return this;
}

GenericAsyncStream *ASyncPort::receiveNetCreateStream()
{
	return stream->createNewStream();
}

NotificationClient *ASyncPort::receiveNetObject()
{
	return parent->object();
}

long ASyncPort::receiveNetNotifyID()
{
	return notifyID;
}

// Network transparency
void ASyncPort::addSendNet(ASyncNetSend *netsend)
{
	Notification n;
	n.receiver = netsend;
	n.ID = netsend->notifyID();
	n.internal = 0;
	subscribers.push_back(n);
	netSenders.push_back(netsend);
}

void ASyncPort::removeSendNet(ASyncNetSend *netsend)
{
	arts_return_if_fail(netsend != 0);
	netSenders.remove(netsend);

	vector<Notification>::iterator si;
	for(si = subscribers.begin(); si != subscribers.end(); si++)
	{
		if(si->receiver == netsend)
		{
			subscribers.erase(si);
			return;
		}
	}
	arts_warning("Failed to remove ASyncNetSend (%p) from ASyncPort", netsend);
}

void ASyncPort::setNetReceiver(ASyncNetReceive *receiver)
{
	arts_return_if_fail(receiver != 0);

	FlowSystemReceiver r = FlowSystemReceiver::_from_base(receiver->_copy());
	netReceiver = r;
}

void ASyncPort::disconnectRemote(const string& dest)
{
	list<ASyncNetSend *>::iterator i;

	for(i = netSenders.begin(); i != netSenders.end(); i++)
	{
		if((*i)->dest() == dest)
		{
			(*i)->disconnect();
			return;
		}
	}
	arts_warning("failed to disconnect %s in ASyncPort", dest.c_str());
}

ASyncNetSend::ASyncNetSend(ASyncPort *ap, const std::string& dest) : ap(ap)
{
	_dest = dest;
	ap->addSendNet(this);
}

ASyncNetSend::~ASyncNetSend()
{
	while(!pqueue.empty())
	{
		pqueue.front()->processed();
		pqueue.pop();
	}
	if(ap)
	{
		ap->removeSendNet(this);
		ap = 0;
	}
}

long ASyncNetSend::notifyID()
{
	return 1;
}

void ASyncNetSend::notify(const Notification& notification)
{
	// got a packet?
	assert(notification.ID == notifyID());
	GenericDataPacket *dp = (GenericDataPacket *)notification.data;
	pqueue.push(dp);

	/*
	 * since packets are delivered asynchronously, and since disconnection
	 * involves communication, it might happen that we get a packet without
	 * actually being connected any longer - in that case, silently forget it
	 */
	if(!receiver.isNull())
	{
		// put it into a custom data message and send it to the receiver
		Buffer *buffer = receiver._allocCustomMessage(receiveHandlerID);
		dp->write(*buffer);
		receiver._sendCustomMessage(buffer);
	}
}

void ASyncNetSend::processed()
{
	assert(!pqueue.empty());
	pqueue.front()->processed();
	pqueue.pop();
}

void ASyncNetSend::setReceiver(FlowSystemReceiver newReceiver)
{
	receiver = newReceiver;
	receiveHandlerID = newReceiver.receiveHandlerID();
}

void ASyncNetSend::disconnect()
{
	/* since disconnection will cause destruction (most likely immediate),
	 * we'll reference ourselves ...  */
	_copy();

	if(!receiver.isNull())
	{
		FlowSystemReceiver r = receiver;
		receiver = FlowSystemReceiver::null();
		r.disconnect();
	}
	if(ap)
	{
		ap->removeSendNet(this);
		ap = 0;
	}
	
	_release();
}

string ASyncNetSend::dest()
{
	return _dest;
}

/* dispatching function for custom message */

static void _dispatch_ASyncNetReceive_receive(void *object, Buffer *buffer)
{
	((ASyncNetReceive *)object)->receive(buffer);
}

ASyncNetReceive::ASyncNetReceive(ASyncPort *port, FlowSystemSender sender)
{
	port->setNetReceiver(this);
	stream = port->receiveNetCreateStream();
	stream->channel = this;
	this->sender = sender;
	/* stream->_notifyID = _mkNotifyID(); */

	gotPacketNotification.ID = port->receiveNetNotifyID();
	gotPacketNotification.receiver = port->receiveNetObject();
	gotPacketNotification.internal = 0;
	_receiveHandlerID =
		_addCustomMessageHandler(_dispatch_ASyncNetReceive_receive,this);
}

ASyncNetReceive::~ASyncNetReceive()
{
	/* tell outstanding packets that we don't exist any longer */
	while(!sent.empty())
	{
		sent.front()->channel = 0;
		sent.pop_front();
	}
	delete stream;
}

long ASyncNetReceive::receiveHandlerID()
{
	return _receiveHandlerID;
}

void ASyncNetReceive::receive(Buffer *buffer)
{
	GenericDataPacket *dp = stream->createPacket(512);
	dp->read(*buffer);
	dp->useCount = 1;
	gotPacketNotification.data = dp;
	NotificationManager::the()->send(gotPacketNotification);
	sent.push_back(dp);
}

/*
 * It will happen that this routine is called in time critical situations,
 * such as: while audio calculation is running, and must be finished in
 * time. The routine is mostly harmless, because sender->processed() is
 * a oneway function, which just queues the buffer for sending and returns
 * back, so it should return at once.
 *
 * However there is an exception upon first call: when sender->processed()
 * is called for the first time, the method processed has still to be looked
 * up. Thus, a synchronous call to _lookupMethod is made. That means, upon
 * first call, the method will send out an MCOP request and block until the
 * remote process tells that id.
 */
void ASyncNetReceive::processedPacket(GenericDataPacket *packet)
{
	/*
	 * HACK! Upon disconnect, strange things will happen. One of them is
	 * that we might, for the reason of not being referenced any longer,
	 * cease to exist without warning. Another is that our nice "sender"
	 * reference will get a null reference without warning, see disconnect
	 * code (which will cause the attached stub to also disappear). As
	 * those objects (especially the stub) are not prepared for not
	 * being there any more in the middle of whatever they do, we here
	 * explicitly reference us, and them, *again*, so that no evil things
	 * will happen. A general solution for this would be garbage collection
	 * in a timer, but until this is implemented (if it ever will become
	 * implemented), we'll live with this hack.
	 */
	_copy();
	sent.remove(packet);
	stream->freePacket(packet);
	if(!sender.isNull())
	{
		FlowSystemSender xsender = sender;
		xsender.processed();
	}
	_release();
}

void ASyncNetReceive::disconnect()
{
	if(!sender.isNull())
	{
		FlowSystemSender s = sender;
		sender = FlowSystemSender::null();
		s.disconnect();
	}
}

void ASyncNetReceive::sendPacket(GenericDataPacket *)
{
	assert(false);
}

void ASyncNetReceive::setPull(int, int)
{
	assert(false);
}

void ASyncNetReceive::endPull()
{
	assert(false);
}