pespin has uploaded this change for review. (
https://gerrit.osmocom.org/c/erlang/osmo_ss7/+/36095?usp=email )
Change subject: ipa_proto.erl: Implement TCP/IPA reassembly
......................................................................
ipa_proto.erl: Implement TCP/IPA reassembly
Related: OS#6377
Change-Id: I322e6ab9368ad66be66a9e8d113575f51f9c91c3
---
M src/ipa_proto.erl
1 file changed, 42 insertions(+), 22 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/erlang/osmo_ss7 refs/changes/95/36095/1
diff --git a/src/ipa_proto.erl b/src/ipa_proto.erl
index fcd594b..b0c2647 100644
--- a/src/ipa_proto.erl
+++ b/src/ipa_proto.erl
@@ -130,11 +130,18 @@
io:format("Unblocking socket ~p:~p~n", [Socket, Ret]).
% split an incoming IPA message and split it into Length/StreamID/Payload
+split_ipa_msg(DataBin) when byte_size(DataBin) < 3 ->
+ {need_more_data, DataBin};
split_ipa_msg(DataBin) ->
- % FIXME: This will throw an exception if DataBin doesn't contain all payload
- <<Length:16/big-unsigned-integer, StreamID:8, Payload:Length/binary,
Trailer/binary>> = DataBin,
- io:format("Stream ~p, ~p bytes~n", [StreamID, Length]),
- {StreamID, Payload, Trailer}.
+ <<IPALen:16/big-unsigned-integer, StreamID:8, DataRemainBin/binary>> =
DataBin,
+ DataRemainLen = byte_size(DataRemainBin),
+ case DataRemainLen of
+ _ when DataRemainLen < IPALen -> {need_more_data, DataBin};
+ _ when DataRemainLen >= IPALen ->
+ <<Payload:IPALen/binary, Trailer/binary>> = DataRemainBin,
+ io:format("Stream ~p, ~p bytes~n", [StreamID, IPALen]),
+ {ok, StreamID, Payload, Trailer}
+ end.
% deliver an incoming message to the process that is registered for the socket/stream_id
deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
@@ -177,19 +184,22 @@
% process (split + deliver) an incoming IPA message
process_rx_ipa_msg(_S, _StreamMap, _, <<>>) ->
- ok;
+ {ok, <<>>};
process_rx_ipa_msg(S, StreamMap, CcmOptions, Data) ->
- {StreamID, PayloadBin, Trailer} = split_ipa_msg(Data),
- case StreamID of
- ?IPAC_PROTO_CCM ->
- process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
- ?IPAC_PROTO_OSMO ->
- <<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
- deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
- _ ->
- deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
- end,
- process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer).
+ case split_ipa_msg(Data) of
+ {need_more_data} -> {ok, Data};
+ {ok, StreamID, PayloadBin, Trailer} ->
+ case StreamID of
+ ?IPAC_PROTO_CCM ->
+ process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
+ ?IPAC_PROTO_OSMO ->
+ <<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
+ deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
+ _ ->
+ deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
+ end,
+ process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer)
+ end.
send_close_signal([]) ->
ok;
@@ -246,9 +256,9 @@
StreamMap = ets:new(stream_map, [set]),
ets:insert(ipa_sockets, #ipa_socket{socket=Socket, ipaPid=self(),
streamTbl=StreamMap}),
CallingPid ! {ipa_init_sock_done, Socket},
- loop(Socket, StreamMap, #ipa_ccm_options{}).
+ loop(Socket, StreamMap, #ipa_ccm_options{}, <<>>).
-loop(S, StreamMap, CcmOptions) ->
+loop(S, StreamMap, CcmOptions, RxPendingData) ->
receive
{request, From, Request} ->
case ipa_proto:request(Request, CcmOptions) of
@@ -260,15 +270,15 @@
Reply = EmbeddedReply
end,
ipa_proto:reply(From, Reply),
- ipa_proto:loop(S, StreamMap, NextCcmOptions);
+ ipa_proto:loop(S, StreamMap, NextCcmOptions, RxPendingData);
{ipa_send, S, StreamId, Data} ->
send(S, StreamId, Data),
- ipa_proto:loop(S, StreamMap, CcmOptions);
+ ipa_proto:loop(S, StreamMap, CcmOptions, RxPendingData);
{tcp, S, Data} ->
% process incoming IPA message and mark socket active once more
- ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, Data),
+ {ok, NewRxPendingData} = ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions,
<<RxPendingData/binary, Data/binary>>),
inet:setopts(S, [{active, once}]),
- ipa_proto:loop(S, StreamMap, CcmOptions);
+ ipa_proto:loop(S, StreamMap, CcmOptions, NewRxPendingData);
{tcp_closed, S} ->
io:format("Socket ~w closed [~w]~n", [S,self()]),
ipa_proto:process_tcp_closed(S, StreamMap),
--
To view, visit
https://gerrit.osmocom.org/c/erlang/osmo_ss7/+/36095?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings
Gerrit-Project: erlang/osmo_ss7
Gerrit-Branch: master
Gerrit-Change-Id: I322e6ab9368ad66be66a9e8d113575f51f9c91c3
Gerrit-Change-Number: 36095
Gerrit-PatchSet: 1
Gerrit-Owner: pespin <pespin(a)sysmocom.de>
Gerrit-MessageType: newchange