1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
#include <QThread>
#include "dsp/threadedsamplesink.h"
#include "util/message.h"
// Wraps sampleSink so that it is driven from a dedicated QThread.
//
// The constructor creates the thread object (it is not started here; see
// start()), moves this object, its message queue, its sample FIFO and the
// wrapped sink to that thread, and connects the signals that drive the
// handleData() / handleMessages() / threadStarted() / threadFinished() slots.
//
// sampleSink: the sink to wrap. May be NULL; the other methods of this class
//             already treat a NULL sink as "nothing to forward to".
ThreadedSampleSink::ThreadedSampleSink(SampleSink* sampleSink) :
    m_thread(new QThread),
    m_sampleSink(sampleSink)
{
    // Slots of this object are meant to run in the worker thread.
    moveToThread(m_thread);
    connect(m_thread, SIGNAL(started()), this, SLOT(threadStarted()));
    connect(m_thread, SIGNAL(finished()), this, SLOT(threadFinished()));

    // Incoming messages are drained by handleMessages().
    m_messageQueue.moveToThread(m_thread);
    connect(&m_messageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleMessages()));

    // Incoming samples are drained by handleData().
    m_sampleFifo.moveToThread(m_thread);
    connect(&m_sampleFifo, SIGNAL(dataReady()), this, SLOT(handleData()));
    m_sampleFifo.setSize(262144); // NOTE(review): presumably a capacity in samples - confirm against SampleFifo

    // Every other method guards m_sampleSink against NULL, so do not
    // dereference a NULL sink here either.
    if(m_sampleSink != NULL)
        m_sampleSink->moveToThread(m_thread);
}
// Stops the worker thread's event loop, waits for the thread to finish and
// then destroys the thread object that the constructor allocated.
ThreadedSampleSink::~ThreadedSampleSink()
{
    // Ask the event loop to return with exit code 0 ...
    m_thread->exit(0);
    // ... and block until the thread has really ended before freeing it.
    m_thread->wait();

    delete m_thread;
}
// Queues the samples in [begin, end) by writing them into the sample FIFO.
//
// begin, end:   range of samples to enqueue.
// firstOfBurst: ignored here; handleData() computes its own burst flag when
//               it forwards FIFO contents to the wrapped sink.
void ThreadedSampleSink::feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool firstOfBurst)
{
    (void)firstOfBurst; // intentionally unused

    m_sampleFifo.write(begin, end);
}
// Starts the worker thread. The thread's started() signal is connected to
// threadStarted() in the constructor.
void ThreadedSampleSink::start()
{
    m_thread->start();
}
// Stops the worker thread and throws away any samples still queued.
void ThreadedSampleSink::stop()
{
    // Leave the thread's event loop (exit code 0) and wait until it is done.
    m_thread->exit(0);
    m_thread->wait();

    // Commit a read of everything currently in the FIFO, i.e. discard it
    // without forwarding it to the sink.
    m_sampleFifo.readCommit(m_sampleFifo.fill());
}
// Accepts a message on behalf of the wrapped sink.
//
// This is called from a thread other than the worker thread, so the message
// is not processed here: it is only submitted to the message queue, which
// handleMessages() drains later.
//
// cmd: the message to enqueue.
// Returns true unconditionally.
bool ThreadedSampleSink::handleMessage(Message* cmd)
{
    m_messageQueue.submit(cmd);

    return true;
}
// Slot connected to the FIFO's dataReady() signal: forwards queued samples
// from the sample FIFO to the wrapped sink.
//
// The loop keeps draining while the FIFO holds data and no messages are
// pending; as soon as a message is queued the loop stops (presumably so that
// handleMessages() gets a chance to run - confirm against the intended
// scheduling).
//
// A FIFO read can come back as two segments when the requested block wraps
// around the end of the buffer. Both segments are passed to the sink in
// order. Only the first feed() call of one invocation of this slot is
// flagged as firstOfBurst.
void ThreadedSampleSink::handleData()
{
    bool firstOfBurst = true;

    while((m_sampleFifo.fill() > 0) && (m_messageQueue.countPending() == 0)) {
        SampleVector::iterator part1begin;
        SampleVector::iterator part1end;
        SampleVector::iterator part2begin;
        SampleVector::iterator part2end;

        // Request everything that is currently available.
        size_t count = m_sampleFifo.readBegin(m_sampleFifo.fill(), &part1begin, &part1end, &part2begin, &part2end);

        // first part of FIFO data
        if(part1begin != part1end) {
            if(m_sampleSink != NULL)
                m_sampleSink->feed(part1begin, part1end, firstOfBurst);
            firstOfBurst = false;
        }

        // second part of FIFO data (used when block wraps around)
        if(part2begin != part2end) {
            // Feed the *second* segment here; passing part1 again would
            // duplicate the first segment and drop the wrapped-around one.
            if(m_sampleSink != NULL)
                m_sampleSink->feed(part2begin, part2end, firstOfBurst);
            firstOfBurst = false;
        }

        // adjust FIFO pointers
        m_sampleFifo.readCommit(count);
    }
}
void ThreadedSampleSink::handleMessages()
{
Message* message;
while((message = m_messageQueue.accept()) != NULL) {
qDebug("CMD: %s", message->name());
if(m_sampleSink != NULL) {
if(!m_sampleSink->handleMessage(message))
message->completed();
} else {
message->completed();
}
}
}
// Slot connected to the worker thread's started() signal: starts the wrapped
// sink, if there is one.
void ThreadedSampleSink::threadStarted()
{
    if(m_sampleSink == NULL)
        return;

    m_sampleSink->start();
}
// Slot connected to the worker thread's finished() signal: stops the wrapped
// sink, if there is one.
void ThreadedSampleSink::threadFinished()
{
    if(m_sampleSink == NULL)
        return;

    m_sampleSink->stop();
}
|