summaryrefslogtreecommitdiffstats
path: root/flow/asyncschedule.cc
diff options
context:
space:
mode:
Diffstat (limited to 'flow/asyncschedule.cc')
-rw-r--r--flow/asyncschedule.cc553
1 files changed, 0 insertions, 553 deletions
diff --git a/flow/asyncschedule.cc b/flow/asyncschedule.cc
deleted file mode 100644
index 66ae48b..0000000
--- a/flow/asyncschedule.cc
+++ /dev/null
@@ -1,553 +0,0 @@
- /*
-
- Copyright (C) 2000,2001 Stefan Westerfeld
- stefan@space.twc.de
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Library General Public
- License as published by the Free Software Foundation; either
- version 2 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Library General Public License for more details.
-
- You should have received a copy of the GNU Library General Public License
- along with this library; see the file COPYING.LIB. If not, write to
- the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
-
- */
-
-#include <iostream>
-#include "asyncschedule.h"
-
-using namespace std;
-using namespace Arts;
-
-#include "debug.h"
-#include <stdio.h>
-
-/* Since this file is a tad bit more complex, here is some basic documentation:
-
-1) ASyncPort: There are asynchronous ports which are parts of the standard-
- flowsystem schedule nodes. Their lifetime starts whenever an asynchronous
- stream gets created by the flow system, and ends when the schedule node
- gets destroyed. Basically, an ASyncPort has two functions:
-
- * it is a "Port", which means that it gets connect(), disconnect() and
- other calls from the flowsystem
-
- * it is a "GenericDataChannel", which means that DataPackets can interact
- with it
-
- Although there will be ASyncPorts which only send data and ASyncPorts which
- only receive data (there are none that do both), there are no distinct
- classes for this.
-
-2) Standard case: a DataPacket that gets transported over a datachannel locally:
-
- 1. the user allocates himself a datapacket "packet"
- 2. the user calls "packet->send()", which in turn calls
- ASyncPort::sendPacket(packet)
- 3. the ASyncPort sends the DataPacket to every subscriber (incrementing the
- useCount) over the NotificationManager
- 4. the NotificationManager delivers the DataPackets to the receiver
- 5. eventually, the receiver confirms using "packet->processed()"
- 6. the packet informs the ASyncPort::processedPacket()
- 7. the packet is freed
-
-variant (pulling):
-
- 1. the user gets told by the ASyncPort: produce some data, here is a "packet"
- 2. the user calls "packet->send()", which in turn calls
- ASyncPort::sendPacket(packet)
- 3. the ASyncPort sends the DataPacket to every subscriber (incrementing the
- useCount) over the NotificationManager
- 4. the NotificationManager delivers the DataPackets to the receiver
- 5. eventually, the receiver confirms using "packet->processed()"
- 6. the packet informs the ASyncPort::processedPacket()
- 7. the ASyncPort restarts with 1.
-
-3) Remote case: the remote case follows from the local case by adding two extra
-things: one object that converts packets from their packet form to a message
-(ASyncNetSend), and one object that converts packets from the message form
-to a packet again. Effectively, the sending of a single packet looks like
-this, then:
-
- 1-S. the user allocates himself a datapacket "packet"
- 2-S. the user calls "packet->send()", which in turn calls
- ASyncPort::sendPacket(packet)
- 3-S. the ASyncPort sends the DataPacket to every subscriber (incrementing the
- useCount) over the NotificationManager
- 4-S. the NotificationManager delivers the DataPackets to the ASyncNetSend
- 5-S. the ASyncNetSend::notify method gets called, which in turn converts
- the packet to a network message
-
- ... network transfer ...
-
- 6-R. the ASyncNetReceive::receive method gets called - the method creates
- a new data packet, and sends it using the NotificationManager again
- 7-R. the NotificationManager delivers the DataPacket to the receiver
- 8-R. eventually, the receiver confirms using "packet->processed()"
- 9-R. the packet informs the ASyncNetReceive::processedPacket() which
- frees the packet and tells the (remote) sender that it went all right
-
- ... network transfer ...
-
- 10-S. eventually, ASyncNetSend::processed() gets called, and confirms
- the packet using "packet->processed()"
- 11-S. the packet informs the ASyncPort::processedPacket()
- 12-S. the packet is freed
-
-variant(pulling):
-
- works the same as in the local case by exchanging steps 1-S and 12-S
-
-4) ownership:
-
- * ASyncPort: is owned by the Object which it is a part of, if the object
- dies, ASyncPort dies unconditionally
-
- * DataPacket: is owned by the GenericDataChannel they are propagated over,
- that is, the ASyncPort normally - however if the DataPacket is still in
- use (i.e. in state 5 of the local case), it will take responsibility to
- free itself once all processed() calls have been collected
-
- * ASyncNetSend, ASyncNetReceive: own each other, so that if the sender dies,
- the connection will die as well, and if the receiver dies, the same happens
-
-*/
-
-#undef DEBUG_ASYNC_TRANSFER
-
-ASyncPort::ASyncPort(const std::string& name, void *ptr, long flags,
- StdScheduleNode* parent) : Port(name, ptr, flags, parent), pull(false)
-{
- stream = (GenericAsyncStream *)ptr;
- stream->channel = this;
- stream->_notifyID = notifyID = parent->object()->_mkNotifyID();
-}
-
-ASyncPort::~ASyncPort()
-{
- /*
- * tell all outstanding packets that we don't exist any longer, so that
- * if they feel like they need to confirm they have been processed now,
- * they don't talk to an no longer existing object about it
- */
- while(!sent.empty())
- {
- sent.front()->channel = 0;
- sent.pop_front();
- }
-
- /* disconnect remote connections (if present): the following things will
- * need to be ensured here, since we are being deleted:
- *
- * - the senders should not talk to us after our destructor
- * - all of our connections need to be disconnected
- * - every connection needs to be closed exactly once
- *
- * (closing a connection can cause reentrancy due to mcop communication)
- */
- while(!netSenders.empty())
- netSenders.front()->disconnect();
-
- FlowSystemReceiver receiver = netReceiver;
- if(!receiver.isNull())
- receiver.disconnect();
-}
-
-//-------------------- GenericDataChannel interface -------------------------
-
-void ASyncPort::setPull(int packets, int capacity)
-{
- pullNotification.receiver = parent->object();
- pullNotification.ID = notifyID;
- pullNotification.internal = 0;
- pull = true;
-
- for(int i=0;i<packets;i++)
- {
- GenericDataPacket *packet = stream->createPacket(capacity);
- packet->useCount = 0;
- pullNotification.data = packet;
- NotificationManager::the()->send(pullNotification);
- }
-}
-
-void ASyncPort::endPull()
-{
- pull = false;
- // TODO: maybe remove all pending pull packets here
-}
-
-void ASyncPort::processedPacket(GenericDataPacket *packet)
-{
- int count = 0;
- list<GenericDataPacket *>::iterator i = sent.begin();
- while(i != sent.end())
- {
- if(*i == packet)
- {
- count++;
- i = sent.erase(i);
- }
- else i++;
- }
- assert(count == 1);
-
-#ifdef DEBUG_ASYNC_TRANSFER
- cout << "port::processedPacket" << endl;
-#endif
- assert(packet->useCount == 0);
- if(pull)
- {
- pullNotification.data = packet;
- NotificationManager::the()->send(pullNotification);
- }
- else
- {
- stream->freePacket(packet);
- }
-}
-
-void ASyncPort::sendPacket(GenericDataPacket *packet)
-{
- bool sendOk = false;
-
-#ifdef DEBUG_ASYNC_TRANSFER
- cout << "port::sendPacket" << endl;
-#endif
-
- if(packet->size > 0)
- {
- vector<Notification>::iterator i;
- for(i=subscribers.begin(); i != subscribers.end(); i++)
- {
- Notification n = *i;
- n.data = packet;
- packet->useCount++;
-#ifdef DEBUG_ASYNC_TRANSFER
- cout << "sending notification " << n.ID << endl;
-#endif
- NotificationManager::the()->send(n);
- sendOk = true;
- }
- }
-
- if(sendOk)
- sent.push_back(packet);
- else
- stream->freePacket(packet);
-}
-
-//----------------------- Port interface ------------------------------------
-
-void ASyncPort::connect(Port *xsource)
-{
- arts_debug("port(%s)::connect",_name.c_str());
-
- ASyncPort *source = xsource->asyncPort();
- assert(source);
- addAutoDisconnect(xsource);
-
- Notification n;
- n.receiver = parent->object();
- n.ID = notifyID;
- n.internal = 0;
- source->subscribers.push_back(n);
-}
-
-void ASyncPort::disconnect(Port *xsource)
-{
- arts_debug("port::disconnect");
-
- ASyncPort *source = xsource->asyncPort();
- assert(source);
- removeAutoDisconnect(xsource);
-
- // remove our subscription from the source object
- vector<Notification>::iterator si;
- for(si = source->subscribers.begin(); si != source->subscribers.end(); si++)
- {
- if(si->receiver == parent->object())
- {
- source->subscribers.erase(si);
- return;
- }
- }
-
- // there should have been exactly one, so this shouldn't be reached
- assert(false);
-}
-
-ASyncPort *ASyncPort::asyncPort()
-{
- return this;
-}
-
-GenericAsyncStream *ASyncPort::receiveNetCreateStream()
-{
- return stream->createNewStream();
-}
-
-NotificationClient *ASyncPort::receiveNetObject()
-{
- return parent->object();
-}
-
-long ASyncPort::receiveNetNotifyID()
-{
- return notifyID;
-}
-
-// Network transparency
-void ASyncPort::addSendNet(ASyncNetSend *netsend)
-{
- Notification n;
- n.receiver = netsend;
- n.ID = netsend->notifyID();
- n.internal = 0;
- subscribers.push_back(n);
- netSenders.push_back(netsend);
-}
-
-void ASyncPort::removeSendNet(ASyncNetSend *netsend)
-{
- arts_return_if_fail(netsend != 0);
- netSenders.remove(netsend);
-
- vector<Notification>::iterator si;
- for(si = subscribers.begin(); si != subscribers.end(); si++)
- {
- if(si->receiver == netsend)
- {
- subscribers.erase(si);
- return;
- }
- }
- arts_warning("Failed to remove ASyncNetSend (%p) from ASyncPort", netsend);
-}
-
-void ASyncPort::setNetReceiver(ASyncNetReceive *receiver)
-{
- arts_return_if_fail(receiver != 0);
-
- FlowSystemReceiver r = FlowSystemReceiver::_from_base(receiver->_copy());
- netReceiver = r;
-}
-
-void ASyncPort::disconnectRemote(const string& dest)
-{
- list<ASyncNetSend *>::iterator i;
-
- for(i = netSenders.begin(); i != netSenders.end(); i++)
- {
- if((*i)->dest() == dest)
- {
- (*i)->disconnect();
- return;
- }
- }
- arts_warning("failed to disconnect %s in ASyncPort", dest.c_str());
-}
-
-ASyncNetSend::ASyncNetSend(ASyncPort *ap, const std::string& dest) : ap(ap)
-{
- _dest = dest;
- ap->addSendNet(this);
-}
-
-ASyncNetSend::~ASyncNetSend()
-{
- while(!pqueue.empty())
- {
- pqueue.front()->processed();
- pqueue.pop();
- }
- if(ap)
- {
- ap->removeSendNet(this);
- ap = 0;
- }
-}
-
-long ASyncNetSend::notifyID()
-{
- return 1;
-}
-
-void ASyncNetSend::notify(const Notification& notification)
-{
- // got a packet?
- assert(notification.ID == notifyID());
- GenericDataPacket *dp = (GenericDataPacket *)notification.data;
- pqueue.push(dp);
-
- /*
- * since packets are delivered asynchronously, and since disconnection
- * involves communication, it might happen that we get a packet without
- * actually being connected any longer - in that case, silently forget it
- */
- if(!receiver.isNull())
- {
- // put it into a custom data message and send it to the receiver
- Buffer *buffer = receiver._allocCustomMessage(receiveHandlerID);
- dp->write(*buffer);
- receiver._sendCustomMessage(buffer);
- }
-}
-
-void ASyncNetSend::processed()
-{
- assert(!pqueue.empty());
- pqueue.front()->processed();
- pqueue.pop();
-}
-
-void ASyncNetSend::setReceiver(FlowSystemReceiver newReceiver)
-{
- receiver = newReceiver;
- receiveHandlerID = newReceiver.receiveHandlerID();
-}
-
-void ASyncNetSend::disconnect()
-{
- /* since disconnection will cause destruction (most likely immediate),
- * we'll reference ourselves ... */
- _copy();
-
- if(!receiver.isNull())
- {
- FlowSystemReceiver r = receiver;
- receiver = FlowSystemReceiver::null();
- r.disconnect();
- }
- if(ap)
- {
- ap->removeSendNet(this);
- ap = 0;
- }
-
- _release();
-}
-
-string ASyncNetSend::dest()
-{
- return _dest;
-}
-
-/* dispatching function for custom message */
-
-static void _dispatch_ASyncNetReceive_receive(void *object, Buffer *buffer)
-{
- ((ASyncNetReceive *)object)->receive(buffer);
-}
-
-ASyncNetReceive::ASyncNetReceive(ASyncPort *port, FlowSystemSender sender)
-{
- port->setNetReceiver(this);
- stream = port->receiveNetCreateStream();
- stream->channel = this;
- this->sender = sender;
- /* stream->_notifyID = _mkNotifyID(); */
-
- gotPacketNotification.ID = port->receiveNetNotifyID();
- gotPacketNotification.receiver = port->receiveNetObject();
- gotPacketNotification.internal = 0;
- _receiveHandlerID =
- _addCustomMessageHandler(_dispatch_ASyncNetReceive_receive,this);
-}
-
-ASyncNetReceive::~ASyncNetReceive()
-{
- /* tell outstanding packets that we don't exist any longer */
- while(!sent.empty())
- {
- sent.front()->channel = 0;
- sent.pop_front();
- }
- delete stream;
-}
-
-long ASyncNetReceive::receiveHandlerID()
-{
- return _receiveHandlerID;
-}
-
-void ASyncNetReceive::receive(Buffer *buffer)
-{
- GenericDataPacket *dp = stream->createPacket(512);
- dp->read(*buffer);
- dp->useCount = 1;
- gotPacketNotification.data = dp;
- NotificationManager::the()->send(gotPacketNotification);
- sent.push_back(dp);
-}
-
-/*
- * It will happen that this routine is called in time critical situations,
- * such as: while audio calculation is running, and must be finished in
- * time. The routine is mostly harmless, because sender->processed() is
- * a oneway function, which just queues the buffer for sending and returns
- * back, so it should return at once.
- *
- * However there is an exception upon first call: when sender->processed()
- * is called for the first time, the method processed has still to be looked
- * up. Thus, a synchronous call to _lookupMethod is made. That means, upon
- * first call, the method will send out an MCOP request and block until the
- * remote process tells that id.
- */
-void ASyncNetReceive::processedPacket(GenericDataPacket *packet)
-{
- /*
- * HACK! Upon disconnect, strange things will happen. One of them is
- * that we might, for the reason of not being referenced any longer,
- * cease to exist without warning. Another is that our nice "sender"
- * reference will get a null reference without warning, see disconnect
- * code (which will cause the attached stub to also disappear). As
- * those objects (especially the stub) are not prepared for not
- * being there any more in the middle of whatever they do, we here
- * explicitly reference us, and them, *again*, so that no evil things
- * will happen. A general solution for this would be garbage collection
- * in a timer, but until this is implemented (if it ever will become
- * implemented), we'll live with this hack.
- */
- _copy();
- sent.remove(packet);
- stream->freePacket(packet);
- if(!sender.isNull())
- {
- FlowSystemSender xsender = sender;
- xsender.processed();
- }
- _release();
-}
-
-void ASyncNetReceive::disconnect()
-{
- if(!sender.isNull())
- {
- FlowSystemSender s = sender;
- sender = FlowSystemSender::null();
- s.disconnect();
- }
-}
-
-void ASyncNetReceive::sendPacket(GenericDataPacket *)
-{
- assert(false);
-}
-
-void ASyncNetReceive::setPull(int, int)
-{
- assert(false);
-}
-
-void ASyncNetReceive::endPull()
-{
- assert(false);
-}