laforge has submitted this change. ( https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/38683?usp=email )
Change subject: s1ap_proxy: turn this module into a gen_server
......................................................................
s1ap_proxy: turn this module into a gen_server
This patch makes the s1ap_proxy module self-contained, so API users
no longer need to initialize, maintain, or deinitialize its internal
state, and also no longer need to trap 'EXIT' signals from the child
erab_fsm processes. These improvements in maintainability and ease
of use come at the cost of an additional process per SCTP
connection, which is manageable.
Change-Id: Id6e4c7ee29ae31edca658e1293601d38e5f43e63
---
M src/s1ap_proxy.erl
M src/sctp_proxy.erl
M test/s1ap_proxy_test.erl
3 files changed, 137 insertions(+), 120 deletions(-)
Approvals:
pespin: Looks good to me, but someone else must approve
laforge: Looks good to me, approved
Jenkins Builder: Verified
diff --git a/src/s1ap_proxy.erl b/src/s1ap_proxy.erl
index 62c3bf2..4452269 100644
--- a/src/s1ap_proxy.erl
+++ b/src/s1ap_proxy.erl
@@ -33,11 +33,16 @@
%% that of the covered work.
-module(s1ap_proxy).
+-behaviour(gen_server).
--export([init/0,
- deinit/1,
+-export([init/1,
+ handle_info/2,
+ handle_call/3,
+ handle_cast/2,
+ terminate/2]).
+-export([start_link/0,
process_pdu/2,
- handle_exit/2]).
+ shutdown/1]).
-include_lib("kernel/include/logger.hrl").
@@ -71,26 +76,95 @@
-export_type([proxy_action/0]).
+
%% ------------------------------------------------------------------
%% public API
%% ------------------------------------------------------------------
-%% Initialize per-connection data
--spec init() -> proxy_state().
-init() ->
- #proxy_state{erabs = dict:new()}.
+-spec start_link() -> gen_server:start_ret().
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
-%% De-initialize per-connection data
--spec deinit(proxy_state()) -> ok.
-deinit(_S) ->
+-type process_pdu_result() :: {proxy_action(), binary()}.
+-spec process_pdu(pid(), binary()) -> process_pdu_result().
+process_pdu(Pid, PDU) ->
+ gen_server:call(Pid, {?FUNCTION_NAME, PDU}).
+
+
+-spec shutdown(pid()) -> ok.
+shutdown(Pid) ->
+ gen_server:stop(Pid).
+
+
+%% ------------------------------------------------------------------
+%% gen_server API
+%% ------------------------------------------------------------------
+
+init([]) ->
+ process_flag(trap_exit, true),
+ {ok, #proxy_state{erabs = dict:new()}}.
+
+
+handle_call({process_pdu, OrigData}, _From,
+ #proxy_state{} = S0) ->
+ {Reply, S1} = handle_pdu_bin(OrigData, S0),
+ {reply, Reply, S1};
+
+handle_call(Info, From,
+ #proxy_state{} = S) ->
+ ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]),
+ {reply, {error, not_implemented}, S}.
+
+
+handle_cast(Info, #proxy_state{} = S) ->
+ ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
+ {noreply, S}.
+
+
+handle_info({'EXIT', Pid, Reason},
+ #proxy_state{erabs = ERABs} = S) ->
+ ?LOG_DEBUG("Child process ~p terminated with reason ~p", [Pid, Reason]),
+ Fun = fun(_Key, Val) -> Val =/= Pid end,
+ {noreply, S#proxy_state{erabs = dict:filter(Fun, ERABs)}};
+
+handle_info(Info, #proxy_state{} = S) ->
+ ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
+ {noreply, S}.
+
+
+terminate(Reason, #proxy_state{}) ->
+ ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
ok.
-%% Process an S1AP PDU
--type process_pdu_result() :: {{proxy_action(), binary()}, proxy_state()}.
--spec process_pdu(binary(), proxy_state()) -> process_pdu_result().
-process_pdu(OrigData, S0) ->
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+-spec erab_uid(erab_id(), proxy_state()) -> erab_uid().
+erab_uid(ERABId, #proxy_state{mme_ue_id = MmeUeId,
+ enb_ue_id = EnbUeId}) ->
+ {MmeUeId, EnbUeId, ERABId}.
+
+
+%% Encode an S1AP PDU
+-spec encode_pdu(s1ap_pdu()) -> {ok, binary()} |
+ {error, {asn1, tuple()}}.
+encode_pdu(Pdu) ->
+ 'S1AP-PDU-Descriptions':encode('S1AP-PDU', Pdu).
+
+
+%% Decode an S1AP PDU
+-spec decode_pdu(binary()) -> {ok, s1ap_pdu()} |
+ {error, {asn1, tuple()}}.
+decode_pdu(Data) ->
+ 'S1AP-PDU-Descriptions':decode('S1AP-PDU', Data).
+
+
+%% Process an S1AP PDU (binary)
+-spec handle_pdu_bin(binary(), proxy_state()) -> {process_pdu_result(),
proxy_state()}.
+handle_pdu_bin(OrigData, S0) ->
s1gw_metrics:ctr_inc(?S1GW_CTR_S1AP_PROXY_IN_PKT_ALL),
try decode_pdu(OrigData) of
{ok, PDU} ->
@@ -127,38 +201,7 @@
end.
-%% Handle an exit event
--spec handle_exit(pid(), proxy_state()) -> proxy_state().
-handle_exit(Pid, #proxy_state{erabs = ERABs} = S) ->
- Fun = fun(_Key, Val) -> Val =/= Pid end,
- S#proxy_state{erabs = dict:filter(Fun, ERABs)}.
-
-
-%% ------------------------------------------------------------------
-%% private API
-%% ------------------------------------------------------------------
-
--spec erab_uid(erab_id(), proxy_state()) -> erab_uid().
-erab_uid(ERABId, #proxy_state{mme_ue_id = MmeUeId,
- enb_ue_id = EnbUeId}) ->
- {MmeUeId, EnbUeId, ERABId}.
-
-
-%% Encode an S1AP PDU
--spec encode_pdu(s1ap_pdu()) -> {ok, binary()} |
- {error, {asn1, tuple()}}.
-encode_pdu(Pdu) ->
- 'S1AP-PDU-Descriptions':encode('S1AP-PDU', Pdu).
-
-
-%% Decode an S1AP PDU
--spec decode_pdu(binary()) -> {ok, s1ap_pdu()} |
- {error, {asn1, tuple()}}.
-decode_pdu(Data) ->
- 'S1AP-PDU-Descriptions':decode('S1AP-PDU', Data).
-
-
-%% Process an S1AP PDU
+%% Process an S1AP PDU (decoded)
-spec handle_pdu(s1ap_pdu(), proxy_state()) -> {{proxy_action(), s1ap_pdu()},
proxy_state()} |
{forward, proxy_state()}.
diff --git a/src/sctp_proxy.erl b/src/sctp_proxy.erl
index d0bdf08..6503a9e 100644
--- a/src/sctp_proxy.erl
+++ b/src/sctp_proxy.erl
@@ -77,13 +77,13 @@
%% ------------------------------------------------------------------
init([Aid, MmeAddr, MmePort]) ->
- process_flag(trap_exit, true),
+ {ok, Pid} = s1ap_proxy:start_link(),
{ok, connecting,
#{enb_aid => Aid,
mme_addr => MmeAddr,
mme_port => MmePort,
tx_queue => [],
- priv => s1ap_proxy:init()}}.
+ handler => Pid}}.
callback_mode() ->
@@ -145,8 +145,8 @@
%% Handle an eNB -> MME data forwarding request (forward)
connected(cast, {send_data, Data}, S0) ->
- S1 = sctp_send(Data, S0),
- {keep_state, S1};
+ sctp_send(Data, S0),
+ {keep_state, S0};
%% Handle an #sctp_assoc_change event (MME connection state)
connected(info, {sctp, _Socket, MmeAddr, MmePort,
@@ -168,24 +168,16 @@
#{sock := Sock,
enb_aid := EnbAid,
mme_aid := Aid,
- priv := Priv} = S) ->
+ handler := Pid} = S) ->
?LOG_DEBUG("MME connection (id=~p, ~p:~p) -> eNB: ~p",
[Aid, MmeAddr, MmePort, Data]),
- {Action, NewPriv} = s1ap_proxy:process_pdu(Data, Priv),
- case Action of
+ case s1ap_proxy:process_pdu(Pid, Data) of
{forward, FwdData} ->
sctp_server:send_data(EnbAid, FwdData);
{reply, ReData} ->
ok = sctp_client:send_data({Sock, Aid}, ReData)
end,
- {keep_state, S#{priv := NewPriv}};
-
-%% Handle termination events of the child processes
-connected(info, {'EXIT', Pid, Reason},
- #{priv := Priv} = S) ->
- ?LOG_DEBUG("Child process ~p terminated with reason ~p", [Pid, Reason]),
- NewPriv = s1ap_proxy:handle_exit(Pid, Priv),
- {keep_state, S#{priv := NewPriv}};
+ {keep_state, S};
%% Catch-all for other kinds of SCTP events
connected(info, {sctp, _Socket, MmeAddr, MmePort,
@@ -205,13 +197,12 @@
{ok, State, S}.
-terminate(Reason, State, S) ->
+terminate(Reason, State, #{handler := Pid} = S) ->
?LOG_NOTICE("Terminating in state ~p, reason ~p", [State, Reason]),
+ s1ap_proxy:shutdown(Pid),
case S of
#{sock := Sock,
- mme_aid := Aid,
- priv := Priv} ->
- s1ap_proxy:deinit(Priv),
+ mme_aid := Aid} ->
sctp_client:disconnect({Sock, Aid}),
gen_sctp:close(Sock);
#{sock := Sock} ->
@@ -229,25 +220,23 @@
#{sock := Sock,
enb_aid := EnbAid,
mme_aid := Aid,
- priv := Priv} = S) ->
- {Action, NewPriv} = s1ap_proxy:process_pdu(Data, Priv),
- case Action of
+ handler := Pid}) ->
+ case s1ap_proxy:process_pdu(Pid, Data) of
{forward, FwdData} ->
ok = sctp_client:send_data({Sock, Aid}, FwdData);
{reply, ReData} ->
sctp_server:send_data(EnbAid, ReData)
- end,
- S#{priv := NewPriv}.
+ end.
%% Send pending messages to the MME
sctp_send_pending(#{tx_queue := Pending} = S) ->
sctp_send_pending(lists:reverse(Pending), S).
-sctp_send_pending([Data | Pending], S0) ->
- S1 = sctp_send(Data, S0),
+sctp_send_pending([Data | Pending], S) ->
+ sctp_send(Data, S),
s1gw_metrics:gauge_dec(?S1GW_GAUGE_S1AP_PROXY_UPLINK_PACKETS_QUEUED),
- sctp_send_pending(Pending, S1);
+ sctp_send_pending(Pending, S);
sctp_send_pending([], S) ->
S#{tx_queue := []}.
diff --git a/test/s1ap_proxy_test.erl b/test/s1ap_proxy_test.erl
index 28fe209..cb562a2 100644
--- a/test/s1ap_proxy_test.erl
+++ b/test/s1ap_proxy_test.erl
@@ -3,6 +3,7 @@
-include_lib("eunit/include/eunit.hrl").
-include("pfcp_mock.hrl").
+
%% ------------------------------------------------------------------
%% setup functions
%% ------------------------------------------------------------------
@@ -16,13 +17,14 @@
start() ->
pfcp_mock:mock_all(),
exometer_mock:mock_all(),
- s1ap_proxy:init().
+ {ok, Pid} = s1ap_proxy:start_link(),
+ #{handler => Pid}.
-stop(S) ->
+stop(#{handler := Pid}) ->
+ s1ap_proxy:shutdown(Pid),
exometer_mock:unmock_all(),
- pfcp_mock:unmock_all(),
- s1ap_proxy:deinit(S).
+ pfcp_mock:unmock_all().
%% ------------------------------------------------------------------
@@ -50,107 +52,90 @@
%% actual testcases
%% ------------------------------------------------------------------
-test_s1_setup_req(S0) ->
+test_s1_setup_req(#{handler := Pid}) ->
SetupReq = s1_setup_req_pdu(),
%% Expect the PDU to be proxied unmodified
- [?_assertEqual({{forward, SetupReq}, S0}, s1ap_proxy:process_pdu(SetupReq, S0))].
+ [?_assertEqual({forward, SetupReq}, s1ap_proxy:process_pdu(Pid, SetupReq))].
-test_e_rab_setup(S0) ->
+test_e_rab_setup(#{handler := Pid}) ->
%% [eNB <- MME] E-RAB SETUP REQUEST
SetupReqIn = e_rab_setup_req_pdu(?ADDR_U2C, ?TEID_U2C),
SetupReqExp = e_rab_setup_req_pdu(?ADDR_A2U, ?TEID_A2U),
- {{forward, SetupReqOut}, S1} = s1ap_proxy:process_pdu(SetupReqIn, S0),
-
%% [eNB -> MME] E-RAB SETUP RESPONSE
SetupRspIn = e_rab_setup_rsp_pdu(?ADDR_U2A, ?TEID_U2A),
SetupRspExp = e_rab_setup_rsp_pdu(?ADDR_C2U, ?TEID_C2U),
- {{forward, SetupRspOut}, _S2} = s1ap_proxy:process_pdu(SetupRspIn, S1),
- [?_assertEqual(SetupReqExp, SetupReqOut),
- ?_assertEqual(SetupRspExp, SetupRspOut)].
+ [?_assertEqual({forward, SetupReqExp}, s1ap_proxy:process_pdu(Pid, SetupReqIn)),
+ ?_assertEqual({forward, SetupRspExp}, s1ap_proxy:process_pdu(Pid, SetupRspIn))].
-test_e_rab_setup_req_fail(S0) ->
+test_e_rab_setup_req_fail(#{handler := Pid}) ->
%% pfcp_peer:session_establish_req/3 responds with a reject
PDU = pfcp_mock:pdu_rsp_reject(session_establishment_response, ?SEID_Loc),
pfcp_mock:mock_req(session_establish_req, PDU),
- %% the linked erab_fsm will terminate abnormally, so trap this
- process_flag(trap_exit, true),
%% eNB <- [S1GW <- MME] E-RAB SETUP REQUEST
%% eNB -- [S1GW -> MME] E-RAB SETUP RESPONSE (failure)
SetupReqIn = e_rab_setup_req_pdu(?ADDR_U2C, ?TEID_U2C),
SetupRspExp = e_rab_setup_rsp_fail_pdu(),
- {{reply, SetupRspOut}, _S1} = s1ap_proxy:process_pdu(SetupReqIn, S0),
%% TODO: make sure that the E-RAB FSM has been terminated
- [?_assertEqual(SetupRspExp, SetupRspOut)].
+ [?_assertEqual({reply, SetupRspExp}, s1ap_proxy:process_pdu(Pid, SetupReqIn))].
-test_e_rab_release_cmd(S0) ->
+test_e_rab_release_cmd(#{handler := Pid}) ->
%% [eNB <- MME] E-RAB SETUP REQUEST
- SetupReqIn = e_rab_setup_req_pdu(?ADDR_U2C, ?TEID_U2C),
- {_, S1} = s1ap_proxy:process_pdu(SetupReqIn, S0),
-
+ SetupReq = e_rab_setup_req_pdu(?ADDR_U2C, ?TEID_U2C),
%% [eNB -> MME] E-RAB SETUP RESPONSE
- SetupRspIn = e_rab_setup_rsp_pdu(?ADDR_U2A, ?TEID_U2A),
- {_, S2} = s1ap_proxy:process_pdu(SetupRspIn, S1),
-
+ SetupRsp = e_rab_setup_rsp_pdu(?ADDR_U2A, ?TEID_U2A),
%% [eNB <- MME] E-RAB RELEASE COMMAND
ReleaseCmd = e_rab_release_cmd_pdu(),
- {{forward, ReleaseCmdOut}, S3} = s1ap_proxy:process_pdu(ReleaseCmd, S2),
-
%% [eNB -> MME] E-RAB RELEASE RESPONSE
ReleaseRsp = e_rab_release_rsp_pdu(),
- {{forward, ReleaseRspOut}, _S4} = s1ap_proxy:process_pdu(ReleaseRsp, S3),
%% TODO: make sure that the E-RAB FSM has been terminated
- [?_assertEqual(ReleaseCmd, ReleaseCmdOut),
- ?_assertEqual(ReleaseRsp, ReleaseRspOut)].
+ [?_assertMatch({forward, _}, s1ap_proxy:process_pdu(Pid, SetupReq)),
+ ?_assertMatch({forward, _}, s1ap_proxy:process_pdu(Pid, SetupRsp)),
+ ?_assertEqual({forward, ReleaseCmd}, s1ap_proxy:process_pdu(Pid, ReleaseCmd)),
+ ?_assertEqual({forward, ReleaseRsp}, s1ap_proxy:process_pdu(Pid, ReleaseRsp))].
-test_e_rab_release_ind(S0) ->
+test_e_rab_release_ind(#{handler := Pid}) ->
%% [eNB <- MME] E-RAB SETUP REQUEST
- SetupReqIn = e_rab_setup_req_pdu(?ADDR_U2C, ?TEID_U2C),
- {_, S1} = s1ap_proxy:process_pdu(SetupReqIn, S0),
-
+ SetupReq = e_rab_setup_req_pdu(?ADDR_U2C, ?TEID_U2C),
%% [eNB -> MME] E-RAB SETUP RESPONSE
- SetupRspIn = e_rab_setup_rsp_pdu(?ADDR_U2A, ?TEID_U2A),
- {_, S2} = s1ap_proxy:process_pdu(SetupRspIn, S1),
-
+ SetupRsp = e_rab_setup_rsp_pdu(?ADDR_U2A, ?TEID_U2A),
%% [eNB -> MME] E-RAB RELEASE INDICATION
ReleaseInd = e_rab_release_ind_pdu(),
- {{forward, ReleaseIndOut}, _S3} = s1ap_proxy:process_pdu(ReleaseInd, S2),
%% TODO: make sure that the E-RAB FSM has been terminated
- [?_assertEqual(ReleaseInd, ReleaseIndOut)].
+ [?_assertMatch({forward, _}, s1ap_proxy:process_pdu(Pid, SetupReq)),
+ ?_assertMatch({forward, _}, s1ap_proxy:process_pdu(Pid, SetupRsp)),
+ ?_assertEqual({forward, ReleaseInd}, s1ap_proxy:process_pdu(Pid, ReleaseInd))].
-test_e_rab_modify_ind(S0) ->
+test_e_rab_modify_ind(#{handler := Pid}) ->
%% [eNB -> MME] E-RAB MODIFICATION INDICATION
ModifyIndIn = e_rab_modify_ind_pdu(?ADDR_U2A, ?TEID_U2A),
%% XXX: not implemented, we should actually expect ?ADDR_C2U, ?TEID_C2U
ModifyIndExp = e_rab_modify_ind_pdu(?ADDR_U2A, ?TEID_U2A),
- {{forward, ModifyIndOut}, _S1} = s1ap_proxy:process_pdu(ModifyIndIn, S0),
- [?_assertEqual(ModifyIndExp, ModifyIndOut)].
+ [?_assertEqual({forward, ModifyIndExp}, s1ap_proxy:process_pdu(Pid, ModifyIndIn))].
-test_initial_context_setup(S0) ->
+
+test_initial_context_setup(#{handler := Pid}) ->
%% [eNB <- MME] INITIAL CONTEXT SETUP REQUEST
InitCtxSetupReqIn = initial_context_setup_req_pdu(?ADDR_U2C, ?TEID_U2C),
InitCtxSetupReqExp = initial_context_setup_req_pdu(?ADDR_A2U, ?TEID_A2U),
- {{forward, InitCtxSetupReqOut}, S1} = s1ap_proxy:process_pdu(InitCtxSetupReqIn, S0),
-
%% [eNB -> MME] INITIAL CONTEXT SETUP RESPONSE
InitCtxSetupRspIn = initial_context_setup_rsp_pdu(?ADDR_U2A, ?TEID_U2A),
InitCtxSetupRspExp = initial_context_setup_rsp_pdu(?ADDR_C2U, ?TEID_C2U),
- {{forward, InitCtxSetupRspOut}, _S2} = s1ap_proxy:process_pdu(InitCtxSetupRspIn,
S1),
- [?_assertEqual(InitCtxSetupReqExp, InitCtxSetupReqOut),
- ?_assertEqual(InitCtxSetupRspExp, InitCtxSetupRspOut)].
+ [?_assertEqual({forward, InitCtxSetupReqExp}, s1ap_proxy:process_pdu(Pid,
InitCtxSetupReqIn)),
+ ?_assertEqual({forward, InitCtxSetupRspExp}, s1ap_proxy:process_pdu(Pid,
InitCtxSetupRspIn))].
%% ------------------------------------------------------------------
--
To view, visit
https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/38683?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: merged
Gerrit-Project: erlang/osmo-s1gw
Gerrit-Branch: master
Gerrit-Change-Id: Id6e4c7ee29ae31edca658e1293601d38e5f43e63
Gerrit-Change-Number: 38683
Gerrit-PatchSet: 3
Gerrit-Owner: fixeria <vyanitskiy(a)sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <laforge(a)osmocom.org>
Gerrit-Reviewer: pespin <pespin(a)sysmocom.de>