laforge submitted this change.
pfcp_peer: PFCP peer implementation
Change-Id: I7bad0069dca2111006039d12e744a2bd6d2cb86e
---
M config/sys.config
M src/osmo_s1gw.app.src
M src/osmo_s1gw_sup.erl
A src/pfcp_peer.erl
4 files changed, 416 insertions(+), 3 deletions(-)
diff --git a/config/sys.config b/config/sys.config
index cf8ff34..99930cc 100644
--- a/config/sys.config
+++ b/config/sys.config
@@ -13,7 +13,9 @@
{osmo_s1gw,
[{s1gw_bind_addr, "127.0.1.1"}, %% S1GW bind address for incoming eNB connections
{mme_loc_addr, "127.0.2.1"}, %% local address for outgoing connections to the MME
- {mme_rem_addr, "127.0.2.10"} %% remote address for outgoing connections to the MME
+ {mme_rem_addr, "127.0.2.10"}, %% remote address for outgoing connections to the MME
+ {upf_loc_addr, "127.0.1.1"}, %% local address for outgoing connections to the UPF
+ {upf_rem_addr, "127.0.1.2"} %% remote address for outgoing connections to the UPF
]},
%% ================================================================================
%% kernel config
diff --git a/src/osmo_s1gw.app.src b/src/osmo_s1gw.app.src
index 6fd8174..8bb89de 100644
--- a/src/osmo_s1gw.app.src
+++ b/src/osmo_s1gw.app.src
@@ -7,7 +7,8 @@
{applications, [
kernel,
stdlib,
- logger_color_formatter
+ logger_color_formatter,
+ pfcplib
]},
{modules, []},
{mod, {osmo_s1gw_app, []}},
diff --git a/src/osmo_s1gw_sup.erl b/src/osmo_s1gw_sup.erl
index e13cd1a..7bfd53f 100644
--- a/src/osmo_s1gw_sup.erl
+++ b/src/osmo_s1gw_sup.erl
@@ -47,6 +47,8 @@
-define(ENV_DEFAULT_MME_LOC_ADDR, "127.0.2.1").
-define(ENV_DEFAULT_MME_REM_ADDR, "127.0.2.10").
-define(ENV_DEFAULT_MME_REM_PORT, ?S1AP_PORT).
+-define(ENV_DEFAULT_UPF_LOC_ADDR, "127.0.1.1").
+-define(ENV_DEFAULT_UPF_REM_ADDR, "127.0.1.2").
%% ------------------------------------------------------------------
%% supervisor API
@@ -62,6 +64,9 @@
MmeLocAddr = get_env(mme_loc_addr, ?ENV_DEFAULT_MME_LOC_ADDR),
MmeRemAddr = get_env(mme_rem_addr, ?ENV_DEFAULT_MME_REM_ADDR),
MmeRemPort = get_env(mme_rem_port, ?ENV_DEFAULT_MME_REM_PORT),
+ UpfLocAddr = get_env(upf_loc_addr, ?ENV_DEFAULT_UPF_LOC_ADDR),
+ UpfRemAddr = get_env(upf_rem_addr, ?ENV_DEFAULT_UPF_REM_ADDR),
+
SctpServer = {sctp_server, {sctp_server, start_link,
[S1GWBindAddr, S1GWBindPort,
{{MmeLocAddr, MmeRemAddr}, MmeRemPort}]},
@@ -69,7 +74,14 @@
5000,
worker,
[sctp_server]},
- {ok, {{one_for_all, 5, 10}, [SctpServer]}}.
+ PfcpPeer = {pfcp_peer, {pfcp_peer, start_link,
+ [UpfLocAddr, UpfRemAddr]},
+ permanent,
+ 5000,
+ worker,
+ [pfcp_peer]},
+
+ {ok, {{one_for_one, 5, 10}, [SctpServer, PfcpPeer]}}.
%% ------------------------------------------------------------------
diff --git a/src/pfcp_peer.erl b/src/pfcp_peer.erl
new file mode 100644
index 0000000..dc36c1f
--- /dev/null
+++ b/src/pfcp_peer.erl
@@ -0,0 +1,398 @@
+%% Copyright (C) 2024 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+%% Author: Vadim Yanitskiy <vyanitskiy@sysmocom.de>
+%%
+%% All Rights Reserved
+%%
+%% SPDX-License-Identifier: AGPL-3.0-or-later
+%%
+%% This program is free software; you can redistribute it and/or modify
+%% it under the terms of the GNU Affero General Public License as
+%% published by the Free Software Foundation; either version 3 of the
+%% License, or (at your option) any later version.
+%%
+%% This program is distributed in the hope that it will be useful,
+%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%% GNU General Public License for more details.
+%%
+%% You should have received a copy of the GNU Affero General Public License
+%% along with this program. If not, see <https://www.gnu.org/licenses/>.
+%%
+%% Additional Permission under GNU AGPL version 3 section 7:
+%%
+%% If you modify this Program, or any covered work, by linking or
+%% combining it with runtime libraries of Erlang/OTP as released by
+%% Ericsson on https://www.erlang.org (or a modified version of these
+%% libraries), containing parts covered by the terms of the Erlang Public
+%% License (https://www.erlang.org/EPLICENSE), the licensors of this
+%% Program grant you additional permission to convey the resulting work
+%% without the need to license the runtime libraries of Erlang/OTP under
+%% the GNU Affero General Public License. Corresponding Source for a
+%% non-source form of such a combination shall include the source code
+%% for the parts of the runtime libraries of Erlang/OTP used as well as
+%% that of the covered work.
+
+-module(pfcp_peer).
+-behaviour(gen_statem).
+
+-export([init/1,
+ callback_mode/0,
+ connecting/3,
+ connected/3,
+ code_change/4,
+ terminate/3]).
+-export([start_link/2,
+ seid_alloc/0,
+ session_establish_req/3,
+ session_modify_req/3,
+ session_delete_req/1,
+ shutdown/0]).
+
+-include_lib("kernel/include/logger.hrl").
+-include_lib("pfcplib/include/pfcp_packet.hrl").
+
+%% 3GPP TS 29.244, section 4.2 "UDP Header and Port Numbers"
+-define(PFCP_PORT, 8805).
+
+-type pfcp_session_rsp() :: ok | {error, term()}.
+
+-type pfcp_msg_type() :: atom().
+-type pfcp_seq_nr() :: 0..16#ffffff.
+-type pfcp_seid() :: 0..16#ffffffffffffffff.
+-type pfcp_f_seid() :: #f_seid{}.
+-type pfcp_ies() :: [term()] | map() | binary().
+-type pfcp_msg() :: {pfcp_msg_type(), pfcp_ies()}.
+-type pfcp_pdu() :: #pfcp{}.
+
+-export_type([pfcp_session_rsp/0,
+ pfcp_msg_type/0,
+ pfcp_seq_nr/0,
+ pfcp_seid/0,
+ pfcp_ies/0,
+ pfcp_msg/0,
+ pfcp_pdu/0]).
+
+-record(peer_state, {seid :: pfcp_seid(),
+ sock :: gen_udp:socket(),
+ loc_addr :: inet:ip_address(),
+ rem_addr :: inet:ip_address(),
+ seq_nr :: pfcp_seq_nr(),
+ registry :: dict:dict()
+ }).
+
+-type peer_state() :: #peer_state{}.
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+start_link(LocAddr, RemAddr) ->
+ gen_statem:start_link({local, ?MODULE}, ?MODULE,
+ [LocAddr, RemAddr],
+ []).
+
+
+%% Request to allocate a unique SEID
+-spec seid_alloc() -> {ok, pfcp_seid()} |
+ {error, term()}.
+seid_alloc() ->
+ gen_statem:call(?MODULE, ?FUNCTION_NAME).
+
+
+-spec session_establish_req(pfcp_seid(), list(), list()) -> pfcp_session_rsp().
+session_establish_req(SEID, PDRs, FARs) ->
+ gen_statem:call(?MODULE, {?FUNCTION_NAME, SEID, PDRs, FARs}).
+
+
+-spec session_modify_req(pfcp_seid(), list(), list()) -> pfcp_session_rsp().
+session_modify_req(SEID, PDRs, FARs) ->
+ gen_statem:call(?MODULE, {?FUNCTION_NAME, SEID, PDRs, FARs}).
+
+
+-spec session_delete_req(pfcp_seid()) -> pfcp_session_rsp().
+session_delete_req(SEID) ->
+ gen_statem:call(?MODULE, {?FUNCTION_NAME, SEID}).
+
+
+shutdown() ->
+ gen_statem:stop(?MODULE).
+
+
+%% ------------------------------------------------------------------
+%% gen_server API
+%% ------------------------------------------------------------------
+
+init([LocAddrStr, RemAddr]) when is_list(LocAddrStr) ->
+ {ok, LocAddr} = inet:parse_address(LocAddrStr),
+ init([LocAddr, RemAddr]);
+
+init([LocAddr, RemAddrStr]) when is_list(RemAddrStr) ->
+ {ok, RemAddr} = inet:parse_address(RemAddrStr),
+ init([LocAddr, RemAddr]);
+
+init([LocAddr, RemAddr]) ->
+ process_flag(trap_exit, true),
+ {ok, Sock} = gen_udp:open(?PFCP_PORT, [binary,
+ {ip, LocAddr},
+ {reuseaddr, true},
+ {active, true}]),
+ ?LOG_INFO("PFCP peer @ ~p will talk to UPF @ ~p", [LocAddr, RemAddr]),
+ {ok, connecting, #peer_state{seid = 0,
+ sock = Sock,
+ loc_addr = LocAddr,
+ rem_addr = RemAddr,
+ seq_nr = 0,
+ registry = dict:new()}}.
+
+
+callback_mode() ->
+ [state_functions, state_enter].
+
+
+%% CONNECTING state
+connecting(enter, OldState,
+ #peer_state{} = S0) ->
+ ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]),
+ %% Tx PFCP Association Setup
+ {ok, S1} = send_assoc_setup(S0),
+ {next_state, connecting, S1,
+ [{state_timeout, 2_000, assoc_setup_timeout}]};
+
+%% Handle Association Setup timeout
+connecting(state_timeout, assoc_setup_timeout, _S) ->
+ {stop, {shutdown, assoc_setup_timeout}};
+
+%% Handle incoming PFCP PDU(s)
+connecting(info, {udp, Sock, _FromIp, _FromPort, Data},
+ #peer_state{sock = Sock} = S) ->
+ case decode_pdu(Data, S) of
+ #pfcp{type = association_setup_response,
+ ie = #{pfcp_cause := 'Request accepted'}} ->
+ ?LOG_INFO("Rx Association Setup Response (Request accepted)"),
+ {next_state, connected, S};
+ #pfcp{type = association_setup_response,
+ ie = #{pfcp_cause := Cause}} ->
+ ?LOG_ERROR("Rx Association Setup Response (~p)", [Cause]),
+ {stop, {shutdown, assoc_setup_nack}};
+ PDU ->
+ ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]),
+ {keep_state, S}
+ end;
+
+connecting(Event, EventData, S) ->
+ handle_event(?FUNCTION_NAME, Event, EventData, S).
+
+
+%% CONNECTED state
+connected(enter, OldState, S) ->
+ ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]),
+ {keep_state, S};
+
+connected({call, From},
+ {session_establish_req, SEID, PDRs, FARs},
+ #peer_state{} = S0) ->
+ {Result, S1} = send_session_establish_req(SEID, PDRs, FARs, S0),
+ {keep_state, S1, [{reply, From, Result}]};
+
+connected({call, From},
+ {session_modify_req, SEID, PDRs, FARs},
+ #peer_state{} = S0) ->
+ {Result, S1} = send_session_modify_req(SEID, PDRs, FARs, S0),
+ {keep_state, S1, [{reply, From, Result}]};
+
+connected({call, From},
+ {session_delete_req, SEID},
+ #peer_state{} = S0) ->
+ {Result, S1} = send_session_delete_req(SEID, S0),
+ {keep_state, S1, [{reply, From, Result}]};
+
+%% Handle incoming PFCP PDU(s)
+connected(info, {udp, Sock, _FromIp, _FromPort, Data},
+ #peer_state{sock = Sock} = S) ->
+ PDU = decode_pdu(Data, S),
+ route_pdu(PDU, S),
+ {keep_state, S};
+
+%% Catch-all handler for this state
+connected(Event, EventData, S) ->
+ handle_event(?FUNCTION_NAME, Event, EventData, S).
+
+
+%% Event handler for all states
+handle_event(_State,
+ {call, From}, seid_alloc,
+ #peer_state{seid = SEID} = S0) ->
+ {Pid, _Ref} = From,
+ S1 = registry_add(Pid, S0),
+ {keep_state, S1,
+ [{reply, From, {ok, SEID}}]};
+
+handle_event(_State,
+ info, {'DOWN', Ref, process, Pid, _Reason},
+ #peer_state{} = S0) ->
+ S1 = registry_del({Pid, Ref}, S0),
+ {keep_state, S1};
+
+handle_event(State, Event, EventData, S) ->
+ ?LOG_ERROR("Unexpected event ~p in state ~p: ~p", [Event, State, EventData]),
+ {keep_state, S}.
+
+
+code_change(_Vsn, State, S, _Extra) ->
+ {ok, State, S}.
+
+
+terminate(Reason, State, S) ->
+ ?LOG_NOTICE("Terminating in state ~p, reason ~p", [State, Reason]),
+ case State of
+ connected ->
+ send_assoc_release(S);
+ _ ->
+ nop
+ end,
+ gen_udp:close(S#peer_state.sock),
+ ok.
+
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+-spec registry_add(pid(), peer_state()) -> peer_state().
+registry_add(Pid, #peer_state{registry = Reg,
+ seid = SEID} = S) ->
+ Ref = erlang:monitor(process, Pid),
+ NReg = dict:store(SEID, {Pid, Ref}, Reg),
+ ?LOG_DEBUG("Allocated SEID ~p to process ~p", [SEID, Pid]),
+ S#peer_state{registry = NReg,
+ seid = SEID + 1}.
+
+
+-spec registry_del({pid(), reference()}, peer_state()) -> peer_state().
+registry_del({Pid, Ref}, #peer_state{registry = Reg} = S) ->
+ ?LOG_DEBUG("Unregister process ~p", [Pid]),
+ Fun = fun(_Key, Val) -> Val =/= {Pid, Ref} end,
+ NReg = dict:filter(Fun, Reg),
+ S#peer_state{registry = NReg}.
+
+
+-spec decode_pdu(binary(), peer_state()) -> pfcp_pdu().
+decode_pdu(Data, _S) ->
+ ?LOG_DEBUG("Rx encoded PFCP PDU: ~p", [Data]),
+ PDU = pfcp_packet:decode(Data),
+ ?LOG_DEBUG("Rx PFCP PDU: ~p", [PDU]),
+ %% TODO: check PDU#pfcp.seq_no
+ %% TODO: there can be batched PDUs
+ PDU.
+
+
+-spec route_pdu(pfcp_pdu(), peer_state()) -> term().
+route_pdu(#pfcp{seid = SEID} = PDU,
+ #peer_state{registry = Reg}) ->
+ case dict:find(SEID, Reg) of
+ {ok, {Pid, _Ref}} ->
+ ?LOG_DEBUG("PFCP PDU routed to ~p", [Pid]),
+ Pid ! PDU; %% XXX: we may crash here
+ error ->
+ ?LOG_NOTICE("PFCP PDU cannot be routed: ~p", [PDU])
+ end.
+
+
+-spec send_data(binary(), peer_state()) -> ok | {error, term()}.
+send_data(Data, #peer_state{sock = Sock,
+ rem_addr = RemAddr}) ->
+ ?LOG_DEBUG("Tx encoded PFCP PDU: ~p", [Data]),
+ gen_udp:send(Sock, RemAddr, ?PFCP_PORT, Data).
+
+
+-spec send_pdu(pfcp_msg(), peer_state()) -> {ok, peer_state()} |
+ {{error, term()}, peer_state()}.
+send_pdu({MsgType, IEs}, S) ->
+ send_pdu(undefined, {MsgType, IEs}, S).
+
+-spec send_pdu(pfcp_seid() | undefined,
+ pfcp_msg(), peer_state()) -> {ok, peer_state()} |
+ {{error, term()}, peer_state()}.
+send_pdu(SEID, {MsgType, IEs},
+ #peer_state{seq_nr = SeqNr} = S) ->
+ PDU = #pfcp{version = v1,
+ type = MsgType,
+ seid = SEID,
+ seq_no = SeqNr,
+ ie = IEs},
+ ?LOG_DEBUG("Tx PFCP PDU: ~p", [PDU]),
+ Data = pfcp_packet:encode(PDU),
+ case send_data(Data, S) of
+ ok ->
+ {ok, S#peer_state{seq_nr = SeqNr + 1}};
+ {error, Error} ->
+ {{error, Error}, S}
+ end.
+
+
+-spec get_node_id(peer_state()) -> binary().
+get_node_id(#peer_state{loc_addr = LocAddr}) ->
+ list_to_binary(tuple_to_list(LocAddr)).
+
+
+-spec get_recovery_timestamp() -> pos_integer().
+get_recovery_timestamp() ->
+ {NowMS, NowS, _NowUS} = erlang:timestamp(),
+ %% erlang:timestamp() returns time relative to the UNIX epoch (1970),
+ %% but for PFCP we need time relative to the NTP era 0 (1900).
+ %% 2208988800 is the diff between 1900 and 1970.
+ NowMS * 1_000_000 + NowS + 2208988800.
+
+
+-spec get_f_seid(pfcp_seid(), peer_state()) -> pfcp_f_seid().
+get_f_seid(SEID, S) ->
+ IE = #f_seid{seid = SEID,
+ ipv4 = undefined,
+ ipv6 = undefined},
+ NodeId = get_node_id(S),
+ case byte_size(NodeId) of
+ 4 -> IE#f_seid{ipv4 = NodeId};
+ 6 -> IE#f_seid{ipv6 = NodeId};
+ _ -> IE %% shall not happen
+ end.
+
+
+%% 6.2.6 PFCP Association Setup Procedure
+%% 7.4.4.1 PFCP Association Setup Request
+send_assoc_setup(S) ->
+ IEs = #{node_id => #node_id{id = get_node_id(S)},
+ recovery_time_stamp => #recovery_time_stamp{time = get_recovery_timestamp()}},
+ send_pdu({association_setup_request, IEs}, S).
+
+
+%% 6.2.8 PFCP Association Release Procedure
+%% 7.4.4.5 PFCP Association Release Request
+send_assoc_release(S) ->
+ IEs = #{node_id => #node_id{id = get_node_id(S)}},
+ send_pdu({association_release_request, IEs}, S).
+
+
+%% 6.3.2 PFCP Session Establishment Procedure
+%% 7.5.2 PFCP Session Establishment Request
+send_session_establish_req(F_SEID, PDRs, FARs, S) ->
+ IEs = #{node_id => #node_id{id = get_node_id(S)},
+ f_seid => get_f_seid(F_SEID, S),
+ create_pdr => PDRs,
+ create_far => FARs},
+ send_pdu(16#00, {session_establishment_request, IEs}, S).
+
+
+%% 6.3.3 PFCP Session Modification Procedure
+%% 7.5.4 PFCP Session Modification Request
+send_session_modify_req(SEID, PDRs, FARs, S) ->
+ IEs = #{update_pdr => PDRs,
+ update_far => FARs},
+ send_pdu(SEID, {session_modification_request, IEs}, S).
+
+
+%% 6.3.4 PFCP Session Deletion Procedure
+%% 7.5.6 PFCP Session Deletion Request
+send_session_delete_req(SEID, S) ->
+ send_pdu(SEID, {session_deletion_request, []}, S).
+
+%% vim:set ts=4 sw=4 et:
To view, visit change 37268. To unsubscribe, or for help writing mail filters, visit settings.