diff options
-rw-r--r-- | src/ipa_proto.erl | 57 |
1 files changed, 44 insertions, 13 deletions
diff --git a/src/ipa_proto.erl b/src/ipa_proto.erl index c191a74..9a3f3d0 100644 --- a/src/ipa_proto.erl +++ b/src/ipa_proto.erl @@ -37,11 +37,16 @@ -export([register_socket/1, register_stream/3, unregister_stream/2, send/3, connect/3, connect/4, listen_accept_handle/2, - start_listen/3, controlling_process/3]). + start_listen/3, controlling_process/3, register_codec/3]). --record(ipa_socket, {socket, ipaPid, streamTbl, listenType}). +-type stream_id() :: integer() | {osmo, integer()}. +-record(ipa_socket, {socket, ipaPid, streamTbl, listenType}). +-record(ipa_codec, {streamId :: stream_id(), + encodeFn :: fun(), + decodeFn :: fun() + }). % register a TCP socket with this IPA protocol implementation register_socket(Socket) -> @@ -124,15 +129,43 @@ split_ipa_msg(DataBin) -> % deliver an incoming message to the process that is registered for the socket/stream_id deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) -> + DataDec = try_decode(StreamID, DataBin), case ets:lookup(StreamMap, {Socket, StreamID}) of [{_,{process_id, Pid}}] -> - Pid ! {ipa, Socket, StreamID, DataBin}; + Pid ! {ipa, Socket, StreamID, DataDec}; [{_,{callback_fn, Fn, Args}}] -> - Fn(Socket, StreamID, DataBin, Args); + Fn(Socket, StreamID, DataDec, Args); [] -> io:format("No Pid registered for Socket ~p Stream ~p~n", [Socket, StreamID]) end. +% register a Codec with this IPA protocol implementation +-spec register_codec(stream_id(), fun(), fun()) -> boolean(). +register_codec(StreamID, EncodeFn, DecodeFn) -> + ets:insert(ipa_codecs, #ipa_codec{streamId=StreamID, encodeFn=EncodeFn, decodeFn=DecodeFn}). + +-spec try_decode(stream_id(), binary()) -> any(). +try_decode(StreamID, Data) -> + case ets:lookup(ipa_codecs, StreamID) of + [IpaCodec] -> + Fun = IpaCodec#ipa_codec.decodeFn, + Fun(Data); + [] -> + Data + end. + +-spec try_encode(stream_id(), any()) -> binary(). +try_encode(_StreamID, Data) when is_binary(Data) -> + Data; +try_encode(StreamID, Data) -> + case ets:lookup(ipa_codecs, StreamID) of + [IpaCodec] -> + Fun = IpaCodec#ipa_codec.encodeFn, + Fun(Data); + [] -> + Data + end. + % process (split + deliver) an incoming IPA message process_rx_ipa_msg(_S, _StreamMap, <<>>) -> ok; @@ -173,10 +206,12 @@ process_tcp_closed(S, StreamMap) -> % send a binary message through a given Socket / StreamID send(Socket, {osmo, StreamIdExt}, DataBin) -> - send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataBin]); + DataEnc = try_encode({osmo, StreamIdExt}, DataBin), + send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataEnc]); send(Socket, StreamID, DataBin) -> - Size = iolist_size(DataBin), - gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataBin])). + DataEnc = try_encode(StreamID, DataBin), + Size = iolist_size(DataEnc), + gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataEnc])). call_sync(Pid, Request) -> @@ -194,12 +229,8 @@ reply({From, Ref}, Reply) -> % global module initialization init() -> - case ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]) of - ipa_sockets -> - ok; - _ -> - {error, ets_new_ipa_sockets} - end. + ipa_sockets = ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]), + ipa_codecs = ets:new(ipa_codecs, [named_table, set, public, {keypos, #ipa_codec.streamId}]). % initialize a signle socket, create its handle process init_sock(Socket, CallingPid) -> |