summaryrefslogtreecommitdiffstats
path: root/flow/bufferqueue.h
blob: be332c3672a8feeac8bfae4efa55cc2e3bc2df8f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
 * BC - Status (2002-03-08): ByteBuffer, BufferQueue
 *
 * None of these classes is considered part of the public API. Do NOT use them
 * in your apps. They are part of the implementation of libartsflow's
 * AudioSubSystem, and are subject to change or removal due to optimization
 * work.
 */

#ifndef _BUFFERQUEUE_H
#define _BUFFERQUEUE_H

#include <string.h>

#include "thread.h"

#define _DEFAULT_CHUNK_SIZE 4096
#define _MAX_CHUNKS 3

namespace Arts
{

/*
 * ByteBuffer: one heap-allocated chunk of bytes plus a read cursor.
 *
 * The buffer owns an array of maxSize() bytes. size() is the number of bytes
 * still available starting at get(); push() moves the cursor forward and
 * shrinks size() accordingly. The class does no locking of its own.
 *
 * Copying performs a deep copy of the storage, so every instance always owns
 * its own array (the implicit member-wise copy would have freed the same
 * array twice).
 */
class ByteBuffer
{
	unsigned char* content;  // owned storage, _maxSize bytes long
	int _size;               // bytes available from content+rp
	int _maxSize;            // allocated length of content
	int rp;                  // read offset into content

	// Allocates a new array of `bytes` bytes and fills it from `src`.
	static unsigned char* cloneStorage(const unsigned char* src, int bytes) {
		unsigned char* fresh = new unsigned char[bytes];
		if (bytes > 0)
			memcpy(fresh, src, bytes);
		return fresh;
	}

public:
	// Creates an empty buffer with the default chunk capacity.
	ByteBuffer() {
		_size = rp = 0;
		_maxSize = _DEFAULT_CHUNK_SIZE;
		content = new unsigned char[_DEFAULT_CHUNK_SIZE];
	}

	// Creates a buffer with the default chunk capacity and stores up to
	// maxSize() bytes from s in it (see put() for the clamping rules).
	ByteBuffer(const void* s, int len) {
		_maxSize = _DEFAULT_CHUNK_SIZE;
		content = new unsigned char[_DEFAULT_CHUNK_SIZE];
		put(s, len);
	}

	// Deep copy: the new buffer gets its own storage with the same bytes,
	// capacity, size and read offset as `other`.
	ByteBuffer(const ByteBuffer& other)
		: content(cloneStorage(other.content, other._maxSize)),
		  _size(other._size),
		  _maxSize(other._maxSize),
		  rp(other.rp)
	{
	}

	// Deep-copy assignment. The replacement storage is allocated before the
	// old one is released, so a failed allocation leaves *this untouched.
	ByteBuffer& operator=(const ByteBuffer& other) {
		if (this != &other) {
			unsigned char* fresh = cloneStorage(other.content, other._maxSize);
			delete [] content;
			content = fresh;
			_size = other._size;
			_maxSize = other._maxSize;
			rp = other.rp;
		}
		return *this;
	}

	~ByteBuffer() { delete [] content; }

	// Replaces the buffer contents with len bytes copied from s and rewinds
	// the read offset. The length is clamped to [0, maxSize()] so the copy can
	// never run past the owned storage; a null s stores nothing.
	void put(const void* s, int len) {
		if (s == 0 || len < 0)
			len = 0;
		else if (len > _maxSize)
			len = _maxSize;

		_size = len;
		if (len != 0)
			memcpy(content, s, len);
		rp = 0;
	}

	// Pointer to the first unread byte.
	void* get()          { return content+rp; }
	// Marks the buffer empty and returns the start of the storage, e.g. so a
	// caller can fill it directly and then announce the length with set().
	void* reset()        { _size = 0; rp = 0; return content; }
	// Consumes len bytes: advances the read offset and returns the number of
	// bytes left. No range check is done; len is expected to be <= size().
	int push(int len)    { _size -= len; rp += len; return _size; }
	// Declares that the storage now holds len valid bytes, starting at offset
	// 0. No range check is done; len is expected to be <= maxSize().
	void set(int len)    { _size = len; rp = 0; }
	int size() const     { return _size; }
	int maxSize() const  { return _maxSize; }

	// Reallocates the storage to `size` bytes (negative values are treated as
	// 0). Previous contents are discarded, so the buffer is left empty with
	// the read offset rewound; otherwise size()/get() would keep describing
	// data that no longer exists. The new array is allocated before the old
	// one is freed so `content` never dangles.
	void setMaxSize(int size) {
		if (size < 0)
			size = 0;
		unsigned char* fresh = new unsigned char[size];
		delete [] content;
		content = fresh;
		_maxSize = size;
		_size = rp = 0;
	}
};

///////////////////////////////////////////////////////////////////////////////

/*
 * BufferQueue: ring of _MAX_CHUNKS ByteBuffers shared between a producer
 * and a consumer, coordinated by two semaphores.
 *
 *  - sema_consumed is waited on before a slot is written (write(),
 *    waitConsumed()) and posted by consumed(); freeChunks() reports its value.
 *  - sema_produced is waited on before a slot is read (waitProduced()) and
 *    posted by write()/produced(); bufferedChunks() reports its value.
 *
 * wp is the index of the next slot to fill, rp the index of the next slot to
 * drain.
 *
 * The queue owns both semaphores through raw pointers, so it is deliberately
 * not copyable: a member-wise copy would delete each semaphore twice.
 */
class BufferQueue
{
private:
	ByteBuffer bufs[_MAX_CHUNKS];
	int rp;
	int wp;
	Arts::Semaphore* sema_produced;
	Arts::Semaphore* sema_consumed;

	// Copying is disabled (declared private, intentionally never defined).
	BufferQueue(const BufferQueue&);
	BufferQueue& operator=(const BufferQueue&);

	// (Re)creates both semaphores in their initial state. The old objects are
	// released and the pointers nulled first, so the members never dangle if
	// an allocation fails part-way through.
	// NOTE(review): the Semaphore arguments are presumably (shared, initial
	// count), i.e. all chunks free and none produced -- confirm in thread.h.
	void semaReinit() {
		delete sema_consumed;
		sema_consumed = 0;
		delete sema_produced;
		sema_produced = 0;

		sema_consumed = new Arts::Semaphore(0, _MAX_CHUNKS);
		sema_produced = new Arts::Semaphore(0, 0);
	}


public:
	// Starts with both indices at slot 0 and freshly created semaphores.
	BufferQueue() {
		rp = wp = 0;
		// Null the pointers so semaReinit()'s deletes are no-ops here; this
		// keeps the semaphore set-up in a single place.
		sema_consumed = 0;
		sema_produced = 0;
		semaReinit();
	}

	~BufferQueue() {
		delete sema_consumed;
		delete sema_produced;
	}

	// Producer side: write() does wait/fill/publish in one call;
	// waitConsumed() + produced() is the same sequence split in two so the
	// caller can fill the returned buffer itself.
	void write(void* data, int len);
	ByteBuffer* waitConsumed();
	void produced();

	// Consumer side: waitProduced() returns the slot at rp, consumed()
	// advances rp and posts sema_consumed.
	ByteBuffer* waitProduced();
	void consumed();

	bool isEmpty() const       { return sema_produced->getValue() == 0; }
	int bufferedChunks() const { return sema_produced->getValue(); }
	int freeChunks() const     { return sema_consumed->getValue(); }
	int maxChunks() const      { return _MAX_CHUNKS; }
	int chunkSize() const      { return bufs[0].maxSize(); }
	// Rewinds both indices and replaces the semaphores with fresh ones.
	void clear()               { rp = wp = 0; semaReinit(); }
	// Reallocates every chunk to `size` bytes (existing chunk contents are
	// discarded by ByteBuffer::setMaxSize).
	void setChunkSize(int size){
		for (int i=0; i < maxChunks(); i++)
			bufs[i].setMaxSize(size);
	}
};

///////////////////////////////////////////////////////////////////////////////

/*
 * Producer convenience call: waits on sema_consumed, stores the caller's
 * data in the slot at wp via ByteBuffer::put(), advances wp around the ring
 * and posts sema_produced.
 */
inline void BufferQueue::write(void* data, int len)
{
	sema_consumed->wait();
	bufs[wp].put(data, len);
	// Written as a plain assignment: "++wp %= N" modifies wp twice within one
	// expression, which is undefined behaviour before C++11.
	wp = (wp + 1) % _MAX_CHUNKS;
	sema_produced->post();
}

/*
 * Waits on sema_consumed, then returns the slot at the current write index.
 * The write index is not advanced here; produced() does that.
 */
inline ByteBuffer* BufferQueue::waitConsumed()
{
	sema_consumed->wait();

	ByteBuffer* const writeSlot = bufs + wp;
	return writeSlot;
}

/*
 * Publishes the slot at wp: advances wp around the ring and posts
 * sema_produced.
 */
inline void BufferQueue::produced()
{
	// Written as a plain assignment: "++wp %= N" modifies wp twice within one
	// expression, which is undefined behaviour before C++11.
	wp = (wp + 1) % _MAX_CHUNKS;
	sema_produced->post();
}

/*
 * Waits on sema_produced, then returns the slot at the current read index.
 * The read index is not advanced here; consumed() does that.
 */
inline ByteBuffer* BufferQueue::waitProduced()
{
	sema_produced->wait();

	ByteBuffer* const readSlot = bufs + rp;
	return readSlot;
}

/*
 * Releases the slot at rp: advances rp around the ring and posts
 * sema_consumed.
 */
inline void BufferQueue::consumed()
{
	// Written as a plain assignment: "++rp %= N" modifies rp twice within one
	// expression, which is undefined behaviour before C++11.
	rp = (rp + 1) % _MAX_CHUNKS;
	sema_consumed->post();
}

}

#endif