aboutsummaryrefslogtreecommitdiffstats
path: root/lib/rfspace/rfspace_source_c.cc
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rfspace/rfspace_source_c.cc')
-rw-r--r--lib/rfspace/rfspace_source_c.cc156
1 files changed, 151 insertions, 5 deletions
diff --git a/lib/rfspace/rfspace_source_c.cc b/lib/rfspace/rfspace_source_c.cc
index 314dfc7..80f34df 100644
--- a/lib/rfspace/rfspace_source_c.cc
+++ b/lib/rfspace/rfspace_source_c.cc
@@ -27,6 +27,7 @@
#include "config.h"
#endif
+#ifndef USE_ASIO
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
@@ -34,6 +35,7 @@
#include <netinet/udp.h>
#include <arpa/inet.h>
#include <netdb.h>
+#endif
#include <fcntl.h>
#include <unistd.h>
@@ -52,6 +54,9 @@
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/algorithm/string.hpp>
+#ifdef USE_ASIO
+#include <boost/asio/deadline_timer.hpp>
+#endif
#include <gnuradio/io_signature.h>
@@ -59,6 +64,9 @@
#include "rfspace_source_c.h"
using namespace boost::assign;
+#ifdef USE_ASIO
+using boost::asio::deadline_timer;
+#endif
#define DEFAULT_HOST "127.0.0.1" /* We assume a running "siqs" from CuteSDR project */
#define DEFAULT_PORT 50000
@@ -94,8 +102,15 @@ rfspace_source_c::rfspace_source_c (const std::string &args)
gr::io_signature::make (MIN_IN, MAX_IN, sizeof (gr_complex)),
gr::io_signature::make (MIN_OUT, MAX_OUT, sizeof (gr_complex))),
_radio(RADIO_UNKNOWN),
+#ifdef USE_ASIO
+ _io_service(),
+ _resolver(_io_service),
+ _t(_io_service),
+ _u(_io_service),
+#else
_tcp(-1),
_udp(-1),
+#endif
_usb(-1),
_running(false),
_keep_running(false),
@@ -224,6 +239,30 @@ rfspace_source_c::rfspace_source_c (const std::string &args)
/* SDR-IP 4.4.4 Data Output UDP IP and Port Address */
/* NETSDR 4.4.3 Data Output UDP IP and Port Address */
+#ifdef USE_ASIO
+
+ tcp::resolver::query query(tcp::v4(), host.c_str(), port_str.c_str());
+ tcp::resolver::iterator iterator = _resolver.resolve(query);
+
+ boost::system::error_code ec;
+
+ boost::asio::connect(_t, iterator, ec);
+ if ( ec )
+ throw std::runtime_error(ec.message() + " (" + host + ":" + port_str + ")");
+
+ _u.open(udp::v4(), ec);
+ if ( ec )
+ throw std::runtime_error(ec.message());
+
+ _u.bind(udp::endpoint(udp::v4(), DEFAULT_PORT), ec);
+ if ( ec )
+ throw std::runtime_error(ec.message());
+
+ _u.set_option(udp::socket::reuse_address(true));
+ _t.set_option(udp::socket::reuse_address(true));
+
+#else
+
if ( (_tcp = socket(AF_INET, SOCK_STREAM, 0) ) < 0)
{
throw std::runtime_error("Could not create TCP socket");
@@ -300,6 +339,8 @@ rfspace_source_c::rfspace_source_c (const std::string &args)
throw std::runtime_error("Bind of UDP socket failed: " + std::string(strerror(errno)));
}
+#endif
+
}
/* Wait 10 ms before sending queries to device (required for networked radios). */
@@ -465,8 +506,10 @@ rfspace_source_c::rfspace_source_c (const std::string &args)
*/
rfspace_source_c::~rfspace_source_c ()
{
+#ifndef USE_ASIO
close(_tcp);
close(_udp);
+#endif
if ( RFSPACE_SDR_IQ == _radio )
{
@@ -547,7 +590,7 @@ bool rfspace_source_c::transaction( const unsigned char *cmd, size_t size,
if ( write(_usb, cmd, size) != (int)size )
return false;
- std::unique_lock<std::mutex> lock(_resp_lock);
+ boost::unique_lock<boost::mutex> lock(_resp_lock);
_resp_avail.wait(lock);
rx_bytes = _resp.size();
@@ -555,8 +598,13 @@ bool rfspace_source_c::transaction( const unsigned char *cmd, size_t size,
}
else
{
- std::lock_guard<std::mutex> lock(_tcp_lock);
+ boost::mutex::scoped_lock lock(_tcp_lock);
+
+#ifdef USE_ASIO
+ _t.write_some( boost::asio::buffer(cmd, size) );
+ rx_bytes = _t.read_some( boost::asio::buffer(data, sizeof(data)) );
+#else
if ( write(_tcp, cmd, size) != (int)size )
return false;
@@ -576,6 +624,7 @@ bool rfspace_source_c::transaction( const unsigned char *cmd, size_t size,
return false;
rx_bytes = 2 + length; /* header + payload */
+#endif
}
response.resize( rx_bytes );
@@ -780,7 +829,7 @@ int rfspace_source_c::work( int noutput_items,
{
gr_complex *out = (gr_complex *)output_items[0];
- std::unique_lock<std::mutex> lock(_fifo_lock);
+ boost::unique_lock<boost::mutex> lock(_fifo_lock);
/* Wait until we have the requested number of samples */
int n_samples_avail = _fifo->size();
@@ -803,6 +852,10 @@ int rfspace_source_c::work( int noutput_items,
return noutput_items;
}
+#ifdef USE_ASIO
+ udp::endpoint ep;
+ size_t rx_bytes = _u.receive_from( boost::asio::buffer(data, sizeof(data)), ep );
+#else
struct sockaddr_in sa_in; /* remote address */
socklen_t addrlen = sizeof(sa_in); /* length of addresses */
ssize_t rx_bytes = recvfrom(_udp, data, sizeof(data), 0, (struct sockaddr *)&sa_in, &addrlen);
@@ -811,6 +864,7 @@ int rfspace_source_c::work( int noutput_items,
std::cerr << "recvfrom returned " << rx_bytes << std::endl;
return WORK_DONE;
}
+#endif
#define HEADER_SIZE 2
#define SEQNUM_SIZE 2
@@ -838,7 +892,11 @@ int rfspace_source_c::work( int noutput_items,
if ( diff > 1 )
{
std::cerr << "Lost " << diff << " packets from "
+#ifdef USE_ASIO
+ << ep
+#else
<< inet_ntoa(sa_in.sin_addr) << ":" << ntohs(sa_in.sin_port)
+#endif
<< std::endl;
}
@@ -919,11 +977,48 @@ typedef struct
uint16_t port;
} unit_t;
+#ifdef USE_ASIO
+static void handle_receive( const boost::system::error_code& ec,
+ std::size_t length,
+ boost::system::error_code* out_ec,
+ std::size_t* out_length )
+{
+ *out_ec = ec;
+ *out_length = length;
+}
+
+static void handle_timer( const boost::system::error_code& ec,
+ boost::system::error_code* out_ec )
+{
+ *out_ec = boost::asio::error::timed_out;
+}
+#endif
static std::vector < unit_t > discover_netsdr()
{
std::vector < unit_t > units;
+#ifdef USE_ASIO
+ boost::system::error_code ec;
+ boost::asio::io_service ios;
+ udp::socket socket(ios);
+ deadline_timer timer(ios);
+
+ timer.expires_at(boost::posix_time::pos_infin);
+
+ socket.open(udp::v4(), ec);
+
+ if ( ec )
+ return units;
+
+ socket.bind(udp::endpoint(udp::v4(), DISCOVER_CLIENT_PORT), ec);
+
+ if ( ec )
+ return units;
+
+ socket.set_option(udp::socket::reuse_address(true));
+ socket.set_option(boost::asio::socket_base::broadcast(true));
+#else
int sock;
if ( (sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
@@ -963,6 +1058,7 @@ static std::vector < unit_t > discover_netsdr()
close(sock);
return units;
}
+#endif
discover_common_msg_t tx_msg;
memset( (void *)&tx_msg, 0, sizeof(discover_common_msg_t) );
@@ -971,18 +1067,64 @@ static std::vector < unit_t > discover_netsdr()
tx_msg.key[0] = KEY0;
tx_msg.key[1] = KEY1;
tx_msg.op = MSG_REQ;
+#ifdef USE_ASIO
+ udp::endpoint ep(boost::asio::ip::address_v4::broadcast(), DISCOVER_SERVER_PORT);
+ socket.send_to(boost::asio::buffer(&tx_msg, sizeof(tx_msg)), ep);
+#else
sendto(sock, &tx_msg, sizeof(tx_msg), 0, (struct sockaddr *)&peer_sa, sizeof(peer_sa));
+#endif
while ( true )
{
std::size_t rx_bytes = 0;
unsigned char data[1024*2];
+#ifdef USE_ASIO
+ // Set up the variables that receive the result of the asynchronous
+ // operation. The error code is set to would_block to signal that the
+ // operation is incomplete. Asio guarantees that its asynchronous
+ // operations will never fail with would_block, so any other value in
+ // ec indicates completion.
+ ec = boost::asio::error::would_block;
+
+ // Start the asynchronous receive operation. The handle_receive function
+ // used as a callback will update the ec and rx_bytes variables.
+ socket.async_receive( boost::asio::buffer(data, sizeof(data)),
+ boost::bind(handle_receive, _1, _2, &ec, &rx_bytes) );
+
+ // Set a deadline for the asynchronous operation.
+ timer.expires_from_now( boost::posix_time::milliseconds(10) );
+
+ // Start an asynchronous wait on the timer. The handle_timer function
+ // used as a callback will update the ec variable.
+ timer.async_wait( boost::bind(handle_timer, _1, &ec) );
+
+ // Reset the io_service in preparation for a subsequent run_one() invocation.
+ ios.reset();
+
+ // Block until at least one asynchronous operation has completed.
+ do ios.run_one(); while ( ec == boost::asio::error::would_block );
+
+ if ( boost::asio::error::timed_out == ec ) /* timer was first to complete */
+ {
+ // Please note that cancel() has portability issues on some versions of
+ // Microsoft Windows, and it may be necessary to use close() instead.
+ // Consult the documentation for cancel() for further information.
+ socket.cancel();
+
+ break;
+ }
+ else /* socket was first to complete */
+ {
+ timer.cancel();
+ }
+#else
socklen_t addrlen = sizeof(peer_sa); /* length of addresses */
int nbytes = recvfrom(sock, data, sizeof(data), 0, (struct sockaddr *)&peer_sa, &addrlen);
if ( nbytes <= 0 )
break;
rx_bytes = nbytes;
+#endif
if ( rx_bytes >= sizeof(discover_common_msg_t) )
{
@@ -1009,7 +1151,11 @@ static std::vector < unit_t > discover_netsdr()
}
}
}
+#ifdef USE_ASIO
+ socket.close(ec);
+#else
close(sock);
+#endif
return units;
}
@@ -1150,7 +1296,7 @@ std::vector<std::string> rfspace_source_c::get_devices( bool fake )
std::vector < unit_t > units = discover_netsdr();
- for (unit_t u : units)
+ BOOST_FOREACH( unit_t u, units )
{
// std::cerr << u.name << " " << u.sn << " " << u.addr << ":" << u.port
// << std::endl;
@@ -1164,7 +1310,7 @@ std::vector<std::string> rfspace_source_c::get_devices( bool fake )
units = discover_sdr_iq();
- for (unit_t u : units)
+ BOOST_FOREACH( unit_t u, units )
{
// std::cerr << u.name << " " << u.sn << " " << u.addr << ":" << u.port
// << std::endl;