summaryrefslogtreecommitdiffstats
path: root/sdrbase/dsp/threadedsamplesink.cpp
blob: 15fb2778bb292285499ded7110d465f09e458d92 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <QThread>
#include "dsp/threadedsamplesink.h"
#include "util/message.h"

ThreadedSampleSink::ThreadedSampleSink(SampleSink* sampleSink) :
	m_thread(new QThread),
	m_sampleSink(sampleSink)
{
	moveToThread(m_thread);
	connect(m_thread, SIGNAL(started()), this, SLOT(threadStarted()));
	connect(m_thread, SIGNAL(finished()), this, SLOT(threadFinished()));

	m_messageQueue.moveToThread(m_thread);
	connect(&m_messageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleMessages()));

	m_sampleFifo.moveToThread(m_thread);
	connect(&m_sampleFifo, SIGNAL(dataReady()), this, SLOT(handleData()));
	m_sampleFifo.setSize(262144);

	sampleSink->moveToThread(m_thread);
}

ThreadedSampleSink::~ThreadedSampleSink()
{
	m_thread->exit();
	m_thread->wait();
	delete m_thread;
}

void ThreadedSampleSink::feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool firstOfBurst)
{
	Q_UNUSED(firstOfBurst);
	m_sampleFifo.write(begin, end);
}

void ThreadedSampleSink::start()
{
	m_thread->start();
}

void ThreadedSampleSink::stop()
{
	m_thread->exit();
	m_thread->wait();
	m_sampleFifo.readCommit(m_sampleFifo.fill());
}

bool ThreadedSampleSink::handleMessage(Message* cmd)
{
	// called from other thread
	m_messageQueue.submit(cmd);
	return true;
}

void ThreadedSampleSink::handleData()
{
	bool firstOfBurst = true;

	while((m_sampleFifo.fill() > 0) && (m_messageQueue.countPending() == 0)) {
		SampleVector::iterator part1begin;
		SampleVector::iterator part1end;
		SampleVector::iterator part2begin;
		SampleVector::iterator part2end;

		size_t count = m_sampleFifo.readBegin(m_sampleFifo.fill(), &part1begin, &part1end, &part2begin, &part2end);

		// first part of FIFO data
		if(part1begin != part1end) {
			// handle data
			if(m_sampleSink != NULL)
				m_sampleSink->feed(part1begin, part1end, firstOfBurst);
			firstOfBurst = false;
		}
		// second part of FIFO data (used when block wraps around)
		if(part2begin != part2end) {
			// handle data
			if(m_sampleSink != NULL)
				m_sampleSink->feed(part1begin, part1end, firstOfBurst);
			firstOfBurst = false;
		}

		// adjust FIFO pointers
		m_sampleFifo.readCommit(count);
	}
}

void ThreadedSampleSink::handleMessages()
{
	Message* message;
	while((message = m_messageQueue.accept()) != NULL) {
		qDebug("CMD: %s", message->name());
		if(m_sampleSink != NULL) {
			if(!m_sampleSink->handleMessage(message))
				message->completed();
		} else {
			message->completed();
		}
	}
}

void ThreadedSampleSink::threadStarted()
{
	if(m_sampleSink != NULL)
		m_sampleSink->start();
}

void ThreadedSampleSink::threadFinished()
{
	if(m_sampleSink != NULL)
		m_sampleSink->stop();
}