summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPau Espin Pedrol <pespin@sysmocom.de>2024-02-27 21:15:36 +0100
committerPau Espin Pedrol <pespin@sysmocom.de>2024-02-28 18:38:20 +0100
commit2286c1b8738d715950026650bf53f19a69d6ed0e (patch)
treeea2c8a4d7ce5ff2acd55b6bd1408d1b54220a76c
parent414a61880466ee692ac30d0b30070bdb13e0f39b (diff)
ipa_proto.erl: Implement TCP/IPA reassemblyHEADmaster
-rw-r--r--src/ipa_proto.erl55
1 files changed, 32 insertions, 23 deletions
diff --git a/src/ipa_proto.erl b/src/ipa_proto.erl
index c96d6bb..d88af7e 100644
--- a/src/ipa_proto.erl
+++ b/src/ipa_proto.erl
@@ -129,12 +129,18 @@ request({ipa_unblock, Socket}, CcmOptions) ->
Ret = inet:setopts(Socket, [{active, once}]),
io:format("Unblocking socket ~p:~p~n", [Socket, Ret]).
-% split an incoming IPA message and split it into Length/StreamID/Payload
+% split an incoming IPA message and split it into StreamID/Payload/Trailer
+split_ipa_msg(IPALen, _StreamID, DataRemainBin) when byte_size(DataRemainBin) < IPALen ->
+ need_more_data;
+split_ipa_msg(IPALen, StreamID, DataRemainBin) ->
+ <<Payload:IPALen/binary, Trailer/binary>> = DataRemainBin,
+ io:format("Stream ~p, ~p bytes~n", [StreamID, IPALen]),
+ {ok, StreamID, Payload, Trailer}.
+split_ipa_msg(DataBin) when byte_size(DataBin) < 3 ->
+ need_more_data;
split_ipa_msg(DataBin) ->
- % FIXME: This will throw an exception if DataBin doesn't contain all payload
- <<Length:16/big-unsigned-integer, StreamID:8, Payload:Length/binary, Trailer/binary>> = DataBin,
- io:format("Stream ~p, ~p bytes~n", [StreamID, Length]),
- {StreamID, Payload, Trailer}.
+ <<IPALen:16/big-unsigned-integer, StreamID:8, DataRemainBin/binary>> = DataBin,
+ split_ipa_msg(IPALen, StreamID, DataRemainBin).
% deliver an incoming message to the process that is registered for the socket/stream_id
deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
@@ -177,19 +183,22 @@ try_encode(StreamID, Data) ->
% process (split + deliver) an incoming IPA message
process_rx_ipa_msg(_S, _StreamMap, _, <<>>) ->
- ok;
+ {ok, <<>>};
process_rx_ipa_msg(S, StreamMap, CcmOptions, Data) ->
- {StreamID, PayloadBin, Trailer} = split_ipa_msg(Data),
- case StreamID of
- ?IPAC_PROTO_CCM ->
- process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
- ?IPAC_PROTO_OSMO ->
- <<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
- deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
- _ ->
- deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
- end,
- process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer).
+ case split_ipa_msg(Data) of
+ need_more_data -> {ok, Data};
+ {ok, StreamID, PayloadBin, Trailer} ->
+ case StreamID of
+ ?IPAC_PROTO_CCM ->
+ process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
+ ?IPAC_PROTO_OSMO ->
+ <<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
+ deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
+ _ ->
+ deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
+ end,
+ process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer)
+ end.
send_close_signal([]) ->
ok;
@@ -246,9 +255,9 @@ init_sock(Socket, CallingPid) ->
StreamMap = ets:new(stream_map, [set]),
ets:insert(ipa_sockets, #ipa_socket{socket=Socket, ipaPid=self(), streamTbl=StreamMap}),
CallingPid ! {ipa_init_sock_done, Socket},
- loop(Socket, StreamMap, #ipa_ccm_options{}).
+ loop(Socket, StreamMap, #ipa_ccm_options{}, <<>>).
-loop(S, StreamMap, CcmOptions) ->
+loop(S, StreamMap, CcmOptions, RxPendingData) ->
receive
{request, From, Request} ->
case ipa_proto:request(Request, CcmOptions) of
@@ -260,15 +269,15 @@ loop(S, StreamMap, CcmOptions) ->
Reply = EmbeddedReply
end,
ipa_proto:reply(From, Reply),
- ipa_proto:loop(S, StreamMap, NextCcmOptions);
+ ipa_proto:loop(S, StreamMap, NextCcmOptions, RxPendingData);
{ipa_send, S, StreamId, Data} ->
send(S, StreamId, Data),
- ipa_proto:loop(S, StreamMap, CcmOptions);
+ ipa_proto:loop(S, StreamMap, CcmOptions, RxPendingData);
{tcp, S, Data} ->
% process incoming IPA message and mark socket active once more
- ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, Data),
+ {ok, NewRxPendingData} = ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, <<RxPendingData/binary, Data/binary>>),
inet:setopts(S, [{active, once}]),
- ipa_proto:loop(S, StreamMap, CcmOptions);
+ ipa_proto:loop(S, StreamMap, CcmOptions, NewRxPendingData);
{tcp_closed, S} ->
io:format("Socket ~w closed [~w]~n", [S,self()]),
ipa_proto:process_tcp_closed(S, StreamMap),