/* GSL Engine - Flow module operation engine
 * Copyright (C) 2001 Tim Janik
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General
 * Public License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
#ifndef __GSL_ENGINE_NODE_H__
#define __GSL_ENGINE_NODE_H__

#include "gslengine.h"
#include "gsloputil.h"
#include "gslcommon.h"

#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */



#define	ENGINE_NODE(module)		((EngineNode*) (module))
#define ENGINE_NODE_N_OSTREAMS(node)	((node)->module.klass->n_ostreams)
#define ENGINE_NODE_N_ISTREAMS(node)	((node)->module.klass->n_istreams)
#define ENGINE_NODE_N_JSTREAMS(node)	((node)->module.klass->n_jstreams)
#define	ENGINE_NODE_IS_CONSUMER(node)	((node)->is_consumer && \
					 (node)->output_nodes == NULL)
#define	ENGINE_NODE_IS_DEFERRED(node)	(FALSE)
#define	ENGINE_NODE_IS_SCHEDULED(node)	(ENGINE_NODE (node)->sched_tag)
#define	ENGINE_NODE_IS_CHEAP(node)	(((node)->module.klass->mflags & GSL_COST_CHEAP) != 0)
#define	ENGINE_NODE_IS_EXPENSIVE(node)	(((node)->module.klass->mflags & GSL_COST_EXPENSIVE) != 0)
#define	ENGINE_NODE_LOCK(node)		gsl_rec_mutex_lock (&(node)->rec_mutex)
#define	ENGINE_NODE_UNLOCK(node)	gsl_rec_mutex_unlock (&(node)->rec_mutex)


/* --- debugging and messages --- */
#define ENG_DEBUG		GSL_DEBUG_FUNCTION (GSL_MSG_ENGINE, NULL)
#define MAS_DEBUG		GSL_DEBUG_FUNCTION (GSL_MSG_MASTER, NULL)
#define JOB_DEBUG		GSL_DEBUG_FUNCTION (GSL_MSG_JOBS, NULL)
#define SCHED_DEBUG		GSL_DEBUG_FUNCTION (GSL_MSG_SCHED, NULL)


/* --- transactions --- */
typedef union _EngineFlowJob EngineFlowJob;
typedef enum {
  ENGINE_JOB_NOP,
  ENGINE_JOB_INTEGRATE,
  ENGINE_JOB_DISCARD,
  ENGINE_JOB_ICONNECT,
  ENGINE_JOB_JCONNECT,
  ENGINE_JOB_IDISCONNECT,
  ENGINE_JOB_JDISCONNECT,
  ENGINE_JOB_SET_CONSUMER,
  ENGINE_JOB_UNSET_CONSUMER,
  ENGINE_JOB_ACCESS,
  ENGINE_JOB_ADD_POLL,
  ENGINE_JOB_REMOVE_POLL,
  ENGINE_JOB_FLOW_JOB,
  ENGINE_JOB_DEBUG,
  ENGINE_JOB_LAST
} EngineJobType;
struct _GslJob
{
  EngineJobType       job_id;
  GslJob	     *next;
  union {
    EngineNode	     *node;
    struct {
      EngineNode     *dest_node;
      guint	      dest_ijstream;
      EngineNode     *src_node;
      guint	      src_ostream;
    } connection;
    struct {
      EngineNode     *node;
      GslAccessFunc   access_func;
      gpointer	      data;
      GslFreeFunc     free_func;
    } access;
    struct {
      GslPollFunc     poll_func;
      gpointer	      data;
      GslFreeFunc     free_func;
      guint           n_fds;
      GPollFD	     *fds;
    } poll;
    struct {
      EngineNode     *node;
      EngineFlowJob  *fjob;
    } flow_job;
    gchar	     *debug;
  } data;
};
struct _GslTrans
{
  GslJob   *jobs_head;
  GslJob   *jobs_tail;
  guint	    comitted : 1;
  GslTrans *cqt_next;	/* com-thread-queue */
};
typedef enum {
  ENGINE_FLOW_JOB_NOP,
  ENGINE_FLOW_JOB_SUSPEND,
  ENGINE_FLOW_JOB_RESUME,
  ENGINE_FLOW_JOB_ACCESS,
  ENGINE_FLOW_JOB_LAST
} EngineFlowJobType;
typedef struct
{
  EngineFlowJobType fjob_id;
  EngineFlowJob    *next;
  guint64           tick_stamp;
} EngineFlowJobAny;
typedef struct
{
  EngineFlowJobType fjob_id;
  EngineFlowJob	   *next;
  guint64	    tick_stamp;
  GslAccessFunc     access_func;
  gpointer          data;
  GslFreeFunc       free_func;
} EngineFlowJobAccess;
union _EngineFlowJob
{
  EngineFlowJobType   fjob_id;
  EngineFlowJobAny    any;
  EngineFlowJobAccess access;
};


/* --- module nodes --- */
typedef struct
{
  EngineNode *src_node;
  guint	      src_stream;	/* ostream of src_node */
} EngineInput;
typedef struct
{
  EngineNode *src_node;
  guint	      src_stream;	/* ostream of src_node */
} EngineJInput;
typedef struct
{
  gfloat *buffer;
  guint	  n_outputs;
} EngineOutput;
struct _EngineNode		/* fields sorted by order of processing access */
{
  GslModule	 module;

  GslRecMutex	 rec_mutex;	/* processing lock */
  guint64	 counter;	/* <= GSL_TICK_STAMP */
  EngineInput	*inputs;	/* [ENGINE_NODE_N_ISTREAMS()] */
  EngineJInput **jinputs;	/* [ENGINE_NODE_N_JSTREAMS()] */
  EngineOutput	*outputs;	/* [ENGINE_NODE_N_OSTREAMS()] */

  /* flow jobs */
  EngineFlowJob	*flow_jobs;			/* active jobs */
  EngineFlowJob	*fjob_first, *fjob_last;	/* trash list */

  /* master-node-list */
  EngineNode	*mnl_next;
  EngineNode	*mnl_prev;
  guint		 integrated : 1;
  guint		 reconnected : 1;

  guint		 is_consumer : 1;
  
  /* scheduler */
  guint		 sched_tag : 1;
  guint		 sched_router_tag : 1;
  guint		 sched_leaf_level;
  EngineNode	*toplevel_next;	/* master-consumer-list, FIXME: overkill, using a GslRing is good enough */
  GslRing	*output_nodes;	/* EngineNode* ring of nodes in ->outputs[] */
};

static void
_engine_node_insert_flow_job (EngineNode    *node,
			      EngineFlowJob *fjob)
{
  EngineFlowJob *last = NULL, *tmp = node->flow_jobs;

  /* find next position */
  while (tmp && tmp->any.tick_stamp <= fjob->any.tick_stamp)
    {
      last = tmp;
      tmp = last->any.next;
    }
  /* insert before */
  fjob->any.next = tmp;
  if (last)
    last->any.next = fjob;
  else
    node->flow_jobs = fjob;
}

static inline EngineFlowJob*
_engine_node_pop_flow_job (EngineNode *node,
			   guint64     tick_stamp)
{
  EngineFlowJob *fjob = node->flow_jobs;

  if_reject (fjob)
    {
      if (fjob->any.tick_stamp <= tick_stamp)
	{
	  node->flow_jobs = fjob->any.next;
	  
	  fjob->any.next = node->fjob_first;
	  node->fjob_first = fjob;
	  if (!node->fjob_last)
	    node->fjob_last = node->fjob_first;
	}
      else
	fjob = NULL;
    }

  return fjob;
}

static inline guint64
_engine_node_peek_flow_job_stamp (EngineNode *node)
{
  EngineFlowJob *fjob = node->flow_jobs;

  if_reject (fjob)
    return fjob->any.tick_stamp;

  return GSL_MAX_TICK_STAMP;
}


#ifdef __cplusplus
}
#endif /* __cplusplus */

#endif /* __GSL_ENGINE_NODE_H__ */