summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit/fake_trx.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/target/trx_toolkit/fake_trx.py')
-rwxr-xr-xsrc/target/trx_toolkit/fake_trx.py112
1 files changed, 81 insertions, 31 deletions
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 46b413aa..0f473d0a 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -30,11 +30,13 @@ import argparse
import random
import select
import sys
+import re
from app_common import ApplicationBase
from burst_fwd import BurstForwarder
from transceiver import Transceiver
from clck_gen import CLCKGen
+from trx_list import TRXList
from fake_pm import FakePM
class FakeTRX(Transceiver):
@@ -270,6 +272,34 @@ class Application(ApplicationBase):
# Configure logging
self.app_init_logging(self.argv)
+ # List of all transceivers
+ self.trx_list = TRXList()
+
+ def append_trx(self, remote_addr, base_port):
+ trx = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
+ clck_gen = self.clck_gen, pwr_meas = self.fake_pm)
+ self.trx_list.add_trx(trx)
+
+ def append_child_trx(self, remote_addr, base_port, child_idx):
+ # Index 0 corresponds to the first transceiver
+ if child_idx is 0:
+ self.append_trx(remote_addr, base_port)
+ return
+
+ # Find 'parent' transceiver for a new child
+ trx_parent = self.trx_list.find_trx(remote_addr, base_port)
+ if trx_parent is None:
+ raise IndexError("Couldn't find parent transceiver "
+ "for '%s:%d/%d'" % (remote_addr, base_port, child_idx))
+
+ # Allocate a new child
+ trx_child = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
+ child_idx = child_idx, pwr_meas = self.fake_pm)
+ self.trx_list.add_trx(trx_child)
+
+ # Link a new 'child' with its 'parent'
+ trx_parent.child_trx_list.add_trx(trx_child)
+
def run(self):
# Init shared clock generator
self.clck_gen = CLCKGen([])
@@ -280,49 +310,44 @@ class Application(ApplicationBase):
self.fake_pm = FakePM(-120, -105, -75, -50)
# Init TRX instance for BTS
- self.bts_trx = FakeTRX(self.argv.trx_bind_addr,
- self.argv.bts_addr, self.argv.bts_base_port,
- clck_gen = self.clck_gen)
+ self.append_trx(self.argv.bts_addr, self.argv.bts_base_port)
# Init TRX instance for BB
- self.bb_trx = FakeTRX(self.argv.trx_bind_addr,
- self.argv.bb_addr, self.argv.bb_base_port,
- pwr_meas = self.fake_pm)
+ self.append_trx(self.argv.bb_addr, self.argv.bb_base_port)
+
+ # Additional transceivers (optional)
+ if self.argv.trx_list is not None:
+ for trx_def in self.argv.trx_list:
+ (addr, port, idx) = trx_def
+ self.append_child_trx(addr, port, idx)
# Burst forwarding between transceivers
- self.burst_fwd = BurstForwarder()
- self.burst_fwd.add_trx(self.bts_trx)
- self.burst_fwd.add_trx(self.bb_trx)
+ self.burst_fwd = BurstForwarder(self.trx_list)
+
+ # Compose list of to be monitored sockets
+ self.sock_list = []
+ for trx in self.trx_list:
+ self.sock_list.append(trx.ctrl_if.sock)
+ self.sock_list.append(trx.data_if.sock)
log.info("Init complete")
# Enter main loop
while True:
- socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock,
- self.bts_trx.data_if.sock, self.bb_trx.data_if.sock]
-
# Wait until we get any data on any socket
- r_event, w_event, x_event = select.select(socks, [], [])
-
- # Downlink: BTS -> BB
- if self.bts_trx.data_if.sock in r_event:
- msg = self.bts_trx.recv_data_msg()
- if msg is not None:
- self.burst_fwd.forward_msg(self.bts_trx, msg)
+ r_event, _, _ = select.select(self.sock_list, [], [])
- # Uplink: BB -> BTS
- if self.bb_trx.data_if.sock in r_event:
- msg = self.bb_trx.recv_data_msg()
- if msg is not None:
- self.burst_fwd.forward_msg(self.bb_trx, msg)
+ # Iterate over all transceivers
+ for trx in self.trx_list:
+ # DATA interface
+ if trx.data_if.sock in r_event:
+ msg = trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(trx, msg)
- # CTRL commands from BTS
- if self.bts_trx.ctrl_if.sock in r_event:
- self.bts_trx.ctrl_if.handle_rx()
-
- # CTRL commands from BB
- if self.bb_trx.ctrl_if.sock in r_event:
- self.bb_trx.ctrl_if.handle_rx()
+ # CTRL interface
+ if trx.ctrl_if.sock in r_event:
+ trx.ctrl_if.handle_rx()
def shutdown(self):
log.info("Shutting down...")
@@ -330,6 +355,25 @@ class Application(ApplicationBase):
# Stop clock generator
self.clck_gen.stop()
+ # Parses a TRX definition of the following
+ # format: REMOTE_ADDR:BIND_PORT[/TRX_NUM]
+ # e.g. [2001:0db8:85a3:0000:0000:8a2e:0370:7334]:5700/5
+ # e.g. 127.0.0.1:5700 or 127.0.0.1:5700/1
+ @staticmethod
+ def trx_def(val):
+ try:
+ result = re.match("(.+):([0-9]+)(\/[0-9]+)?", val)
+ (addr, port, idx) = result.groups()
+ except:
+ raise argparse.ArgumentTypeError("Invalid TRX definition: %s" % val)
+
+ if idx is not None:
+ idx = int(idx[1:])
+ else:
+ idx = 0
+
+ return (addr, int(port), idx)
+
def parse_argv(self):
parser = argparse.ArgumentParser(prog = "fake_trx",
description = "Virtual Um-interface (fake transceiver)")
@@ -354,6 +398,12 @@ class Application(ApplicationBase):
dest = "bb_base_port", type = int, default = 6700,
help = "Set BB base port number (default %(default)s)")
+ mtrx_group = parser.add_argument_group("Additional transceivers")
+ mtrx_group.add_argument("--trx",
+ metavar = "REMOTE_ADDR:BASE_PORT[/TRX_NUM]",
+ dest = "trx_list", type = self.trx_def, action = "append",
+ help = "Add a transceiver for BTS or MS (e.g. 127.0.0.1:5703)")
+
argv = parser.parse_args()
# Make sure there is no overlap between ports