summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Welte <laforge@gnumonks.org>2019-08-11 20:31:41 +0200
committerHarald Welte <laforge@osmocom.org>2020-08-25 09:07:20 +0200
commit3b95a7cfc90984ad5026c49b39b08c2d08fd15e9 (patch)
tree49d9d8b431dbe49857f4b368b339eacaf641719c
parentd0f391ef6039839c2a874ee560af3c5a055c64f6 (diff)
ipa_proto: Add notion of 'codecs'
A codec for a given Stream Identifier can be registered with encode and decode functions. This codec transcodes from the binary payload of messages within that stream identifier and some abstract representation. Any received messages will be passed through decode, while any to-be-transmitted messages will be passed through encode. Change-Id: I8eaf888402545a1a871df9ae3dfbce690729dd03
-rw-r--r--src/ipa_proto.erl57
1 files changed, 44 insertions, 13 deletions
diff --git a/src/ipa_proto.erl b/src/ipa_proto.erl
index c191a74..9a3f3d0 100644
--- a/src/ipa_proto.erl
+++ b/src/ipa_proto.erl
@@ -37,11 +37,16 @@
-export([register_socket/1, register_stream/3, unregister_stream/2,
send/3, connect/3, connect/4, listen_accept_handle/2,
- start_listen/3, controlling_process/3]).
+ start_listen/3, controlling_process/3, register_codec/3]).
--record(ipa_socket, {socket, ipaPid, streamTbl, listenType}).
+-type stream_id() :: integer() | {osmo, integer()}.
+-record(ipa_socket, {socket, ipaPid, streamTbl, listenType}).
+-record(ipa_codec, {streamId :: stream_id(),
+ encodeFn :: fun(),
+ decodeFn :: fun()
+ }).
% register a TCP socket with this IPA protocol implementation
register_socket(Socket) ->
@@ -124,15 +129,43 @@ split_ipa_msg(DataBin) ->
% deliver an incoming message to the process that is registered for the socket/stream_id
deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
+ DataDec = try_decode(StreamID, DataBin),
case ets:lookup(StreamMap, {Socket, StreamID}) of
[{_,{process_id, Pid}}] ->
- Pid ! {ipa, Socket, StreamID, DataBin};
+ Pid ! {ipa, Socket, StreamID, DataDec};
[{_,{callback_fn, Fn, Args}}] ->
- Fn(Socket, StreamID, DataBin, Args);
+ Fn(Socket, StreamID, DataDec, Args);
[] ->
io:format("No Pid registered for Socket ~p Stream ~p~n", [Socket, StreamID])
end.
+% register a Codec with this IPA protocol implementation
+-spec register_codec(stream_id(), fun(), fun()) -> boolean().
+register_codec(StreamID, EncodeFn, DecodeFn) ->
+ ets:insert(ipa_codecs, #ipa_codec{streamId=StreamID, encodeFn=EncodeFn, decodeFn=DecodeFn}).
+
+-spec try_decode(stream_id(), binary()) -> any().
+try_decode(StreamID, Data) ->
+ case ets:lookup(ipa_codecs, StreamID) of
+ [IpaCodec] ->
+ Fun = IpaCodec#ipa_codec.decodeFn,
+ Fun(Data);
+ [] ->
+ Data
+ end.
+
+-spec try_encode(stream_id(), any()) -> binary().
+try_encode(_StreamID, Data) when is_binary(Data) ->
+ Data;
+try_encode(StreamID, Data) ->
+ case ets:lookup(ipa_codecs, StreamID) of
+ [IpaCodec] ->
+ Fun = IpaCodec#ipa_codec.encodeFn,
+ Fun(Data);
+ [] ->
+ Data
+ end.
+
% process (split + deliver) an incoming IPA message
process_rx_ipa_msg(_S, _StreamMap, <<>>) ->
ok;
@@ -173,10 +206,12 @@ process_tcp_closed(S, StreamMap) ->
% send a binary message through a given Socket / StreamID
send(Socket, {osmo, StreamIdExt}, DataBin) ->
- send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataBin]);
+ DataEnc = try_encode({osmo, StreamIdExt}, DataBin),
+ send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataEnc]);
send(Socket, StreamID, DataBin) ->
- Size = iolist_size(DataBin),
- gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataBin])).
+ DataEnc = try_encode(StreamID, DataBin),
+ Size = iolist_size(DataEnc),
+ gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataEnc])).
call_sync(Pid, Request) ->
@@ -194,12 +229,8 @@ reply({From, Ref}, Reply) ->
% global module initialization
init() ->
- case ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]) of
- ipa_sockets ->
- ok;
- _ ->
- {error, ets_new_ipa_sockets}
- end.
+ ipa_sockets = ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]),
+ ipa_codecs = ets:new(ipa_codecs, [named_table, set, public, {keypos, #ipa_codec.streamId}]).
% initialize a signle socket, create its handle process
init_sock(Socket, CallingPid) ->