fixeria has submitted this change. ( https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/40281?usp=email )
Change subject: enft_kpi: retrieve per-eNB traffic counters ......................................................................
enft_kpi: retrieve per-eNB traffic counters
Change-Id: I498d2854447a2d53d2abddd38652f3e2bbb1fbdd Related: SYS#7307 --- M config/sys.config M debian/control M include/s1gw_metrics.hrl M rebar.config M rebar.lock A src/enft_kpi.erl M src/osmo_s1gw.app.src M src/osmo_s1gw_sup.erl M src/s1ap_proxy.erl M test/s1ap_proxy_test.erl 10 files changed, 684 insertions(+), 1 deletion(-)
Approvals: Jenkins Builder: Verified pespin: Looks good to me, but someone else must approve fixeria: Looks good to me, approved
diff --git a/config/sys.config b/config/sys.config index d405db8..57cb61c 100644 --- a/config/sys.config +++ b/config/sys.config @@ -19,6 +19,11 @@ %% Optional PFCP Network Instance IEs (omitted if not configured) %% {pfcp_net_inst_core, << 16#09, "core-side" >>}, %% PFCP Network Instance IE value (to core) %% {pfcp_net_inst_access, << 16#0a, "radio-side" >>} %% PFCP Network Instance IE value (to access) +%% +%% Optional NFT KPI configuration +%% {enft_kpi_enable, true}, %% whether to enable the NFT KPI module (default: false) +%% {enft_kpi_table_name, "osmo-s1gw"}, %% the NFT table name to be used by this process +%% {enft_kpi_interval, 3000} %% counter reporting interval (ms) ]}, %% ================================================================================ %% kernel config diff --git a/debian/control b/debian/control index a4279b7..10eddac 100644 --- a/debian/control +++ b/debian/control @@ -3,6 +3,7 @@ Section: net Priority: optional Build-Depends: erlang-nox, + libnftables-dev, rebar3, debhelper (>= 10) Standards-Version: 4.5.0 diff --git a/include/s1gw_metrics.hrl b/include/s1gw_metrics.hrl index 940529b..83ab46d 100644 --- a/include/s1gw_metrics.hrl +++ b/include/s1gw_metrics.hrl @@ -36,6 +36,13 @@ -define(S1GW_CTR_S1AP_PROXY_OUT_PKT_REPLY_ALL, [ctr, s1ap, proxy, out_pkt, reply, all]). -define(S1GW_CTR_S1AP_PROXY_OUT_PKT_REPLY_ERAB_SETUP_RSP, [ctr, s1ap, proxy, out_pkt, reply, erab_setup_rsp]).
+%% GTP-U (traffic) related counters +%% NOTE: these counters shall not be listed in ?S1GW_COUNTERS, +%% but created dynamically for each connecting eNB. +-define(S1GW_CTR_GTPU_PACKETS(ENBId, UDL), [ctr, enb, ENBId, gtpu, packets, UDL]). +-define(S1GW_CTR_GTPU_BYTES_UE(ENBId, UDL), [ctr, enb, ENBId, gtpu, bytes, ue, UDL]). +-define(S1GW_CTR_GTPU_BYTES_TOTAL(ENBId, UDL), [ctr, enb, ENBId, gtpu, bytes, total, UDL]). + -define(S1GW_GAUGE_PFCP_ASSOCIATED, [gauge, pfcp, associated]). -define(S1GW_GAUGE_S1AP_ENB_NUM_SCTP_CONNECTIONS, [gauge, s1ap, enb, num_sctp_connections]). -define(S1GW_GAUGE_S1AP_PROXY_UPLINK_PACKETS_QUEUED, [gauge, s1ap, proxy, uplink_packets_queued]). diff --git a/rebar.config b/rebar.config index 0100f17..b69ef47 100644 --- a/rebar.config +++ b/rebar.config @@ -9,6 +9,8 @@ {git, "https://github.com/rlipscombe/logger_color_formatter.git", {tag, "0.5.0"}}}, {pfcplib, {git, "https://github.com/travelping/pfcplib.git", {branch, "master"}}}, + {enftables, + {git, "https://gitea.osmocom.org/erlang/enftables.git", {branch, "master"}}}, {exometer_core, {git, "https://github.com/Feuerlabs/exometer_core.git", {branch, "master"}}}, {exometer_report_statsd, diff --git a/rebar.lock b/rebar.lock index c52d677..c5a90df 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,5 +1,9 @@ {"1.2.0", [{<<"cut">>,{pkg,<<"cut">>,<<"1.0.3">>},1}, + {<<"enftables">>, + {git,"https://gitea.osmocom.org/erlang/enftables.git", + {ref,"be23c8ebac948aa1eea450b368af437d12a7173a"}}, + 0}, {<<"exometer_core">>, {git,"https://github.com/Feuerlabs/exometer_core.git", {ref,"f9c7abc095edc893c9354a3d5f061715de1d9e79"}}, @@ -9,6 +13,10 @@ {ref,"f1c369becb6e57871f1c7b0e491f6c3a302a65ee"}}, 0}, {<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},1}, + {<<"jiffy">>, + {git,"https://github.com/davisp/jiffy", + {ref,"50daa80a62a97ffb6dd46ea9cb8ccb4930cbf1ae"}}, + 1}, {<<"logger_color_formatter">>, {git,"https://github.com/rlipscombe/logger_color_formatter.git", {ref,"f1c96f979e6350f8cd787d27fe9ff003cbf3416b"}}, 
diff --git a/src/enft_kpi.erl b/src/enft_kpi.erl new file mode 100644 index 0000000..015956c --- /dev/null +++ b/src/enft_kpi.erl @@ -0,0 +1,624 @@ +%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH info@sysmocom.de +%% Author: Vadim Yanitskiy vyanitskiy@sysmocom.de +%% +%% All Rights Reserved +%% +%% SPDX-License-Identifier: AGPL-3.0-or-later +%% +%% This program is free software; you can redistribute it and/or modify +%% it under the terms of the GNU Affero General Public License as +%% published by the Free Software Foundation; either version 3 of the +%% License, or (at your option) any later version. +%% +%% This program is distributed in the hope that it will be useful, +%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%% GNU General Public License for more details. +%% +%% You should have received a copy of the GNU Affero General Public License +%% along with this program. If not, see https://www.gnu.org/licenses/. +%% +%% Additional Permission under GNU AGPL version 3 section 7: +%% +%% If you modify this Program, or any covered work, by linking or +%% combining it with runtime libraries of Erlang/OTP as released by +%% Ericsson on https://www.erlang.org (or a modified version of these +%% libraries), containing parts covered by the terms of the Erlang Public +%% License (https://www.erlang.org/EPLICENSE), the licensors of this +%% Program grant you additional permission to convey the resulting work +%% without the need to license the runtime libraries of Erlang/OTP under +%% the GNU Affero General Public License. Corresponding Source for a +%% non-source form of such a combination shall include the source code +%% for the parts of the runtime libraries of Erlang/OTP used as well as +%% that of the covered work. + +-module(enft_kpi). +-behaviour(gen_server). + +-export([init/1, + handle_info/2, + handle_call/3, + handle_cast/2, + terminate/2]). 
%% Public API of the module (see the wrappers further below).
-export([start_link/1,
         enb_register/1,
         enb_set_addr/1,
         enb_unregister/0,
         fetch_counters/0,
         shutdown/0]).

-include_lib("kernel/include/logger.hrl").

-include("s1gw_metrics.hrl").

%% UDP port used in the GTP-U match rules
-define(GTPU_PORT, 2152).

%% comparison operators used in nft 'match' expressions
-define(OP_EQ, "==").
-define(OP_NEQ, "!=").


-type cfg() :: #{enable => boolean(),
                 table_name => string(),
                 interval => non_neg_integer() %% like X34 in osmo-hnbgw
                }.

%% A snapshot of one nft counter (see parse_nft_counter/1).
-record(ctr, {packets = 0 :: non_neg_integer(),
              bytes_ue = 0 :: non_neg_integer(),
              bytes_total = 0 :: non_neg_integer()
             }).

-type counter() :: #ctr{}.

%% nft counter name -> parsed counter values
-type counters() :: dict:dict(K :: string(),
                              V :: counter()).

-type enb_state() :: #{pid => pid(),
                       addr => string(),
                       genb_id => string(),
                       mon_ref => reference(),
                       handle_ul => integer(),
                       handle_dl => integer(),
                       ctr_ul => counter(), %% UL counters
                       ctr_dl => counter()  %% DL counters
                      }.

%% registering process -> per-eNB state
-type registry() :: dict:dict(K :: pid(),
                              V :: enb_state()).

-record(state, {cfg :: cfg(),
                registry :: registry()
               }).

-export_type([cfg/0,
              counter/0,
              counters/0]).


%% ------------------------------------------------------------------
%% public API
%% ------------------------------------------------------------------

%% Start the server, registered locally under the module name.
-spec start_link(cfg()) -> gen_server:start_ret().
start_link(Cfg) ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [Cfg], []).


%% Register the calling process as an eNB with the given Global-eNB-ID.
-spec enb_register(GlobalENBId) -> ok | {error, term()}
    when GlobalENBId :: string().
enb_register(GlobalENBId) ->
    Request = {enb_register, GlobalENBId},
    gen_server:call(?MODULE, Request).


%% Indicate the address of the eNB registered by the calling process.
-spec enb_set_addr(Addr) -> ok | {error, term()}
    when Addr :: string().
enb_set_addr(Addr) ->
    Request = {enb_set_addr, Addr},
    gen_server:call(?MODULE, Request).


%% Unregister the eNB previously registered by the calling process.
-spec enb_unregister() -> ok | {error, term()}.
enb_unregister() ->
    gen_server:call(?MODULE, enb_unregister).


%% Fetch the current nft counters; plain 'ok' is returned in stub mode.
-spec fetch_counters() -> ok | {ok, counters()} | {error, term()}.
fetch_counters() ->
    gen_server:call(?MODULE, fetch_counters).


-spec shutdown() -> ok.
%% Stop the (locally registered) server.
shutdown() ->
    gen_server:stop(?MODULE).


%% ------------------------------------------------------------------
%% gen_server API
%% ------------------------------------------------------------------

%% Initialize the server.
%%
%% With `enable := true` the nft table, the two GTP-U chains and the
%% "accept everything that is not GTP-U" rules are created, and a linked
%% heartbeat process is spawned that triggers periodic counter reporting.
%% With `enable := false` (or when the key is absent) the server runs in
%% stub mode and touches nothing.
%%
%% NOTE(review): the failure branch returns `{error, Reason}` from init/1,
%% which gen_server accepts as of OTP 26 -- confirm the minimum supported
%% OTP release, older ones need `{stop, Reason}` here.
init([#{enable := true} = Cfg]) ->
    process_flag(trap_exit, true),
    TName = maps:get(table_name, Cfg, "osmo-s1gw"),
    Interval = maps:get(interval, Cfg, 3000),
    %% ignore (accept) anything but GTPU @ udp/2152
    R1 = [nft_expr_match_ip_proto("udp", ?OP_NEQ), nft_expr_accept()],
    R2 = [nft_expr_match_udp_dport(?GTPU_PORT, ?OP_NEQ), nft_expr_accept()],
    Cmds = [nft_cmd_add_table(TName),
            nft_cmd_add_chain(TName, "gtpu-ul", "prerouting"),
            nft_cmd_add_chain(TName, "gtpu-dl", "postrouting"),
            nft_cmd_add_rule(TName, "gtpu-ul", R1),
            nft_cmd_add_rule(TName, "gtpu-dl", R1),
            nft_cmd_add_rule(TName, "gtpu-ul", R2),
            nft_cmd_add_rule(TName, "gtpu-dl", R2)
           ],
    case nft_exec(Cmds) of
        ok ->
            ?LOG_INFO("NFT table ~p has been initialized", [TName]),
            spawn_link(fun() -> heartbeat(Interval) end),
            %% store the effective (defaulted) values in the state
            {ok, #state{cfg = Cfg#{table_name => TName,
                                   interval => Interval},
                        registry = dict:new()}};
        {error, Reason} = Error ->
            ?LOG_ERROR("NFT table ~p init failed: ~p", [TName, Error]),
            %% pass the reason through as-is, do not wrap it a second time
            {error, Reason};
        Unexpected ->
            ?LOG_ERROR("NFT table ~p init failed: ~p", [TName, Unexpected]),
            {error, Unexpected}
    end;

%% stub mode
init([#{enable := false} = Cfg]) ->
    {ok, #state{cfg = Cfg,
                registry = dict:new()}};

%% 'enable' is an optional key of cfg(); a missing key means "disabled"
init([Cfg]) when is_map(Cfg), not is_map_key(enable, Cfg) ->
    init([Cfg#{enable => false}]).


%% Synchronous requests.  The first clause makes every call a no-op
%% (answered with 'ok') while the module is disabled.
handle_call(Request, From,
            #state{cfg = #{enable := false}} = State) ->
    ?LOG_DEBUG("ignore ~p() from ~p: ~p", [?FUNCTION_NAME, From, Request]),
    {reply, ok, State};

%% Register the calling process as an eNB
handle_call({enb_register, GlobalENBId}, {Pid, _Tag},
            #state{registry = Reg} = State) ->
    case dict:is_key(Pid, Reg) of
        true ->
            ?LOG_ERROR("eNB (pid ~p, ~p) is already registered",
                       [Pid, GlobalENBId]),
            {reply, {error, already_registered}, State};
        false ->
            %% keep an eye on the process being registered
            MonRef = erlang:monitor(process, Pid),
            %% add exometer counters
            enb_add_metrics(GlobalENBId),
            %% create and store an initial eNB state
            EnbState = #{genb_id => GlobalENBId,
                         mon_ref => MonRef,
                         pid => Pid},
            NewReg = dict:store(Pid, EnbState, Reg),
            ?LOG_INFO("eNB (pid ~p, ~p) has been registered",
                      [Pid, GlobalENBId]),
            {reply, ok, State#state{registry = NewReg}}
    end;

%% Address indication from a (supposedly registered) eNB process
handle_call({enb_set_addr, Addr}, {Pid, _Tag},
            #state{registry = Reg} = State) ->
    case dict:find(Pid, Reg) of
        error ->
            ?LOG_ERROR("eNB @ ~p (pid ~p) is *not* registered",
                       [Addr, Pid]),
            {reply, {error, not_registered}, State};
        {ok, EnbState} ->
            {Reply, NewState} = enb_set_addr(Addr, EnbState, State),
            {reply, Reply, NewState}
    end;

%% Explicit unregistration by the eNB process itself
handle_call(enb_unregister, {Pid, _Tag},
            #state{cfg = Cfg, registry = Reg} = State) ->
    case dict:find(Pid, Reg) of
        error ->
            ?LOG_ERROR("eNB (pid ~p) is *not* registered", [Pid]),
            {reply, {error, enb_not_registered}, State};
        {ok, #{genb_id := GlobalENBId,
               mon_ref := MonRef} = EnbState} ->
            %% stop monitoring and drop a possibly queued 'DOWN' message
            erlang:demonitor(MonRef, [flush]),
            enb_del_nft_counters(EnbState, Cfg),
            NewReg = dict:erase(Pid, Reg),
            ?LOG_INFO("eNB (pid ~p, ~p) has been unregistered",
                      [Pid, GlobalENBId]),
            {reply, ok, State#state{registry = NewReg}}
    end;

%% Return the parsed nft counters to the caller (nothing is reported)
handle_call(fetch_counters, _From,
            #state{cfg = #{table_name := TName}} = State) ->
    ?LOG_DEBUG("Fetching NFT counters"),
    case nft_list_counters(TName) of
        {ok, Ctrs} ->
            {reply, {ok, Ctrs}, State};
        {error, Error} ->
            ?LOG_ERROR("Failed to fetch NFT counters: ~p", [Error]),
            {reply, {error, Error}, State}
    end;

handle_call(Request, From, State) ->
    ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Request]),
    {reply, {error, not_implemented}, State}.


%% Asynchronous requests; ignored altogether in stub mode.
handle_cast(Msg,
            #state{cfg = #{enable := false}} = State) ->
    ?LOG_DEBUG("ignore ~p(): ~p", [?FUNCTION_NAME, Msg]),
    {noreply, State};

%% Periodic trigger sent by the heartbeat process
handle_cast(report_nft_counters,
            #state{cfg = #{table_name := TName},
                   registry = Reg} = State) ->
    ?LOG_DEBUG("Fetching and reporting NFT counters"),
    case nft_list_counters(TName) of
        {ok, Ctrs} ->
            %% the registry caches the reported values per eNB
            NewReg = report_nft_counters(Ctrs, Reg),
            {noreply, State#state{registry = NewReg}};
        {error, Error} ->
            ?LOG_ERROR("Failed to fetch NFT counters: ~p", [Error]),
            {noreply, State}
    end;

handle_cast(Msg, State) ->
    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Msg]),
    {noreply, State}.


%% A monitored eNB process went away: clean up its nft rules/counters.
handle_info({'DOWN', _MonRef, process, Pid, Reason},
            #state{cfg = Cfg, registry = Reg} = State) ->
    ?LOG_INFO("eNB process ~p terminated with reason ~p", [Pid, Reason]),
    case dict:find(Pid, Reg) of
        error ->
            ?LOG_ERROR("eNB (pid ~p) is *not* registered", [Pid]),
            {noreply, State};
        {ok, EnbState} ->
            enb_del_nft_counters(EnbState, Cfg),
            NewReg = dict:erase(Pid, Reg),
            ?LOG_INFO("eNB (pid ~p) has been unregistered", [Pid]),
            {noreply, State#state{registry = NewReg}}
    end;

handle_info(Msg, State) ->
    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Msg]),
    {noreply, State}.


%% Delete the nft table on shutdown, unless running in stub mode.
terminate(Reason,
          #state{cfg = Cfg}) ->
    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
    nft_table_teardown(Cfg).


%% ------------------------------------------------------------------
%% private API
%% ------------------------------------------------------------------

%% Delete the nft table if the module is enabled; the result of the
%% delete command is deliberately not checked.
nft_table_teardown(#{enable := true, table_name := TName}) ->
    nft_exec([nft_cmd_del_table(TName)]),
    ok;
nft_table_teardown(_Cfg) ->
    ok. %% stub mode


%% List all counters of the given table and parse them.
nft_list_counters(TName) ->
    case nft_exec([nft_cmd_list_counters(TName)]) of
        {ok, Res} ->
            {ok, parse_nft_counters(Res)};
        {error, Error} ->
            {error, Error}
    end.


%% Create the per-eNB exometer counters for both directions.
enb_add_metrics(GlobalENBId) ->
    enb_add_metrics(GlobalENBId, ul),
    enb_add_metrics(GlobalENBId, dl).

%% Create the three per-eNB exometer counters for one direction (ul | dl).
enb_add_metrics(GlobalENBId, UDL) ->
    lists:foreach(fun enb_new_counter/1,
                  [?S1GW_CTR_GTPU_PACKETS(GlobalENBId, UDL),
                   ?S1GW_CTR_GTPU_BYTES_UE(GlobalENBId, UDL),
                   ?S1GW_CTR_GTPU_BYTES_TOTAL(GlobalENBId, UDL)]).


%% Best-effort creation of a single exometer counter.
%% The counter may already exist (e.g. the same eNB re-connecting), so a
%% failure is not fatal; it is logged rather than silently discarded.
enb_new_counter(Name) ->
    try exometer:new(Name, counter) of
        _ -> ok
    catch
        Class:Reason ->
            ?LOG_DEBUG("exometer counter ~p was not created: ~p:~p",
                       [Name, Class, Reason]),
            ok
    end.


%% Handle an address indication for an already registered eNB.
%%
%% * address equals the stored one  -> nothing to do, reply 'ok';
%% * address differs from the stored one -> reply {error, addr_mismatch};
%% * no address stored yet -> create the nft rules/counters and store the
%%   updated eNB state in the registry.
-spec enb_set_addr(Addr, ES, S0) -> {Reply, S1}
    when Addr :: string(),
         ES :: enb_state(),
         S0 :: #state{},
         S1 :: #state{},
         Reply :: ok | {error, term()}.
enb_set_addr(Addr, %% given Addr matches stored Addr
             #{genb_id := GlobalENBId,
               addr := Addr,
               pid := Pid}, S) ->
    ?LOG_DEBUG("eNB @ ~p (pid ~p, ~p): address is already known",
               [Addr, Pid, GlobalENBId]),
    {ok, S};

enb_set_addr(NewAddr, %% given NewAddr differs from stored Addr
             #{genb_id := GlobalENBId,
               addr := Addr,
               pid := Pid}, S) ->
    ?LOG_ERROR("eNB @ ~p (pid ~p, ~p): address (~p) mismatch?!?",
               [Addr, Pid, GlobalENBId, NewAddr]),
    {{error, addr_mismatch}, S};

enb_set_addr(Addr, #{genb_id := GlobalENBId,
                     pid := Pid} = ES0,
             #state{cfg = Cfg, registry = R0} = S) ->
    ?LOG_DEBUG("eNB @ ~p (pid ~p, ~p): "
               "address indicated, creating NFT rules/counters",
               [Addr, Pid, GlobalENBId]),
    case enb_add_nft_counters(ES0#{addr => Addr}, Cfg) of
        {ok, ES1} ->
            R1 = dict:store(Pid, ES1, R0),
            ?LOG_INFO("eNB @ ~p (pid ~p, ~p): "
                      "NFT rules/counters created (KPI ready)",
                      [Addr, Pid, GlobalENBId]),
            {ok, S#state{registry = R1}};
        {error, Error} ->
            %% the address is not stored, so a later indication retries
            ?LOG_ERROR("eNB @ ~p (pid ~p, ~p): "
                       "creating NFT rules/counters failed: ~p",
                       [Addr, Pid, GlobalENBId, Error]),
            {{error, Error}, S}
    end.


-spec enb_add_nft_counters(ES0, Cfg) -> {ok, ES1} | {error, term()}
    when ES0 :: enb_state(),
         ES1 :: enb_state(),
         Cfg :: cfg().
%% Create the per-eNB nft counters ("ul-<id>" / "dl-<id>") together with
%% the rules feeding them: UL matches on the source address, DL on the
%% destination address.  On success the handles of the two new rules and
%% zeroed cached counters are stored in the returned eNB state.
enb_add_nft_counters(#{addr := Addr,
                       genb_id := GlobalENBId} = ES0,
                     #{table_name := TName}) ->
    ULName = "ul-" ++ GlobalENBId,
    DLName = "dl-" ++ GlobalENBId,
    RUL = [nft_expr_match_ip_saddr(Addr, ?OP_EQ),
           nft_expr_counter(ULName)],
    RDL = [nft_expr_match_ip_daddr(Addr, ?OP_EQ),
           nft_expr_counter(DLName)],
    Cmds = [nft_cmd_add_counter(TName, ULName),
            nft_cmd_add_counter(TName, DLName),
            nft_cmd_add_rule(TName, "gtpu-ul", RUL),
            nft_cmd_add_rule(TName, "gtpu-dl", RDL)
           ],
    case nft_exec(Cmds) of
        ok ->
            %% the rules were appended, so they are the last in their chains
            ULH = nft_chain_last_handle(TName, "gtpu-ul"),
            DLH = nft_chain_last_handle(TName, "gtpu-dl"),
            enb_store_nft_handles(ES0, ULH, DLH);
        {error, Error} ->
            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
            {error, Error}
    end.


%% Update the eNB state with the rule handles, provided both look-ups
%% succeeded.  A failed look-up must not be stored: the value would later
%% be passed to nft as a rule handle by enb_del_nft_counters/2.
enb_store_nft_handles(ES0, ULH, DLH)
  when is_integer(ULH), is_integer(DLH) ->
    {ok, ES0#{handle_ul => ULH,
              handle_dl => DLH,
              ctr_ul => #ctr{},
              ctr_dl => #ctr{}}};
enb_store_nft_handles(_ES0, ULH, DLH) ->
    ?LOG_ERROR("NFT rule handle look-up failed (UL ~p, DL ~p)", [ULH, DLH]),
    {error, {no_rule_handle, ULH, DLH}}.


%% Delete the per-eNB rules (by handle) and counters (by name).
-spec enb_del_nft_counters(enb_state(), cfg()) -> ok | {error, term()}.
enb_del_nft_counters(#{genb_id := GlobalENBId,
                       handle_ul := ULH,
                       handle_dl := DLH},
                     #{table_name := TName}) ->
    %% rules first, then the counters they refer to
    Cmds = [nft_cmd_del_rule(TName, "gtpu-ul", ULH),
            nft_cmd_del_rule(TName, "gtpu-dl", DLH),
            nft_cmd_del_counter(TName, "ul-" ++ GlobalENBId),
            nft_cmd_del_counter(TName, "dl-" ++ GlobalENBId)
           ],
    case nft_exec(Cmds) of
        ok -> ok;
        {error, Error} ->
            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
            {error, Error}
    end;

%% missing ULH/DLH => nothing to delete
enb_del_nft_counters(_ES, _Cfg) -> ok.


%% Parse the given list of NFT counters (result of nft_cmd_list_counters()).
%% Objects other than counters are skipped rather than crashing the server.
-spec parse_nft_counters([map()]) -> counters().
parse_nft_counters(L0) ->
    L1 = [parse_nft_counter(C) || #{<< "counter" >> := _} = C <- L0],
    dict:from_list(L1).


%% Convert a single decoded "counter" object into {Name, #ctr{}}.
-spec parse_nft_counter(map()) -> {string(), counter()}.
parse_nft_counter(#{<< "counter" >> := #{<< "name" >> := Name,
                                         << "bytes" >> := Bytes,
                                         << "packets" >> := Packets}}) ->
    %% Assuming an IP header of 20 bytes, derive the GTP-U payload size:
    %%
    %%  [...]              \              \
    %%  [ UDP ][ TCP ]     | UE payload   | nft reports these bytes
    %%  [ IP ]             /              |
    %%  -- payload --                     |
    %%  [ GTP-U 8 bytes ]                 |  \
    %%  [ UDP 8 bytes ]                   |  | need to subtract these, 20 + 8 + 8
    %%  [ IP 20 bytes ]                   /  /
    %%
    %% min/2 keeps the result non-negative.
    Overhead = Packets * (20 + 8 + 8),
    Ctr = #ctr{packets = Packets,
               bytes_ue = Bytes - erlang:min(Bytes, Overhead),
               bytes_total = Bytes},
    {binary_to_list(Name), Ctr}.


-spec report_nft_counters(Ctrs, R0) -> R1
    when Ctrs :: counters(), %% result of parse_nft_counters()
         R0 :: registry(),   %% (current) registry of eNBs
         R1 :: registry().   %% (new) registry of eNBs
report_nft_counters(Ctrs, R0) ->
    %% for each registered eNB, look-up and report UL/DL counters
    dict:map(fun(_Pid, EnbState) ->
                     enb_report_nft_counters(Ctrs, EnbState)
             end, R0).


%% Report UL/DL counters of one eNB and cache the reported values.
-spec enb_report_nft_counters(counters(), enb_state()) -> enb_state().
enb_report_nft_counters(Ctrs, #{genb_id := GlobalENBId,
                                ctr_ul := ULC,
                                ctr_dl := DLC} = ES) ->
    ?LOG_DEBUG("Reporting NFT counters for eNB ~p", [GlobalENBId]),
    ES#{ctr_ul => report_nft_counter(Ctrs, {ul, GlobalENBId}, ULC),
        ctr_dl => report_nft_counter(Ctrs, {dl, GlobalENBId}, DLC)};

%% no cached counters (address not indicated yet) => nothing to report
enb_report_nft_counters(_Ctrs, ES) -> ES.


-spec report_nft_counter(Ctrs, {UDL, GlobalENBId}, C0) -> C1
    when Ctrs :: counters(),
         UDL :: ul | dl,
         GlobalENBId :: string(),
         C0 :: counter(),
         C1 :: counter().
%% Look up the nft counter "<ul|dl>-<GlobalENBId>" among the parsed
%% counters, add the difference to the cached values (C0) to the exometer
%% counters, and return the values to be cached next.
report_nft_counter(Ctrs, {UDL, GlobalENBId}, C0) ->
    CtrName = atom_to_list(UDL) ++ "-" ++ GlobalENBId,
    case dict:find(CtrName, Ctrs) of
        {ok, C0} ->
            %% no diff, nothing to report
            ?LOG_DEBUG("NFT counters (~p) for eNB ~p: ~p",
                       [UDL, GlobalENBId, C0]),
            C0;
        {ok, C1} ->
            ?LOG_DEBUG("NFT counters (~p) for eNB ~p: ~p -> ~p",
                       [UDL, GlobalENBId, C0, C1]),
            s1gw_metrics:ctr_inc(?S1GW_CTR_GTPU_PACKETS(GlobalENBId, UDL),
                                 ctr_delta(C1#ctr.packets, C0#ctr.packets)),
            s1gw_metrics:ctr_inc(?S1GW_CTR_GTPU_BYTES_UE(GlobalENBId, UDL),
                                 ctr_delta(C1#ctr.bytes_ue, C0#ctr.bytes_ue)),
            s1gw_metrics:ctr_inc(?S1GW_CTR_GTPU_BYTES_TOTAL(GlobalENBId, UDL),
                                 ctr_delta(C1#ctr.bytes_total, C0#ctr.bytes_total)),
            C1;
        error ->
            %% no counters for this eNB (yet?)
            ?LOG_DEBUG("NFT counters (~p) for eNB ~p: nope",
                       [UDL, GlobalENBId]),
            C0
    end.


%% Increment to report for a value that went from Old to New.
%% A value going backwards means the nft counter has been re-created or
%% reset; in this case New itself is what has been counted since then,
%% so a negative increment is never reported.
-spec ctr_delta(non_neg_integer(), non_neg_integer()) -> non_neg_integer().
ctr_delta(New, Old) when New >= Old -> New - Old;
ctr_delta(New, _Old) -> New.


%% Execute the given nftables commands via enftables (JSON mode).
-spec nft_exec(Cmds) -> enftables:result()
    when Cmds :: [enftables:nft_cmd()].
nft_exec(Cmds) ->
    ?LOG_DEBUG("Executing nftables commands: ~p", [Cmds]),
    enftables:run_cmd(Cmds, json).


%% Return the handle of the last rule in the given chain, or the atom
%% 'error' if the chain cannot be listed or does not end with a rule.
nft_chain_last_handle(TName, CName) ->
    Cmds = [nft_cmd_list_chain(TName, CName)],
    case nft_exec(Cmds) of
        {ok, [_ | _] = Res} ->
            case lists:last(Res) of
                #{<< "rule" >> := #{<< "handle" >> := Handle}} ->
                    Handle;
                Other ->
                    ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Other]),
                    error
            end;
        Error ->
            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
            error
    end.


%% 'add table' command; the table is created with the "owner" flag.
nft_cmd_add_table(TName) ->
    T = #{family => << "inet" >>,
          name => list_to_binary(TName),
          flags => [<< "owner" >>]
         },
    #{add => #{table => T}}.


%% 'delete table' command.
nft_cmd_del_table(TName) ->
    T = #{family => << "inet" >>,
          name => list_to_binary(TName)
         },
    #{delete => #{table => T}}.


%% 'add chain' command: a filter base chain attached to the given hook,
%% priority 0, default policy "accept".
nft_cmd_add_chain(TName, CName, Hook) ->
    C = #{family => << "inet" >>,
          table => list_to_binary(TName),
          name => list_to_binary(CName),
          type => << "filter" >>,
          hook => list_to_binary(Hook),
          prio => 0,
          policy => << "accept" >>
         },
    #{add => #{chain => C}}.


%% 'add rule' command: append a rule made of the given expressions.
nft_cmd_add_rule(TName, CName, Expr) ->
    #{add => #{rule => #{family => << "inet" >>,
                         table => list_to_binary(TName),
                         chain => list_to_binary(CName),
                         expr => Expr}}}.


%% 'delete rule' command: the rule is identified by its handle.
nft_cmd_del_rule(TName, CName, Handle) ->
    #{delete => #{rule => #{family => << "inet" >>,
                            table => list_to_binary(TName),
                            chain => list_to_binary(CName),
                            handle => Handle}}}.


%% Reference to a named counter object within the given table.
nft_counter(TName, Name) ->
    Table = list_to_binary(TName),
    Counter = list_to_binary(Name),
    #{family => << "inet" >>, table => Table, name => Counter}.

%% 'add counter' command.
nft_cmd_add_counter(TName, Name) ->
    Ctr = nft_counter(TName, Name),
    #{add => #{counter => Ctr}}.

%% 'delete counter' command.
nft_cmd_del_counter(TName, Name) ->
    Ctr = nft_counter(TName, Name),
    #{delete => #{counter => Ctr}}.


%% Generic 'match' expression comparing a protocol header field (left)
%% against the given value (right) using the given operator.
-spec nft_expr_match_payload({Proto, Field}, Value, Op) -> map()
    when Proto :: string(),
         Field :: string(),
         Value :: term(),
         Op :: string().
nft_expr_match_payload({Proto, Field}, Value, Op) ->
    Payload = #{protocol => list_to_binary(Proto),
                field => list_to_binary(Field)},
    #{match => #{left => #{payload => Payload},
                 right => Value,
                 op => list_to_binary(Op)}}.


%% match on the 'protocol' field of the IP header
nft_expr_match_ip_proto(Proto, Op) ->
    Value = list_to_binary(Proto),
    nft_expr_match_payload({"ip", "protocol"}, Value, Op).

%% match on the IP source address
nft_expr_match_ip_saddr(Addr, Op) ->
    Value = list_to_binary(Addr),
    nft_expr_match_payload({"ip", "saddr"}, Value, Op).

%% match on the IP destination address
nft_expr_match_ip_daddr(Addr, Op) ->
    Value = list_to_binary(Addr),
    nft_expr_match_payload({"ip", "daddr"}, Value, Op).

%% match on the UDP destination port (passed through as-is)
nft_expr_match_udp_dport(Port, Op) ->
    nft_expr_match_payload({"udp", "dport"}, Port, Op).


%% 'accept' verdict
nft_expr_accept() ->
    #{accept => null}.


%% reference to a named counter from within a rule
nft_expr_counter(Name) ->
    #{counter => list_to_binary(Name)}.


%% 'list chain' command.
nft_cmd_list_chain(TName, CName) ->
    #{list => #{chain => #{family => << "inet" >>,
                           table => list_to_binary(TName),
                           name => list_to_binary(CName)}}}.


%% 'list counters' command for the given table.
nft_cmd_list_counters(TName) ->
    #{list => #{counters => #{table => #{family => << "inet" >>,
                                         name => list_to_binary(TName)}}}}.
+ + +-spec heartbeat(timeout()) -> no_return(). +heartbeat(Tval) -> + timer:sleep(Tval), + gen_server:cast(?MODULE, report_nft_counters), + heartbeat(Tval). %% keep going + + +%% vim:set ts=4 sw=4 et: diff --git a/src/osmo_s1gw.app.src b/src/osmo_s1gw.app.src index 743a5d5..e64fb93 100644 --- a/src/osmo_s1gw.app.src +++ b/src/osmo_s1gw.app.src @@ -9,6 +9,7 @@ stdlib, logger_color_formatter, pfcplib, + enftables, exometer_core, exometer_report_statsd ]}, diff --git a/src/osmo_s1gw_sup.erl b/src/osmo_s1gw_sup.erl index 279ad02..c746d7c 100644 --- a/src/osmo_s1gw_sup.erl +++ b/src/osmo_s1gw_sup.erl @@ -50,6 +50,9 @@ -define(ENV_DEFAULT_MME_REM_PORT, ?S1AP_PORT). -define(ENV_DEFAULT_PFCP_LOC_ADDR, "127.0.1.1"). -define(ENV_DEFAULT_PFCP_REM_ADDR, "127.0.1.2"). +-define(ENV_DEFAULT_ENFT_KPI_ENABLE, false). +-define(ENV_DEFAULT_ENFT_KPI_TABLE_NAME, "osmo-s1gw"). +-define(ENV_DEFAULT_ENFT_KPI_INTERVAL, 3000).
%% ------------------------------------------------------------------ %% supervisor API @@ -74,9 +77,14 @@ 5000, worker, [pfcp_peer]}, + EnftKpi = {enft_kpi, {enft_kpi, start_link, [enft_kpi_cfg()]}, + permanent, + 5000, + worker, + [enft_kpi]},
s1gw_metrics:init(), - {ok, {{one_for_one, 5, 10}, [SctpServer, PfcpPeer]}}. + {ok, {{one_for_one, 5, 10}, [SctpServer, PfcpPeer, EnftKpi]}}.
%% ------------------------------------------------------------------ @@ -106,4 +114,11 @@ handler => sctp_proxy, priv => client_cfg()}.
+ +-spec enft_kpi_cfg() -> enft_kpi:cfg(). +enft_kpi_cfg() -> + #{enable => osmo_s1gw:get_env(enft_kpi_enable, ?ENV_DEFAULT_ENFT_KPI_ENABLE), + table_name => osmo_s1gw:get_env(enft_kpi_table_name, ?ENV_DEFAULT_ENFT_KPI_TABLE_NAME), + interval => osmo_s1gw:get_env(enft_kpi_interval, ?ENV_DEFAULT_ENFT_KPI_INTERVAL)}. + %% vim:set ts=4 sw=4 et: diff --git a/src/s1ap_proxy.erl b/src/s1ap_proxy.erl index f2fd722..6b75945 100644 --- a/src/s1ap_proxy.erl +++ b/src/s1ap_proxy.erl @@ -278,6 +278,12 @@ end.
+-spec tla_str(binary()) -> string(). +tla_str(TLA0) -> + TLA1 = list_to_tuple(binary_to_list(TLA0)), + inet:ntoa(TLA1). + + %% Encode an S1AP PDU -spec encode_pdu(s1ap_pdu()) -> {ok, binary()} | {error, {asn1, tuple()}}. @@ -354,6 +360,14 @@ C0#'S1SetupRequest'.protocolIEs, S0), {forward, S1};
+%% 9.1.8.5 S1 SETUP RESPONSE +handle_pdu({successfulOutcome, + #'SuccessfulOutcome'{procedureCode = ?'id-S1Setup'}}, S) -> + ?LOG_DEBUG("Processing S1 SETUP RESPONSE"), + ok = enft_kpi:enb_register(genb_id_str(S)), + %% there's nothing to patch in this PDU, so we forward it as-is + {forward, S}; + %% 9.1.3.1 E-RAB SETUP REQUEST handle_pdu({Outcome = initiatingMessage, #'InitiatingMessage'{procedureCode = ?'id-E-RABSetup', @@ -676,6 +690,8 @@ #'E-RABSetupItemBearerSURes'{'e-RAB-ID' = ERABId, 'transportLayerAddress' = TLA_In, 'gTP-TEID' = << TEID_In:32/big >>} = C0, S) -> + %% indicate eNB's address to the enft_kpi module + enft_kpi:enb_set_addr(tla_str(TLA_In)), %% poke E-RAB FSM case erab_fsm_find(ERABId, S) of {ok, Pid} -> @@ -963,6 +979,8 @@ #'E-RABSetupItemCtxtSURes'{'e-RAB-ID' = ERABId, 'transportLayerAddress' = TLA_In, 'gTP-TEID' = << TEID_In:32/big >>} = C0, S) -> + %% indicate eNB's address to the enft_kpi module + enft_kpi:enb_set_addr(tla_str(TLA_In)), %% poke E-RAB FSM case erab_fsm_find(ERABId, S) of {ok, Pid} -> diff --git a/test/s1ap_proxy_test.erl b/test/s1ap_proxy_test.erl index 3a73910..dd1638b 100644 --- a/test/s1ap_proxy_test.erl +++ b/test/s1ap_proxy_test.erl @@ -23,6 +23,7 @@ pfcp_mock:mock_all(), exometer:start(), s1gw_metrics:init(), + enft_kpi:start_link(#{enable => false}), {ok, Pid} = s1ap_proxy:start_link(), #{handler => Pid}.
@@ -30,6 +31,7 @@ stop(#{handler := Pid}) -> s1ap_proxy:shutdown(Pid), exometer:stop(), + enft_kpi:shutdown(), pfcp_mock:unmock_all().