summaryrefslogtreecommitdiffstats
path: root/src/ipa_proto.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipa_proto.erl')
-rw-r--r--src/ipa_proto.erl57
1 files changed, 44 insertions, 13 deletions
diff --git a/src/ipa_proto.erl b/src/ipa_proto.erl
index c191a74..9a3f3d0 100644
--- a/src/ipa_proto.erl
+++ b/src/ipa_proto.erl
@@ -37,11 +37,16 @@
-export([register_socket/1, register_stream/3, unregister_stream/2,
send/3, connect/3, connect/4, listen_accept_handle/2,
- start_listen/3, controlling_process/3]).
+ start_listen/3, controlling_process/3, register_codec/3]).
--record(ipa_socket, {socket, ipaPid, streamTbl, listenType}).
+-type stream_id() :: integer() | {osmo, integer()}.
+-record(ipa_socket, {socket, ipaPid, streamTbl, listenType}).
+-record(ipa_codec, {streamId :: stream_id(),
+ encodeFn :: fun(),
+ decodeFn :: fun()
+ }).
% register a TCP socket with this IPA protocol implementation
register_socket(Socket) ->
@@ -124,15 +129,43 @@ split_ipa_msg(DataBin) ->
% deliver an incoming message to the process that is registered for the socket/stream_id
deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
+ DataDec = try_decode(StreamID, DataBin),
case ets:lookup(StreamMap, {Socket, StreamID}) of
[{_,{process_id, Pid}}] ->
- Pid ! {ipa, Socket, StreamID, DataBin};
+ Pid ! {ipa, Socket, StreamID, DataDec};
[{_,{callback_fn, Fn, Args}}] ->
- Fn(Socket, StreamID, DataBin, Args);
+ Fn(Socket, StreamID, DataDec, Args);
[] ->
io:format("No Pid registered for Socket ~p Stream ~p~n", [Socket, StreamID])
end.
+% register a Codec with this IPA protocol implementation
+-spec register_codec(stream_id(), fun(), fun()) -> boolean().
+register_codec(StreamID, EncodeFn, DecodeFn) ->
+ ets:insert(ipa_codecs, #ipa_codec{streamId=StreamID, encodeFn=EncodeFn, decodeFn=DecodeFn}).
+
+-spec try_decode(stream_id(), binary()) -> any().
+try_decode(StreamID, Data) ->
+ case ets:lookup(ipa_codecs, StreamID) of
+ [IpaCodec] ->
+ Fun = IpaCodec#ipa_codec.decodeFn,
+ Fun(Data);
+ [] ->
+ Data
+ end.
+
+-spec try_encode(stream_id(), any()) -> binary().
+try_encode(_StreamID, Data) when is_binary(Data) ->
+ Data;
+try_encode(StreamID, Data) ->
+ case ets:lookup(ipa_codecs, StreamID) of
+ [IpaCodec] ->
+ Fun = IpaCodec#ipa_codec.encodeFn,
+ Fun(Data);
+ [] ->
+ Data
+ end.
+
% process (split + deliver) an incoming IPA message
process_rx_ipa_msg(_S, _StreamMap, <<>>) ->
ok;
@@ -173,10 +206,12 @@ process_tcp_closed(S, StreamMap) ->
% send a binary message through a given Socket / StreamID
send(Socket, {osmo, StreamIdExt}, DataBin) ->
- send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataBin]);
+ DataEnc = try_encode({osmo, StreamIdExt}, DataBin),
+ send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataEnc]);
send(Socket, StreamID, DataBin) ->
- Size = iolist_size(DataBin),
- gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataBin])).
+ DataEnc = try_encode(StreamID, DataBin),
+ Size = iolist_size(DataEnc),
+ gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataEnc])).
call_sync(Pid, Request) ->
@@ -194,12 +229,8 @@ reply({From, Ref}, Reply) ->
% global module initialization
init() ->
- case ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]) of
- ipa_sockets ->
- ok;
- _ ->
- {error, ets_new_ipa_sockets}
- end.
+ ipa_sockets = ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]),
+ ipa_codecs = ets:new(ipa_codecs, [named_table, set, public, {keypos, #ipa_codec.streamId}]).
% initialize a signle socket, create its handle process
init_sock(Socket, CallingPid) ->