fixeria submitted this change.
pfcp_peer: implement Heartbeat Request procedure
Change-Id: Ie52d58ffcc169abe8e72ea106ab957c12af2e8b9
---
M src/pfcp_peer.erl
1 file changed, 139 insertions(+), 17 deletions(-)
diff --git a/src/pfcp_peer.erl b/src/pfcp_peer.erl
index cc98c49..9a8709c 100644
--- a/src/pfcp_peer.erl
+++ b/src/pfcp_peer.erl
@@ -43,6 +43,8 @@
terminate/3]).
-export([start_link/2,
seid_alloc/0,
+ heartbeat_req/0,
+ heartbeat_req/1,
session_establish_req/3,
session_modify_req/3,
session_delete_req/1,
@@ -54,8 +56,8 @@
%% 3GPP TS 29.244, section 4.2 "UDP Header and Port Numbers"
-define(PFCP_PORT, 8805).
-
-define(PFCP_SEID_MAX, 16#ffffffffffffffff).
+-define(PFCP_HEARTBEAT_REQ_TIMEOUT, 2000).
-type pfcp_session_rsp() :: ok | {error, term()}.
@@ -75,6 +77,11 @@
pfcp_msg/0,
pfcp_pdu/0]).
+-record(heartbeat_state, {from :: undefined | gen_statem:from(),
+ seq_nr :: pfcp_seq_nr(),
+ pid :: pid()
+ }).
+
-record(peer_state, {seid :: pfcp_seid(),
sock :: gen_udp:socket(),
loc_addr :: inet:ip_address(),
@@ -82,7 +89,8 @@
loc_rts :: pos_integer(),
rem_rts :: undefined | pos_integer(),
seq_nr :: pfcp_seq_nr(),
- registry :: dict:dict()
+ registry :: dict:dict(),
+ heartbeat :: undefined | #heartbeat_state{}
}).
-type peer_state() :: #peer_state{}.
@@ -104,6 +112,18 @@
gen_statem:call(?MODULE, ?FUNCTION_NAME).
+-spec heartbeat_req() -> ok | {error, term()}.
+heartbeat_req() ->
+ heartbeat_req(block).
+
+-spec heartbeat_req(block | noblock) -> ok | {error, term()}.
+heartbeat_req(block) ->
+ gen_statem:call(?MODULE, ?FUNCTION_NAME);
+
+heartbeat_req(noblock) ->
+ gen_statem:cast(?MODULE, ?FUNCTION_NAME).
+
+
-spec session_establish_req(pfcp_seid(), list(), list()) -> pfcp_session_rsp().
session_establish_req(SEID, PDRs, FARs) ->
gen_statem:call(?MODULE, {?FUNCTION_NAME, SEID, PDRs, FARs}).
@@ -174,8 +194,8 @@
%% Handle incoming PFCP PDU(s)
connecting(info, {udp, Sock, FromIp, FromPort, Data},
- #peer_state{sock = Sock} = S) ->
- PDU = decode_pdu(Data, {FromIp, FromPort}, S),
+ #peer_state{sock = Sock} = S0) ->
+ PDU = decode_pdu(Data, {FromIp, FromPort}, S0),
case PDU of
#pfcp{type = association_setup_response,
ie = #{pfcp_cause := 'Request accepted',
@@ -184,7 +204,7 @@
s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX),
s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX_ACK),
{next_state, connected,
- S#peer_state{rem_rts = RRTS}};
+ S0#peer_state{rem_rts = RRTS}};
#pfcp{type = association_setup_response,
ie = #{pfcp_cause := Cause}} ->
?LOG_ERROR("Rx Association Setup Response (~p)", [Cause]),
@@ -195,11 +215,14 @@
%% A CP function or UP function shall be prepared to receive a Heartbeat Request
%% at any time (even from unknown peers) and it shall reply with a Heartbeat Response.
#pfcp{type = heartbeat_request} ->
- recv_heartbeat_request(PDU, {FromIp, FromPort}, S),
- {keep_state, S};
+ {_, S1} = recv_heartbeat_request(PDU, {FromIp, FromPort}, S0),
+ {keep_state, S1};
+ #pfcp{type = heartbeat_response} ->
+ {_, S1} = recv_heartbeat_response(PDU, {FromIp, FromPort}, S0),
+ {keep_state, S1};
_ ->
?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]),
- {keep_state, S}
+ {keep_state, S0}
end;
connecting(Event, EventData, S) ->
@@ -232,18 +255,22 @@
%% Handle incoming PFCP PDU(s)
connected(info, {udp, Sock, FromIp, FromPort, Data},
- #peer_state{sock = Sock} = S) ->
- PDU = decode_pdu(Data, {FromIp, FromPort}, S),
+ #peer_state{sock = Sock} = S0) ->
+ PDU = decode_pdu(Data, {FromIp, FromPort}, S0),
case PDU of
- %% 3GPP TS 29.244, 6.2.2.2 Heartbeat Request
#pfcp{type = heartbeat_request} ->
- recv_heartbeat_request(PDU, {FromIp, FromPort}, S);
+ {_, S1} = recv_heartbeat_request(PDU, {FromIp, FromPort}, S0),
+ {keep_state, S1};
+ #pfcp{type = heartbeat_response} ->
+ {_, S1} = recv_heartbeat_response(PDU, {FromIp, FromPort}, S0),
+ {keep_state, S1};
#pfcp{seid = SEID} when is_integer(SEID) ->
- route_pdu(PDU, S);
+ route_pdu(PDU, S0),
+ {keep_state, S0};
_ ->
- ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU])
- end,
- {keep_state, S};
+ ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]),
+ {keep_state, S0}
+ end;
%% Catch-all handler for this state
connected(Event, EventData, S) ->
@@ -259,6 +286,45 @@
{keep_state, S1,
[{reply, From, {ok, SEID}}]};
+%% Heartbeat Req (non-blocking)
+handle_event(_State,
+ cast, heartbeat_req,
+ #peer_state{} = S0) ->
+ {_, S1} = send_heartbeat_request(undefined, S0),
+ {keep_state, S1};
+
+%% Heartbeat Req (blocking)
+handle_event(_State,
+ {call, From}, heartbeat_req,
+ #peer_state{} = S0) ->
+ case send_heartbeat_request(From, S0) of
+ {ok, S1} ->
+ %% postpone reply until we get the Resp
+ {keep_state, S1};
+ {Error, S1} ->
+ {keep_state, S1,
+ [{reply, From, Error}]}
+ end;
+
+%% Heartbeat Req (timeout)
+handle_event(_State,
+ cast, heartbeat_request_watchdog,
+ #peer_state{heartbeat = HB} = S) ->
+ case HB of
+ #heartbeat_state{from = From,
+ seq_nr = SeqNr} ->
+ ?LOG_NOTICE("Heartbeat Request (SeqNr=~p) timeout", [SeqNr]),
+ if
+ From =/= undefined ->
+ gen_statem:reply(From, {error, timeout});
+ true ->
+ ok
+ end,
+ {keep_state, S#peer_state{heartbeat = undefined}};
+ undefined ->
+ {keep_state, S}
+ end;
+
handle_event(_State,
{call, From}, {session_establish_req, _SEID, _PDRs, _FARs},
#peer_state{} = S) ->
@@ -444,6 +510,61 @@
%% 6.2.2 Heartbeat Procedure
%% 7.4.2 Heartbeat Messages
+send_heartbeat_request(_From, #peer_state{heartbeat = #heartbeat_state{seq_nr = SeqNr}} = S) ->
+ ?LOG_ERROR("Another Heartbeat Request is still pending (SeqNr=~p)", [SeqNr]),
+ {{error, heartbeat_req_pending}, S};
+
+send_heartbeat_request(From, #peer_state{heartbeat = undefined,
+ seq_nr = SeqNr,
+ loc_rts = LRTS} = S0) ->
+ ReqIEs = #{recovery_time_stamp => #recovery_time_stamp{time = LRTS}},
+ ?LOG_INFO("Tx Heartbeat Request (SeqNr=~p): ~p", [SeqNr, ReqIEs]),
+ case send_pdu({heartbeat_request, ReqIEs}, S0) of
+ {ok, S1} ->
+ Pid = spawn(fun heartbeat_request_watchdog/0),
+ HB = #heartbeat_state{from = From,
+ seq_nr = SeqNr,
+ pid = Pid},
+ {ok, S1#peer_state{heartbeat = HB}};
+ Result ->
+ Result
+ end.
+
+
+heartbeat_request_watchdog() ->
+ receive
+ heartbeat_response ->
+ ok
+ after
+ ?PFCP_HEARTBEAT_REQ_TIMEOUT ->
+ gen_statem:cast(?MODULE, ?FUNCTION_NAME)
+ end.
+
+
+recv_heartbeat_response(#pfcp{type = heartbeat_response,
+ seq_no = SeqNr,
+ ie = RspIEs},
+ {_FromIp, _FromPort},
+ #peer_state{heartbeat = HB} = S) ->
+ ?LOG_INFO("Rx Heartbeat Response (SeqNr=~p): ~p", [SeqNr, RspIEs]),
+ case HB of
+ #heartbeat_state{from = From,
+ seq_nr = SeqNr,
+ pid = Pid} ->
+ if
+ From =/= undefined ->
+ gen_statem:reply(From, ok);
+ true ->
+ ok
+ end,
+ Pid ! heartbeat_response,
+ {ok, S#peer_state{heartbeat = undefined}};
+ _ ->
+ ?LOG_NOTICE("Heartbeat Response (SeqNr=~p) was not expected", [SeqNr]),
+ {{error, unexpected}, S}
+ end.
+
+
recv_heartbeat_request(#pfcp{type = heartbeat_request,
seq_no = SeqNr,
ie = ReqIEs},
@@ -455,6 +576,7 @@
{Result, _} = send_pdu({heartbeat_response, RspIEs},
S#peer_state{seq_nr = SeqNr,
rem_addr = FromIp}),
- Result.
+ {Result, S}.
+
%% vim:set ts=4 sw=4 et:
To view, visit change 38533. To unsubscribe, or for help writing mail filters, visit settings.