laforge has submitted this change. ( https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/37268?usp=email )
Change subject: pfcp_peer: PFCP peer implementation ......................................................................
pfcp_peer: PFCP peer implementation
Change-Id: I7bad0069dca2111006039d12e744a2bd6d2cb86e --- M config/sys.config M src/osmo_s1gw.app.src M src/osmo_s1gw_sup.erl A src/pfcp_peer.erl 4 files changed, 416 insertions(+), 3 deletions(-)
Approvals: laforge: Looks good to me, approved Jenkins Builder: Verified
diff --git a/config/sys.config b/config/sys.config index cf8ff34..99930cc 100644 --- a/config/sys.config +++ b/config/sys.config @@ -13,7 +13,9 @@ {osmo_s1gw, [{s1gw_bind_addr, "127.0.1.1"}, %% S1GW bind address for incoming eNB connections {mme_loc_addr, "127.0.2.1"}, %% local address for outgoing connections to the MME - {mme_rem_addr, "127.0.2.10"} %% remote address for outgoing connections to the MME + {mme_rem_addr, "127.0.2.10"}, %% remote address for outgoing connections to the MME + {upf_loc_addr, "127.0.1.1"}, %% local address for outgoing connections to the UPF + {upf_rem_addr, "127.0.1.2"} %% remote address for outgoing connections to the UPF ]}, %% ================================================================================ %% kernel config diff --git a/src/osmo_s1gw.app.src b/src/osmo_s1gw.app.src index 6fd8174..8bb89de 100644 --- a/src/osmo_s1gw.app.src +++ b/src/osmo_s1gw.app.src @@ -7,7 +7,8 @@ {applications, [ kernel, stdlib, - logger_color_formatter + logger_color_formatter, + pfcplib ]}, {modules, []}, {mod, {osmo_s1gw_app, []}}, diff --git a/src/osmo_s1gw_sup.erl b/src/osmo_s1gw_sup.erl index e13cd1a..7bfd53f 100644 --- a/src/osmo_s1gw_sup.erl +++ b/src/osmo_s1gw_sup.erl @@ -47,6 +47,8 @@ -define(ENV_DEFAULT_MME_LOC_ADDR, "127.0.2.1"). -define(ENV_DEFAULT_MME_REM_ADDR, "127.0.2.10"). -define(ENV_DEFAULT_MME_REM_PORT, ?S1AP_PORT). +-define(ENV_DEFAULT_UPF_LOC_ADDR, "127.0.1.1"). +-define(ENV_DEFAULT_UPF_REM_ADDR, "127.0.1.2").
%% ------------------------------------------------------------------ %% supervisor API @@ -62,6 +64,9 @@ MmeLocAddr = get_env(mme_loc_addr, ?ENV_DEFAULT_MME_LOC_ADDR), MmeRemAddr = get_env(mme_rem_addr, ?ENV_DEFAULT_MME_REM_ADDR), MmeRemPort = get_env(mme_rem_port, ?ENV_DEFAULT_MME_REM_PORT), + UpfLocAddr = get_env(upf_loc_addr, ?ENV_DEFAULT_UPF_LOC_ADDR), + UpfRemAddr = get_env(upf_rem_addr, ?ENV_DEFAULT_UPF_REM_ADDR), + SctpServer = {sctp_server, {sctp_server, start_link, [S1GWBindAddr, S1GWBindPort, {{MmeLocAddr, MmeRemAddr}, MmeRemPort}]}, @@ -69,7 +74,14 @@ 5000, worker, [sctp_server]}, - {ok, {{one_for_all, 5, 10}, [SctpServer]}}. + PfcpPeer = {pfcp_peer, {pfcp_peer, start_link, + [UpfLocAddr, UpfRemAddr]}, + permanent, + 5000, + worker, + [pfcp_peer]}, + + {ok, {{one_for_one, 5, 10}, [SctpServer, PfcpPeer]}}.
%% ------------------------------------------------------------------ diff --git a/src/pfcp_peer.erl b/src/pfcp_peer.erl new file mode 100644 index 0000000..dc36c1f --- /dev/null +++ b/src/pfcp_peer.erl @@ -0,0 +1,398 @@ +%% Copyright (C) 2024 by sysmocom - s.f.m.c. GmbH info@sysmocom.de +%% Author: Vadim Yanitskiy vyanitskiy@sysmocom.de +%% +%% All Rights Reserved +%% +%% SPDX-License-Identifier: AGPL-3.0-or-later +%% +%% This program is free software; you can redistribute it and/or modify +%% it under the terms of the GNU Affero General Public License as +%% published by the Free Software Foundation; either version 3 of the +%% License, or (at your option) any later version. +%% +%% This program is distributed in the hope that it will be useful, +%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%% GNU General Public License for more details. +%% +%% You should have received a copy of the GNU Affero General Public License +%% along with this program. If not, see https://www.gnu.org/licenses/. +%% +%% Additional Permission under GNU AGPL version 3 section 7: +%% +%% If you modify this Program, or any covered work, by linking or +%% combining it with runtime libraries of Erlang/OTP as released by +%% Ericsson on https://www.erlang.org (or a modified version of these +%% libraries), containing parts covered by the terms of the Erlang Public +%% License (https://www.erlang.org/EPLICENSE), the licensors of this +%% Program grant you additional permission to convey the resulting work +%% without the need to license the runtime libraries of Erlang/OTP under +%% the GNU Affero General Public License. Corresponding Source for a +%% non-source form of such a combination shall include the source code +%% for the parts of the runtime libraries of Erlang/OTP used as well as +%% that of the covered work. + +-module(pfcp_peer). +-behaviour(gen_statem). + +-export([init/1, + callback_mode/0, + connecting/3, + connected/3, + code_change/4, + terminate/3]). +-export([start_link/2, + seid_alloc/0, + session_establish_req/3, + session_modify_req/3, + session_delete_req/1, + shutdown/0]). + +-include_lib("kernel/include/logger.hrl"). +-include_lib("pfcplib/include/pfcp_packet.hrl"). + +%% 3GPP TS 29.244, section 4.2 "UDP Header and Port Numbers" +-define(PFCP_PORT, 8805). + +-type pfcp_session_rsp() :: ok | {error, term()}. + +-type pfcp_msg_type() :: atom(). +-type pfcp_seq_nr() :: 0..16#ffffff. +-type pfcp_seid() :: 0..16#ffffffffffffffff. +-type pfcp_f_seid() :: #f_seid{}. +-type pfcp_ies() :: [term()] | map() | binary(). +-type pfcp_msg() :: {pfcp_msg_type(), pfcp_ies()}. +-type pfcp_pdu() :: #pfcp{}. + +-export_type([pfcp_session_rsp/0, + pfcp_msg_type/0, + pfcp_seq_nr/0, + pfcp_seid/0, + pfcp_ies/0, + pfcp_msg/0, + pfcp_pdu/0]). + +-record(peer_state, {seid :: pfcp_seid(), + sock :: gen_udp:socket(), + loc_addr :: inet:ip_address(), + rem_addr :: inet:ip_address(), + seq_nr :: pfcp_seq_nr(), + registry :: dict:dict() + }). + +-type peer_state() :: #peer_state{}. + +%% ------------------------------------------------------------------ +%% public API +%% ------------------------------------------------------------------ + +start_link(LocAddr, RemAddr) -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, + [LocAddr, RemAddr], + []). + + +%% Request to allocate a unique SEID +-spec seid_alloc() -> {ok, pfcp_seid()} | + {error, term()}. +seid_alloc() -> + gen_statem:call(?MODULE, ?FUNCTION_NAME). + + +-spec session_establish_req(pfcp_seid(), list(), list()) -> pfcp_session_rsp(). +session_establish_req(SEID, PDRs, FARs) -> + gen_statem:call(?MODULE, {?FUNCTION_NAME, SEID, PDRs, FARs}). + + +-spec session_modify_req(pfcp_seid(), list(), list()) -> pfcp_session_rsp(). +session_modify_req(SEID, PDRs, FARs) -> + gen_statem:call(?MODULE, {?FUNCTION_NAME, SEID, PDRs, FARs}). + + +-spec session_delete_req(pfcp_seid()) -> pfcp_session_rsp(). +session_delete_req(SEID) -> + gen_statem:call(?MODULE, {?FUNCTION_NAME, SEID}). + + +shutdown() -> + gen_statem:stop(?MODULE). + + +%% ------------------------------------------------------------------ +%% gen_server API +%% ------------------------------------------------------------------ + +init([LocAddrStr, RemAddr]) when is_list(LocAddrStr) -> + {ok, LocAddr} = inet:parse_address(LocAddrStr), + init([LocAddr, RemAddr]); + +init([LocAddr, RemAddrStr]) when is_list(RemAddrStr) -> + {ok, RemAddr} = inet:parse_address(RemAddrStr), + init([LocAddr, RemAddr]); + +init([LocAddr, RemAddr]) -> + process_flag(trap_exit, true), + {ok, Sock} = gen_udp:open(?PFCP_PORT, [binary, + {ip, LocAddr}, + {reuseaddr, true}, + {active, true}]), + ?LOG_INFO("PFCP peer @ ~p will talk to UPF @ ~p", [LocAddr, RemAddr]), + {ok, connecting, #peer_state{seid = 0, + sock = Sock, + loc_addr = LocAddr, + rem_addr = RemAddr, + seq_nr = 0, + registry = dict:new()}}. + + +callback_mode() -> + [state_functions, state_enter]. + + +%% CONNECTING state +connecting(enter, OldState, + #peer_state{} = S0) -> + ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]), + %% Tx PFCP Association Setup + {ok, S1} = send_assoc_setup(S0), + {next_state, connecting, S1, + [{state_timeout, 2_000, assoc_setup_timeout}]}; + +%% Handle Association Setup timeout +connecting(state_timeout, assoc_setup_timeout, _S) -> + {stop, {shutdown, assoc_setup_timeout}}; + +%% Handle incoming PFCP PDU(s) +connecting(info, {udp, Sock, _FromIp, _FromPort, Data}, + #peer_state{sock = Sock} = S) -> + case decode_pdu(Data, S) of + #pfcp{type = association_setup_response, + ie = #{pfcp_cause := 'Request accepted'}} -> + ?LOG_INFO("Rx Association Setup Response (Request accepted)"), + {next_state, connected, S}; + #pfcp{type = association_setup_response, + ie = #{pfcp_cause := Cause}} -> + ?LOG_ERROR("Rx Association Setup Response (~p)", [Cause]), + {stop, {shutdown, assoc_setup_nack}}; + PDU -> + ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]), + {keep_state, S} + end; + +connecting(Event, EventData, S) -> + handle_event(?FUNCTION_NAME, Event, EventData, S). + + +%% CONNECTED state +connected(enter, OldState, S) -> + ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]), + {keep_state, S}; + +connected({call, From}, + {session_establish_req, SEID, PDRs, FARs}, + #peer_state{} = S0) -> + {Result, S1} = send_session_establish_req(SEID, PDRs, FARs, S0), + {keep_state, S1, [{reply, From, Result}]}; + +connected({call, From}, + {session_modify_req, SEID, PDRs, FARs}, + #peer_state{} = S0) -> + {Result, S1} = send_session_modify_req(SEID, PDRs, FARs, S0), + {keep_state, S1, [{reply, From, Result}]}; + +connected({call, From}, + {session_delete_req, SEID}, + #peer_state{} = S0) -> + {Result, S1} = send_session_delete_req(SEID, S0), + {keep_state, S1, [{reply, From, Result}]}; + +%% Handle incoming PFCP PDU(s) +connected(info, {udp, Sock, _FromIp, _FromPort, Data}, + #peer_state{sock = Sock} = S) -> + PDU = decode_pdu(Data, S), + route_pdu(PDU, S), + {keep_state, S}; + +%% Catch-all handler for this state +connected(Event, EventData, S) -> + handle_event(?FUNCTION_NAME, Event, EventData, S). + + +%% Event handler for all states +handle_event(_State, + {call, From}, seid_alloc, + #peer_state{seid = SEID} = S0) -> + {Pid, _Ref} = From, + S1 = registry_add(Pid, S0), + {keep_state, S1, + [{reply, From, {ok, SEID}}]}; + +handle_event(_State, + info, {'DOWN', Ref, process, Pid, _Reason}, + #peer_state{} = S0) -> + S1 = registry_del({Pid, Ref}, S0), + {keep_state, S1}; + +handle_event(State, Event, EventData, S) -> + ?LOG_ERROR("Unexpected event ~p in state ~p: ~p", [Event, State, EventData]), + {keep_state, S}. + + +code_change(_Vsn, State, S, _Extra) -> + {ok, State, S}. + + +terminate(Reason, State, S) -> + ?LOG_NOTICE("Terminating in state ~p, reason ~p", [State, Reason]), + case State of + connected -> + send_assoc_release(S); + _ -> + nop + end, + gen_udp:close(S#peer_state.sock), + ok. + + +%% ------------------------------------------------------------------ +%% private API +%% ------------------------------------------------------------------ + +-spec registry_add(pid(), peer_state()) -> peer_state(). +registry_add(Pid, #peer_state{registry = Reg, + seid = SEID} = S) -> + Ref = erlang:monitor(process, Pid), + NReg = dict:store(SEID, {Pid, Ref}, Reg), + ?LOG_DEBUG("Allocated SEID ~p to process ~p", [SEID, Pid]), + S#peer_state{registry = NReg, + seid = SEID + 1}. + + +-spec registry_del({pid(), reference()}, peer_state()) -> peer_state(). +registry_del({Pid, Ref}, #peer_state{registry = Reg} = S) -> + ?LOG_DEBUG("Unregister process ~p", [Pid]), + Fun = fun(_Key, Val) -> Val =/= {Pid, Ref} end, + NReg = dict:filter(Fun, Reg), + S#peer_state{registry = NReg}. + + +-spec decode_pdu(binary(), peer_state()) -> pfcp_pdu(). +decode_pdu(Data, _S) -> + ?LOG_DEBUG("Rx encoded PFCP PDU: ~p", [Data]), + PDU = pfcp_packet:decode(Data), + ?LOG_DEBUG("Rx PFCP PDU: ~p", [PDU]), + %% TODO: check PDU#pfcp.seq_no + %% TODO: there can be batched PDUs + PDU. + + +-spec route_pdu(pfcp_pdu(), peer_state()) -> term(). +route_pdu(#pfcp{seid = SEID} = PDU, + #peer_state{registry = Reg}) -> + case dict:find(SEID, Reg) of + {ok, {Pid, _Ref}} -> + ?LOG_DEBUG("PFCP PDU routed to ~p", [Pid]), + Pid ! PDU; %% XXX: we may crash here + error -> + ?LOG_NOTICE("PFCP PDU cannot be routed: ~p", [PDU]) + end. + + +-spec send_data(binary(), peer_state()) -> ok | {error, term()}. +send_data(Data, #peer_state{sock = Sock, + rem_addr = RemAddr}) -> + ?LOG_DEBUG("Tx encoded PFCP PDU: ~p", [Data]), + gen_udp:send(Sock, RemAddr, ?PFCP_PORT, Data). + + +-spec send_pdu(pfcp_msg(), peer_state()) -> {ok, peer_state()} | + {{error, term()}, peer_state()}. +send_pdu({MsgType, IEs}, S) -> + send_pdu(undefined, {MsgType, IEs}, S). + +-spec send_pdu(pfcp_seid() | undefined, + pfcp_msg(), peer_state()) -> {ok, peer_state()} | + {{error, term()}, peer_state()}. +send_pdu(SEID, {MsgType, IEs}, + #peer_state{seq_nr = SeqNr} = S) -> + PDU = #pfcp{version = v1, + type = MsgType, + seid = SEID, + seq_no = SeqNr, + ie = IEs}, + ?LOG_DEBUG("Tx PFCP PDU: ~p", [PDU]), + Data = pfcp_packet:encode(PDU), + case send_data(Data, S) of + ok -> + {ok, S#peer_state{seq_nr = SeqNr + 1}}; + {error, Error} -> + {{error, Error}, S} + end. + + +-spec get_node_id(peer_state()) -> binary(). +get_node_id(#peer_state{loc_addr = LocAddr}) -> + list_to_binary(tuple_to_list(LocAddr)). + + +-spec get_recovery_timestamp() -> pos_integer(). +get_recovery_timestamp() -> + {NowMS, NowS, _NowUS} = erlang:timestamp(), + %% erlang:timestamp() returns time relative to the UNIX epoch (1970), + %% but for PFCP we need time relative to the NTP era 0 (1900). + %% 2208988800 is the diff between 1900 and 1970. + NowMS * 1_000_000 + NowS + 2208988800. + + +-spec get_f_seid(pfcp_seid(), peer_state()) -> pfcp_f_seid(). +get_f_seid(SEID, S) -> + IE = #f_seid{seid = SEID, + ipv4 = undefined, + ipv6 = undefined}, + NodeId = get_node_id(S), + case byte_size(NodeId) of + 4 -> IE#f_seid{ipv4 = NodeId}; + 6 -> IE#f_seid{ipv6 = NodeId}; + _ -> IE %% shall not happen + end. + + +%% 6.2.6 PFCP Association Setup Procedure +%% 7.4.4.1 PFCP Association Setup Request +send_assoc_setup(S) -> + IEs = #{node_id => #node_id{id = get_node_id(S)}, + recovery_time_stamp => #recovery_time_stamp{time = get_recovery_timestamp()}}, + send_pdu({association_setup_request, IEs}, S). + + +%% 6.2.8 PFCP Association Release Procedure +%% 7.4.4.5 PFCP Association Release Request +send_assoc_release(S) -> + IEs = #{node_id => #node_id{id = get_node_id(S)}}, + send_pdu({association_release_request, IEs}, S). + + +%% 6.3.2 PFCP Session Establishment Procedure +%% 7.5.2 PFCP Session Establishment Request +send_session_establish_req(F_SEID, PDRs, FARs, S) -> + IEs = #{node_id => #node_id{id = get_node_id(S)}, + f_seid => get_f_seid(F_SEID, S), + create_pdr => PDRs, + create_far => FARs}, + send_pdu(16#00, {session_establishment_request, IEs}, S). + + +%% 6.3.3 PFCP Session Modification Procedure +%% 7.5.4 PFCP Session Modification Request +send_session_modify_req(SEID, PDRs, FARs, S) -> + IEs = #{update_pdr => PDRs, + update_far => FARs}, + send_pdu(SEID, {session_modification_request, IEs}, S). + + +%% 6.3.4 PFCP Session Deletion Procedure +%% 7.5.6 PFCP Session Deletion Request +send_session_delete_req(SEID, S) -> + send_pdu(SEID, {session_deletion_request, []}, S). + +%% vim:set ts=4 sw=4 et: