fixeria has submitted this change. ( https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/38533?usp=email )
Change subject: pfcp_peer: implement Heartbeat Request procedure
......................................................................
pfcp_peer: implement Heartbeat Request procedure
Change-Id: Ie52d58ffcc169abe8e72ea106ab957c12af2e8b9
---
M src/pfcp_peer.erl
1 file changed, 139 insertions(+), 17 deletions(-)
Approvals:
  pespin: Looks good to me, but someone else must approve
  laforge: Looks good to me, but someone else must approve
  Jenkins Builder: Verified
  fixeria: Looks good to me, approved
diff --git a/src/pfcp_peer.erl b/src/pfcp_peer.erl index cc98c49..9a8709c 100644 --- a/src/pfcp_peer.erl +++ b/src/pfcp_peer.erl @@ -43,6 +43,8 @@ terminate/3]). -export([start_link/2, seid_alloc/0, + heartbeat_req/0, + heartbeat_req/1, session_establish_req/3, session_modify_req/3, session_delete_req/1, @@ -54,8 +56,8 @@
%% 3GPP TS 29.244, section 4.2 "UDP Header and Port Numbers" -define(PFCP_PORT, 8805). - -define(PFCP_SEID_MAX, 16#ffffffffffffffff). +-define(PFCP_HEARTBEAT_REQ_TIMEOUT, 2000).
-type pfcp_session_rsp() :: ok | {error, term()}.
@@ -75,6 +77,11 @@ pfcp_msg/0, pfcp_pdu/0]).
+-record(heartbeat_state, {from :: undefined | gen_statem:from(), + seq_nr :: pfcp_seq_nr(), + pid :: pid() + }). + -record(peer_state, {seid :: pfcp_seid(), sock :: gen_udp:socket(), loc_addr :: inet:ip_address(), @@ -82,7 +89,8 @@ loc_rts :: pos_integer(), rem_rts :: undefined | pos_integer(), seq_nr :: pfcp_seq_nr(), - registry :: dict:dict() + registry :: dict:dict(), + heartbeat :: undefined | #heartbeat_state{} }).
-type peer_state() :: #peer_state{}. @@ -104,6 +112,18 @@ gen_statem:call(?MODULE, ?FUNCTION_NAME).
+-spec heartbeat_req() -> ok | {error, term()}. +heartbeat_req() -> + heartbeat_req(block). + +-spec heartbeat_req(block | noblock) -> ok | {error, term()}. +heartbeat_req(block) -> + gen_statem:call(?MODULE, ?FUNCTION_NAME); + +heartbeat_req(noblock) -> + gen_statem:cast(?MODULE, ?FUNCTION_NAME). + + -spec session_establish_req(pfcp_seid(), list(), list()) -> pfcp_session_rsp(). session_establish_req(SEID, PDRs, FARs) -> gen_statem:call(?MODULE, {?FUNCTION_NAME, SEID, PDRs, FARs}). @@ -174,8 +194,8 @@
%% Handle incoming PFCP PDU(s) connecting(info, {udp, Sock, FromIp, FromPort, Data}, - #peer_state{sock = Sock} = S) -> - PDU = decode_pdu(Data, {FromIp, FromPort}, S), + #peer_state{sock = Sock} = S0) -> + PDU = decode_pdu(Data, {FromIp, FromPort}, S0), case PDU of #pfcp{type = association_setup_response, ie = #{pfcp_cause := 'Request accepted', @@ -184,7 +204,7 @@ s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX), s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX_ACK), {next_state, connected, - S#peer_state{rem_rts = RRTS}}; + S0#peer_state{rem_rts = RRTS}}; #pfcp{type = association_setup_response, ie = #{pfcp_cause := Cause}} -> ?LOG_ERROR("Rx Association Setup Response (~p)", [Cause]), @@ -195,11 +215,14 @@ %% A CP function or UP function shall be prepared to receive a Heartbeat Request %% at any time (even from unknown peers) and it shall reply with a Heartbeat Response. #pfcp{type = heartbeat_request} -> - recv_heartbeat_request(PDU, {FromIp, FromPort}, S), - {keep_state, S}; + {_, S1} = recv_heartbeat_request(PDU, {FromIp, FromPort}, S0), + {keep_state, S1}; + #pfcp{type = heartbeat_response} -> + {_, S1} = recv_heartbeat_response(PDU, {FromIp, FromPort}, S0), + {keep_state, S1}; _ -> ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]), - {keep_state, S} + {keep_state, S0} end;
connecting(Event, EventData, S) -> @@ -232,18 +255,22 @@
%% Handle incoming PFCP PDU(s) connected(info, {udp, Sock, FromIp, FromPort, Data}, - #peer_state{sock = Sock} = S) -> - PDU = decode_pdu(Data, {FromIp, FromPort}, S), + #peer_state{sock = Sock} = S0) -> + PDU = decode_pdu(Data, {FromIp, FromPort}, S0), case PDU of - %% 3GPP TS 29.244, 6.2.2.2 Heartbeat Request #pfcp{type = heartbeat_request} -> - recv_heartbeat_request(PDU, {FromIp, FromPort}, S); + {_, S1} = recv_heartbeat_request(PDU, {FromIp, FromPort}, S0), + {keep_state, S1}; + #pfcp{type = heartbeat_response} -> + {_, S1} = recv_heartbeat_response(PDU, {FromIp, FromPort}, S0), + {keep_state, S1}; #pfcp{seid = SEID} when is_integer(SEID) -> - route_pdu(PDU, S); + route_pdu(PDU, S0), + {keep_state, S0}; _ -> - ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]) - end, - {keep_state, S}; + ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]), + {keep_state, S0} + end;
%% Catch-all handler for this state connected(Event, EventData, S) -> @@ -259,6 +286,45 @@ {keep_state, S1, [{reply, From, {ok, SEID}}]};
+%% Heartbeat Req (non-blocking) +handle_event(_State, + cast, heartbeat_req, + #peer_state{} = S0) -> + {_, S1} = send_heartbeat_request(undefined, S0), + {keep_state, S1}; + +%% Heartbeat Req (blocking) +handle_event(_State, + {call, From}, heartbeat_req, + #peer_state{} = S0) -> + case send_heartbeat_request(From, S0) of + {ok, S1} -> + %% postpone reply until we get the Resp + {keep_state, S1}; + {Error, S1} -> + {keep_state, S1, + [{reply, From, Error}]} + end; + +%% Heartbeat Req (timeout) +handle_event(_State, + cast, heartbeat_request_watchdog, + #peer_state{heartbeat = HB} = S) -> + case HB of + #heartbeat_state{from = From, + seq_nr = SeqNr} -> + ?LOG_NOTICE("Heartbeat Request (SeqNr=~p) timeout", [SeqNr]), + if + From =/= undefined -> + gen_statem:reply(From, {error, timeout}); + true -> + ok + end, + {keep_state, S#peer_state{heartbeat = undefined}}; + undefined -> + {keep_state, S} + end; + handle_event(_State, {call, From}, {session_establish_req, _SEID, _PDRs, _FARs}, #peer_state{} = S) -> @@ -444,6 +510,61 @@
%% 6.2.2 Heartbeat Procedure %% 7.4.2 Heartbeat Messages +send_heartbeat_request(_From, #peer_state{heartbeat = #heartbeat_state{seq_nr = SeqNr}} = S) -> + ?LOG_ERROR("Another Heartbeat Request is still pending (SeqNr=~p)", [SeqNr]), + {{error, heartbeat_req_pending}, S}; + +send_heartbeat_request(From, #peer_state{heartbeat = undefined, + seq_nr = SeqNr, + loc_rts = LRTS} = S0) -> + ReqIEs = #{recovery_time_stamp => #recovery_time_stamp{time = LRTS}}, + ?LOG_INFO("Tx Heartbeat Request (SeqNr=~p): ~p", [SeqNr, ReqIEs]), + case send_pdu({heartbeat_request, ReqIEs}, S0) of + {ok, S1} -> + Pid = spawn(fun heartbeat_request_watchdog/0), + HB = #heartbeat_state{from = From, + seq_nr = SeqNr, + pid = Pid}, + {ok, S1#peer_state{heartbeat = HB}}; + Result -> + Result + end. + + +heartbeat_request_watchdog() -> + receive + heartbeat_response -> + ok + after + ?PFCP_HEARTBEAT_REQ_TIMEOUT -> + gen_statem:cast(?MODULE, ?FUNCTION_NAME) + end. + + +recv_heartbeat_response(#pfcp{type = heartbeat_response, + seq_no = SeqNr, + ie = RspIEs}, + {_FromIp, _FromPort}, + #peer_state{heartbeat = HB} = S) -> + ?LOG_INFO("Rx Heartbeat Response (SeqNr=~p): ~p", [SeqNr, RspIEs]), + case HB of + #heartbeat_state{from = From, + seq_nr = SeqNr, + pid = Pid} -> + if + From =/= undefined -> + gen_statem:reply(From, ok); + true -> + ok + end, + Pid ! heartbeat_response, + {ok, S#peer_state{heartbeat = undefined}}; + _ -> + ?LOG_NOTICE("Heartbeat Response (SeqNr=~p) was not expected", [SeqNr]), + {{error, unexpected}, S} + end. + + recv_heartbeat_request(#pfcp{type = heartbeat_request, seq_no = SeqNr, ie = ReqIEs}, @@ -455,6 +576,7 @@ {Result, _} = send_pdu({heartbeat_response, RspIEs}, S#peer_state{seq_nr = SeqNr, rem_addr = FromIp}), - Result. + {Result, S}. +
%% vim:set ts=4 sw=4 et: