fixeria has submitted this change. (
https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/40281?usp=email )
Change subject: enft_kpi: retrieve per-eNB traffic counters
......................................................................
enft_kpi: retrieve per-eNB traffic counters
Change-Id: I498d2854447a2d53d2abddd38652f3e2bbb1fbdd
Related: SYS#7307
---
M config/sys.config
M debian/control
M include/s1gw_metrics.hrl
M rebar.config
M rebar.lock
A src/enft_kpi.erl
M src/osmo_s1gw.app.src
M src/osmo_s1gw_sup.erl
M src/s1ap_proxy.erl
M test/s1ap_proxy_test.erl
10 files changed, 684 insertions(+), 1 deletion(-)
Approvals:
Jenkins Builder: Verified
pespin: Looks good to me, but someone else must approve
fixeria: Looks good to me, approved
diff --git a/config/sys.config b/config/sys.config
index d405db8..57cb61c 100644
--- a/config/sys.config
+++ b/config/sys.config
@@ -19,6 +19,11 @@
%% Optional PFCP Network Instance IEs (omitted if not configured)
%% {pfcp_net_inst_core, << 16#09, "core-side" >>}, %% PFCP Network Instance IE value (to core)
%% {pfcp_net_inst_access, << 16#0a, "radio-side" >>} %% PFCP Network Instance IE value (to access)
+%%
+%% Optional NFT KPI configuration
+%% {enft_kpi_enable, true}, %% whether to enable the NFT KPI module (default: false)
+%% {enft_kpi_table_name, "osmo-s1gw"}, %% the NFT table name to be used by this process
+%% {enft_kpi_interval, 3000} %% counter reporting interval (ms)
]},
%% ================================================================================
%% kernel config
diff --git a/debian/control b/debian/control
index a4279b7..10eddac 100644
--- a/debian/control
+++ b/debian/control
@@ -3,6 +3,7 @@
Section: net
Priority: optional
Build-Depends: erlang-nox,
+ libnftables-dev,
rebar3,
debhelper (>= 10)
Standards-Version: 4.5.0
diff --git a/include/s1gw_metrics.hrl b/include/s1gw_metrics.hrl
index 940529b..83ab46d 100644
--- a/include/s1gw_metrics.hrl
+++ b/include/s1gw_metrics.hrl
@@ -36,6 +36,13 @@
-define(S1GW_CTR_S1AP_PROXY_OUT_PKT_REPLY_ALL, [ctr, s1ap, proxy, out_pkt, reply, all]).
-define(S1GW_CTR_S1AP_PROXY_OUT_PKT_REPLY_ERAB_SETUP_RSP, [ctr, s1ap, proxy, out_pkt,
reply, erab_setup_rsp]).
+%% GTP-U (traffic) related counters
+%% NOTE: these counters shall not be listed in ?S1GW_COUNTERS,
+%% but created dynamically for each connecting eNB.
+-define(S1GW_CTR_GTPU_PACKETS(ENBId, UDL), [ctr, enb, ENBId, gtpu, packets, UDL]).
+-define(S1GW_CTR_GTPU_BYTES_UE(ENBId, UDL), [ctr, enb, ENBId, gtpu, bytes, ue, UDL]).
+-define(S1GW_CTR_GTPU_BYTES_TOTAL(ENBId, UDL), [ctr, enb, ENBId, gtpu, bytes, total,
UDL]).
+
-define(S1GW_GAUGE_PFCP_ASSOCIATED, [gauge, pfcp, associated]).
-define(S1GW_GAUGE_S1AP_ENB_NUM_SCTP_CONNECTIONS, [gauge, s1ap, enb,
num_sctp_connections]).
-define(S1GW_GAUGE_S1AP_PROXY_UPLINK_PACKETS_QUEUED, [gauge, s1ap, proxy,
uplink_packets_queued]).
diff --git a/rebar.config b/rebar.config
index 0100f17..b69ef47 100644
--- a/rebar.config
+++ b/rebar.config
@@ -9,6 +9,8 @@
{git, "https://github.com/rlipscombe/logger_color_formatter.git",
{tag, "0.5.0"}}},
{pfcplib,
{git, "https://github.com/travelping/pfcplib.git", {branch,
"master"}}},
+ {enftables,
+ {git, "https://gitea.osmocom.org/erlang/enftables.git", {branch,
"master"}}},
{exometer_core,
{git, "https://github.com/Feuerlabs/exometer_core.git", {branch,
"master"}}},
{exometer_report_statsd,
diff --git a/rebar.lock b/rebar.lock
index c52d677..c5a90df 100644
--- a/rebar.lock
+++ b/rebar.lock
@@ -1,5 +1,9 @@
{"1.2.0",
[{<<"cut">>,{pkg,<<"cut">>,<<"1.0.3">>},1},
+ {<<"enftables">>,
+ {git,"https://gitea.osmocom.org/erlang/enftables.git",
+ {ref,"be23c8ebac948aa1eea450b368af437d12a7173a"}},
+ 0},
{<<"exometer_core">>,
{git,"https://github.com/Feuerlabs/exometer_core.git",
{ref,"f9c7abc095edc893c9354a3d5f061715de1d9e79"}},
@@ -9,6 +13,10 @@
{ref,"f1c369becb6e57871f1c7b0e491f6c3a302a65ee"}},
0},
{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},1},
+ {<<"jiffy">>,
+ {git,"https://github.com/davisp/jiffy",
+ {ref,"50daa80a62a97ffb6dd46ea9cb8ccb4930cbf1ae"}},
+ 1},
{<<"logger_color_formatter">>,
{git,"https://github.com/rlipscombe/logger_color_formatter.git",
{ref,"f1c96f979e6350f8cd787d27fe9ff003cbf3416b"}},
diff --git a/src/enft_kpi.erl b/src/enft_kpi.erl
new file mode 100644
index 0000000..015956c
--- /dev/null
+++ b/src/enft_kpi.erl
@@ -0,0 +1,624 @@
+%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH <info(a)sysmocom.de>
+%% Author: Vadim Yanitskiy <vyanitskiy(a)sysmocom.de>
+%%
+%% All Rights Reserved
+%%
+%% SPDX-License-Identifier: AGPL-3.0-or-later
+%%
+%% This program is free software; you can redistribute it and/or modify
+%% it under the terms of the GNU Affero General Public License as
+%% published by the Free Software Foundation; either version 3 of the
+%% License, or (at your option) any later version.
+%%
+%% This program is distributed in the hope that it will be useful,
+%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%% GNU General Public License for more details.
+%%
+%% You should have received a copy of the GNU Affero General Public License
+%% along with this program. If not, see <https://www.gnu.org/licenses/>.
+%%
+%% Additional Permission under GNU AGPL version 3 section 7:
+%%
+%% If you modify this Program, or any covered work, by linking or
+%% combining it with runtime libraries of Erlang/OTP as released by
+%% Ericsson on https://www.erlang.org (or a modified version of these
+%% libraries), containing parts covered by the terms of the Erlang Public
+%% License (https://www.erlang.org/EPLICENSE), the licensors of this
+%% Program grant you additional permission to convey the resulting work
+%% without the need to license the runtime libraries of Erlang/OTP under
+%% the GNU Affero General Public License. Corresponding Source for a
+%% non-source form of such a combination shall include the source code
+%% for the parts of the runtime libraries of Erlang/OTP used as well as
+%% that of the covered work.
+
+-module(enft_kpi).
+-behaviour(gen_server).
+
+-export([init/1,
+ handle_info/2,
+ handle_call/3,
+ handle_cast/2,
+ terminate/2]).
+-export([start_link/1,
+ enb_register/1,
+ enb_set_addr/1,
+ enb_unregister/0,
+ fetch_counters/0,
+ shutdown/0]).
+
+-include_lib("kernel/include/logger.hrl").
+
+-include("s1gw_metrics.hrl").
+
+-define(GTPU_PORT, 2152).
+
+-define(OP_EQ, "==").
+-define(OP_NEQ, "!=").
+
+
+-type cfg() :: #{enable => boolean(),
+ table_name => string(),
+ interval => non_neg_integer() %% like X34 in osmo-hnbgw
+ }.
+
+-record(ctr, {packets = 0 :: non_neg_integer(),
+ bytes_ue = 0 :: non_neg_integer(),
+ bytes_total = 0 :: non_neg_integer()
+ }).
+
+-type counter() :: #ctr{}.
+
+-type counters() :: dict:dict(K :: string(),
+ V :: counter()).
+
+-type enb_state() :: #{pid => pid(),
+ addr => string(),
+ genb_id => string(),
+ mon_ref => reference(),
+ handle_ul => integer(),
+ handle_dl => integer(),
+ ctr_ul => counter(), %% UL counters
+ ctr_dl => counter() %% DL counters
+ }.
+
+-type registry() :: dict:dict(K :: pid(),
+ V :: enb_state()).
+
+-record(state, {cfg :: cfg(),
+ registry :: registry()
+ }).
+
+-export_type([cfg/0,
+ counter/0,
+ counters/0]).
+
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+%% Start the KPI server linked to the caller and register it locally
+%% under the module name, so the API functions can address it as ?MODULE.
+-spec start_link(cfg()) -> gen_server:start_ret().
+start_link(Cfg) ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [Cfg], []).
+
+
+%% Register the calling process as an eNB identified by GlobalENBId.
+%% The server keys its registry on the caller's pid.
+-spec enb_register(GlobalENBId) -> ok | {error, term()}
+    when GlobalENBId :: string().
+enb_register(GlobalENBId) ->
+    Request = {enb_register, GlobalENBId},
+    gen_server:call(?MODULE, Request).
+
+
+%% Indicate the IP address (textual form) of the eNB represented by the
+%% calling process; the process must have been registered beforehand.
+-spec enb_set_addr(Addr) -> ok | {error, term()}
+    when Addr :: string().
+enb_set_addr(Addr) ->
+    Request = {enb_set_addr, Addr},
+    gen_server:call(?MODULE, Request).
+
+
+%% Unregister the eNB represented by the calling process.
+-spec enb_unregister() -> ok | {error, term()}.
+enb_unregister() ->
+    gen_server:call(?MODULE, enb_unregister).
+
+
+%% Fetch and parse the current NFT counters.
+%% In stub mode (enable => false) the server answers with a bare 'ok'.
+-spec fetch_counters() -> ok | {ok, counters()} | {error, term()}.
+fetch_counters() ->
+    gen_server:call(?MODULE, fetch_counters).
+
+
+%% Stop the KPI server (reason 'normal', waiting without a time limit).
+-spec shutdown() -> ok.
+shutdown() ->
+    gen_server:stop(?MODULE, normal, infinity).
+
+
+%% ------------------------------------------------------------------
+%% gen_server API
+%% ------------------------------------------------------------------
+
+%% gen_server init callback.
+%%
+%% Normal mode (enable => true): create the NFT table with the "gtpu-ul"
+%% (prerouting) and "gtpu-dl" (postrouting) chains, install the rules that
+%% accept everything which is not UDP with destination port ?GTPU_PORT, and
+%% spawn a linked heartbeat process that periodically triggers reporting.
+%% Defaults: table name "osmo-s1gw", interval 3000 ms.
+%%
+%% Returns {ok, State} on success or {stop, Reason} if the NFT commands fail.
+init([#{enable := true} = Cfg]) ->
+    %% presumably needed so that terminate/2 (which deletes the NFT table)
+    %% also runs when the parent asks this process to shut down
+    process_flag(trap_exit, true),
+    TName = maps:get(table_name, Cfg, "osmo-s1gw"),
+    Interval = maps:get(interval, Cfg, 3000),
+    %% ignore (accept) anything but GTPU @ udp/2152
+    R1 = [nft_expr_match_ip_proto("udp", ?OP_NEQ), nft_expr_accept()],
+    R2 = [nft_expr_match_udp_dport(?GTPU_PORT, ?OP_NEQ), nft_expr_accept()],
+    Cmds = [nft_cmd_add_table(TName),
+            nft_cmd_add_chain(TName, "gtpu-ul", "prerouting"),
+            nft_cmd_add_chain(TName, "gtpu-dl", "postrouting"),
+            nft_cmd_add_rule(TName, "gtpu-ul", R1),
+            nft_cmd_add_rule(TName, "gtpu-dl", R1),
+            nft_cmd_add_rule(TName, "gtpu-ul", R2),
+            nft_cmd_add_rule(TName, "gtpu-dl", R2)
+           ],
+    case nft_exec(Cmds) of
+        ok ->
+            ?LOG_INFO("NFT table ~p has been initialized", [TName]),
+            spawn_link(fun() -> heartbeat(Interval) end),
+            {ok, #state{cfg = Cfg#{table_name => TName,
+                                   interval => Interval},
+                        registry = dict:new()}};
+        Error ->
+            ?LOG_ERROR("NFT table ~p init failed: ~p", [TName, Error]),
+            %% {stop, Reason} is a valid init/1 result on every OTP release,
+            %% whereas {error, Reason} is only accepted since OTP 26 (older
+            %% releases treat it as a bad return value); start_link/1 still
+            %% yields {error, Error} to the caller
+            {stop, Error}
+    end;
+
+%% stub mode: no NFT interaction at all, just keep the config and an
+%% empty registry so that the callbacks can recognize this mode
+init([#{enable := false} = Cfg]) ->
+    {ok, #state{cfg = Cfg,
+                registry = dict:new()}}.
+
+
+%% gen_server call handler.
+%%
+%% Stub mode (enable => false): every request is logged at debug level and
+%% acknowledged with 'ok'; this clause must stay first.
+handle_call(Request, From,
+            #state{cfg = #{enable := false}} = State) ->
+    ?LOG_DEBUG("ignore ~p() from ~p: ~p", [?FUNCTION_NAME, From, Request]),
+    {reply, ok, State};
+
+%% Register the calling process as an eNB (one registration per pid).
+handle_call({enb_register, GlobalENBId}, {Pid, _Tag},
+            #state{registry = Reg0} = State) ->
+    case dict:is_key(Pid, Reg0) of
+        true ->
+            ?LOG_ERROR("eNB (pid ~p, ~p) is already registered",
+                       [Pid, GlobalENBId]),
+            {reply, {error, already_registered}, State};
+        false ->
+            %% monitor the caller, so its death triggers the clean-up
+            %% performed in handle_info/2
+            MonRef = erlang:monitor(process, Pid),
+            %% make sure the per-eNB exometer counters exist
+            enb_add_metrics(GlobalENBId),
+            %% initial eNB state: the address and NFT items come later
+            EnbState = #{genb_id => GlobalENBId,
+                         mon_ref => MonRef,
+                         pid => Pid},
+            Reg1 = dict:store(Pid, EnbState, Reg0),
+            ?LOG_INFO("eNB (pid ~p, ~p) has been registered",
+                      [Pid, GlobalENBId]),
+            {reply, ok, State#state{registry = Reg1}}
+    end;
+
+%% Store the address of an already registered eNB (see enb_set_addr/3).
+handle_call({enb_set_addr, Addr}, {Pid, _Tag},
+            #state{registry = Reg} = State0) ->
+    case dict:find(Pid, Reg) of
+        error ->
+            ?LOG_ERROR("eNB @ ~p (pid ~p) is *not* registered",
+                       [Addr, Pid]),
+            {reply, {error, not_registered}, State0};
+        {ok, EnbState} ->
+            {Reply, State1} = enb_set_addr(Addr, EnbState, State0),
+            {reply, Reply, State1}
+    end;
+
+%% Unregister the calling eNB: drop the monitor, remove its NFT
+%% rules/counters (if any) and forget its state.
+handle_call(enb_unregister, {Pid, _Tag},
+            #state{cfg = Cfg, registry = Reg0} = State) ->
+    case dict:find(Pid, Reg0) of
+        error ->
+            ?LOG_ERROR("eNB (pid ~p) is *not* registered", [Pid]),
+            {reply, {error, enb_not_registered}, State};
+        {ok, #{genb_id := GlobalENBId,
+               mon_ref := MonRef} = EnbState} ->
+            erlang:demonitor(MonRef, [flush]),
+            enb_del_nft_counters(EnbState, Cfg),
+            Reg1 = dict:erase(Pid, Reg0),
+            ?LOG_INFO("eNB (pid ~p, ~p) has been unregistered",
+                      [Pid, GlobalENBId]),
+            {reply, ok, State#state{registry = Reg1}}
+    end;
+
+%% List all counters of our NFT table and return them in parsed form.
+handle_call(fetch_counters, _From,
+            #state{cfg = #{table_name := TName}} = State) ->
+    ?LOG_DEBUG("Fetching NFT counters"),
+    case nft_exec([nft_cmd_list_counters(TName)]) of
+        {ok, Res} ->
+            {reply, {ok, parse_nft_counters(Res)}, State};
+        {error, Error} ->
+            ?LOG_ERROR("Failed to fetch NFT counters: ~p", [Error]),
+            {reply, {error, Error}, State}
+    end;
+
+%% Anything else is not supported.
+handle_call(Request, From, State) ->
+    ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Request]),
+    {reply, {error, not_implemented}, State}.
+
+
+%% gen_server cast handler.
+%%
+%% Stub mode (enable => false): log and drop every message.
+handle_cast(Msg,
+            #state{cfg = #{enable := false}} = State) ->
+    ?LOG_DEBUG("ignore ~p(): ~p", [?FUNCTION_NAME, Msg]),
+    {noreply, State};
+
+%% Periodic trigger sent by heartbeat/1: fetch the NFT counters and report
+%% the increments for every registered eNB, caching the new values.
+handle_cast(report_nft_counters,
+            #state{cfg = #{table_name := TName},
+                   registry = Reg0} = State) ->
+    ?LOG_DEBUG("Fetching and reporting NFT counters"),
+    case nft_exec([nft_cmd_list_counters(TName)]) of
+        {error, Error} ->
+            ?LOG_ERROR("Failed to fetch NFT counters: ~p", [Error]),
+            {noreply, State};
+        {ok, Res} ->
+            Parsed = parse_nft_counters(Res),
+            Reg1 = report_nft_counters(Parsed, Reg0),
+            {noreply, State#state{registry = Reg1}}
+    end;
+
+handle_cast(Msg, State) ->
+    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Msg]),
+    {noreply, State}.
+
+
+%% gen_server info handler.
+%%
+%% A monitored eNB process went down: remove its NFT rules/counters
+%% (if any were created) and drop it from the registry.
+handle_info({'DOWN', _MonRef, process, Pid, Reason},
+            #state{cfg = Cfg, registry = Reg0} = State) ->
+    ?LOG_INFO("eNB process ~p terminated with reason ~p", [Pid, Reason]),
+    case dict:find(Pid, Reg0) of
+        error ->
+            ?LOG_ERROR("eNB (pid ~p) is *not* registered", [Pid]),
+            {noreply, State};
+        {ok, EnbState} ->
+            enb_del_nft_counters(EnbState, Cfg),
+            Reg1 = dict:erase(Pid, Reg0),
+            ?LOG_INFO("eNB (pid ~p) has been unregistered", [Pid]),
+            {noreply, State#state{registry = Reg1}}
+    end;
+
+%% Any other message is logged as unknown and ignored.
+handle_info(Msg, State) ->
+    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Msg]),
+    {noreply, State}.
+
+
+%% gen_server terminate callback.
+%%
+%% Normal mode: delete our NFT table (best effort, the result is ignored).
+terminate(Reason,
+          #state{cfg = #{enable := true, table_name := TName}}) ->
+    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
+    _ = nft_exec([nft_cmd_del_table(TName)]),
+    ok;
+
+%% Stub mode: there is nothing to clean up.
+terminate(Reason, #state{}) ->
+    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
+    ok.
+
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+%% Create the per-eNB GTP-U exometer counters for both directions.
+enb_add_metrics(GlobalENBId) ->
+    _ = enb_add_metrics(GlobalENBId, ul),
+    enb_add_metrics(GlobalENBId, dl).
+
+%% Create the packets / UE bytes / total bytes counters for one direction.
+%% The counters may already exist (e.g. after a re-registration), hence any
+%% exception raised by exometer:new/2 is deliberately caught and ignored.
+enb_add_metrics(GlobalENBId, UDL) ->
+    Packets = ?S1GW_CTR_GTPU_PACKETS(GlobalENBId, UDL),
+    BytesUE = ?S1GW_CTR_GTPU_BYTES_UE(GlobalENBId, UDL),
+    BytesTotal = ?S1GW_CTR_GTPU_BYTES_TOTAL(GlobalENBId, UDL),
+    catch exometer:new(Packets, counter),
+    catch exometer:new(BytesUE, counter),
+    catch exometer:new(BytesTotal, counter).
+
+
+%% Handle an address indication for a registered eNB.
+%%
+%%  * the same address is already stored: nothing to do, reply 'ok';
+%%  * a different address is already stored: reply {error, addr_mismatch};
+%%  * no address stored yet: create the NFT rules/counters for this address
+%%    and store the updated eNB state in the registry.
+-spec enb_set_addr(Addr, ES, S0) -> {Reply, S1}
+    when Addr :: string(),
+         ES :: enb_state(),
+         S0 :: #state{},
+         S1 :: #state{},
+         Reply :: ok | {error, term()}.
+enb_set_addr(Addr, #{genb_id := GlobalENBId,
+                     pid := Pid} = ES0, S) ->
+    case maps:find(addr, ES0) of
+        {ok, Addr} ->
+            %% the indicated address equals the stored one
+            ?LOG_DEBUG("eNB @ ~p (pid ~p, ~p): address is already known",
+                       [Addr, Pid, GlobalENBId]),
+            {ok, S};
+        {ok, StoredAddr} ->
+            %% the indicated address differs from the stored one
+            ?LOG_ERROR("eNB @ ~p (pid ~p, ~p): address (~p) mismatch?!?",
+                       [StoredAddr, Pid, GlobalENBId, Addr]),
+            {{error, addr_mismatch}, S};
+        error ->
+            %% first indication: set up the NFT rules/counters
+            #state{cfg = Cfg, registry = R0} = S,
+            ?LOG_DEBUG("eNB @ ~p (pid ~p, ~p): "
+                       "address indicated, creating NFT rules/counters",
+                       [Addr, Pid, GlobalENBId]),
+            case enb_add_nft_counters(ES0#{addr => Addr}, Cfg) of
+                {ok, ES1} ->
+                    R1 = dict:store(Pid, ES1, R0),
+                    ?LOG_INFO("eNB @ ~p (pid ~p, ~p): "
+                              "NFT rules/counters created (KPI ready)",
+                              [Addr, Pid, GlobalENBId]),
+                    {ok, S#state{registry = R1}};
+                {error, Error} ->
+                    ?LOG_ERROR("eNB @ ~p (pid ~p, ~p): "
+                               "creating NFT rules/counters failed: ~p",
+                               [Addr, Pid, GlobalENBId, Error]),
+                    {{error, Error}, S}
+            end
+    end.
+
+
+%% Create the named UL/DL counters for the given eNB together with the rules
+%% feeding them: packets with the eNB address as source are counted in the
+%% "gtpu-ul" chain, packets with it as destination in the "gtpu-dl" chain.
+%% On success the eNB state is extended with the rule handles (needed for
+%% the later deletion) and zeroed counter caches.
+-spec enb_add_nft_counters(ES0, Cfg) -> {ok, ES1} | {error, term()}
+    when ES0 :: enb_state(),
+         ES1 :: enb_state(),
+         Cfg :: cfg().
+enb_add_nft_counters(#{addr := Addr,
+                       genb_id := GlobalENBId} = ES0,
+                     #{table_name := TName}) ->
+    ULName = "ul-" ++ GlobalENBId,
+    DLName = "dl-" ++ GlobalENBId,
+    ULRule = [nft_expr_match_ip_saddr(Addr, ?OP_EQ),
+              nft_expr_counter(ULName)],
+    DLRule = [nft_expr_match_ip_daddr(Addr, ?OP_EQ),
+              nft_expr_counter(DLName)],
+    Batch = [nft_cmd_add_counter(TName, ULName),
+             nft_cmd_add_counter(TName, DLName),
+             nft_cmd_add_rule(TName, "gtpu-ul", ULRule),
+             nft_cmd_add_rule(TName, "gtpu-dl", DLRule)
+            ],
+    case nft_exec(Batch) of
+        {error, Error} ->
+            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
+            {error, Error};
+        ok ->
+            %% the rules were just appended, so each one is expected to be
+            %% the last rule of its chain
+            %% NOTE(review): nft_chain_last_handle/2 yields the atom 'error'
+            %% if listing the chain fails; that atom would be stored as a
+            %% handle here - confirm whether this needs explicit handling
+            ULH = nft_chain_last_handle(TName, "gtpu-ul"),
+            DLH = nft_chain_last_handle(TName, "gtpu-dl"),
+            {ok, ES0#{handle_ul => ULH,
+                      handle_dl => DLH,
+                      ctr_ul => #ctr{},
+                      ctr_dl => #ctr{}}}
+    end.
+
+
+%% Remove the NFT rules and named counters belonging to the given eNB.
+%% The rules are deleted first (by handle), then the counters they used.
+%% An eNB state without rule handles has no NFT items, so nothing is done.
+-spec enb_del_nft_counters(enb_state(), cfg()) -> ok | {error, term()}.
+enb_del_nft_counters(#{genb_id := GlobalENBId,
+                       handle_ul := ULH,
+                       handle_dl := DLH},
+                     #{table_name := TName}) ->
+    Batch = [nft_cmd_del_rule(TName, "gtpu-ul", ULH),
+             nft_cmd_del_rule(TName, "gtpu-dl", DLH),
+             nft_cmd_del_counter(TName, "ul-" ++ GlobalENBId),
+             nft_cmd_del_counter(TName, "dl-" ++ GlobalENBId)
+            ],
+    case nft_exec(Batch) of
+        {error, Error} ->
+            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
+            {error, Error};
+        ok ->
+            ok
+    end;
+
+%% no rule handles stored => the NFT items were never created
+enb_del_nft_counters(_ES, _Cfg) ->
+    ok.
+
+
+%% Turn the result of nft_cmd_list_counters() into a dictionary mapping
+%% counter names to #ctr{} records.
+-spec parse_nft_counters([map()]) -> counters().
+parse_nft_counters(L0) ->
+    dict:from_list([parse_nft_counter(Item) || Item <- L0]).
+
+
+%% Convert a single NFT counter object into a {Name, #ctr{}} pair.
+%%
+%% nft reports the size of whole packets, i.e. including the outer headers:
+%%
+%%   [ IP 20 bytes ][ UDP 8 bytes ][ GTP-U 8 bytes ][ UE payload ... ]
+%%
+%% Assuming an outer IP header of 20 bytes, the UE payload size is derived
+%% by subtracting 20 + 8 + 8 bytes per packet (never going below zero).
+-spec parse_nft_counter(map()) -> {string(), counter()}.
+parse_nft_counter(#{<< "counter" >> := #{<< "name" >> := Name,
+                                         << "bytes" >> := Bytes,
+                                         << "packets" >> := Packets}}) ->
+    Overhead = Packets * (20 + 8 + 8),
+    BytesUE = erlang:max(0, Bytes - Overhead),
+    {binary_to_list(Name),
+     #ctr{packets = Packets,
+          bytes_ue = BytesUE,
+          bytes_total = Bytes}}.
+
+
+%% Report the freshly fetched counters for every registered eNB and return
+%% the registry with the per-eNB counter caches updated.
+-spec report_nft_counters(Ctrs, R0) -> R1
+    when Ctrs :: counters(), %% result of parse_nft_counters()
+         R0 :: registry(),   %% (current) registry of eNBs
+         R1 :: registry().   %% (new) registry of eNBs
+report_nft_counters(Ctrs, R0) ->
+    ReportOne = fun(_Pid, EnbState) ->
+                    enb_report_nft_counters(Ctrs, EnbState)
+                end,
+    dict:map(ReportOne, R0).
+
+
+%% Report the UL and DL counters of one eNB and cache the new values.
+%% An eNB state without cached counters (no NFT rules/counters created yet)
+%% is returned unchanged.
+-spec enb_report_nft_counters(counters(), enb_state()) -> enb_state().
+enb_report_nft_counters(Ctrs, #{genb_id := GlobalENBId,
+                                ctr_ul := ULC0,
+                                ctr_dl := DLC0} = ES) ->
+    ?LOG_DEBUG("Reporting NFT counters for eNB ~p", [GlobalENBId]),
+    ULC1 = report_nft_counter(Ctrs, {ul, GlobalENBId}, ULC0),
+    DLC1 = report_nft_counter(Ctrs, {dl, GlobalENBId}, DLC0),
+    ES#{ctr_ul => ULC1,
+        ctr_dl => DLC1};
+
+%% no cached counters => nothing to report
+enb_report_nft_counters(_Ctrs, ES) ->
+    ES.
+
+
+%% Look up the "<ul|dl>-<GlobalENBId>" counter in Ctrs, compare it with the
+%% cached value C0 and bump the exometer counters by the difference.
+%% Returns the value to be cached next: the new counter if it changed,
+%% otherwise C0.
+-spec report_nft_counter(Ctrs, {UDL, GlobalENBId}, C0) -> C1
+    when Ctrs :: counters(),
+         UDL :: ul | dl,
+         GlobalENBId :: string(),
+         C0 :: counter(),
+         C1 :: counter().
+report_nft_counter(Ctrs, {UDL, GlobalENBId}, C0) ->
+    CtrName = atom_to_list(UDL) ++ [$- | GlobalENBId],
+    case dict:find(CtrName, Ctrs) of
+        error ->
+            %% no counters for this eNB (yet?)
+            ?LOG_DEBUG("NFT counters (~p) for eNB ~p: nope",
+                       [UDL, GlobalENBId]),
+            C0;
+        {ok, Same} when Same =:= C0 ->
+            %% unchanged since the last report
+            ?LOG_DEBUG("NFT counters (~p) for eNB ~p: ~p",
+                       [UDL, GlobalENBId, C0]),
+            C0;
+        {ok, C1} ->
+            %% XXX: the new values are assumed to be >= the cached ones
+            ?LOG_DEBUG("NFT counters (~p) for eNB ~p: ~p -> ~p",
+                       [UDL, GlobalENBId, C0, C1]),
+            PacketsDiff = C1#ctr.packets - C0#ctr.packets,
+            s1gw_metrics:ctr_inc(?S1GW_CTR_GTPU_PACKETS(GlobalENBId, UDL),
+                                 PacketsDiff),
+            BytesUEDiff = C1#ctr.bytes_ue - C0#ctr.bytes_ue,
+            s1gw_metrics:ctr_inc(?S1GW_CTR_GTPU_BYTES_UE(GlobalENBId, UDL),
+                                 BytesUEDiff),
+            BytesTotalDiff = C1#ctr.bytes_total - C0#ctr.bytes_total,
+            s1gw_metrics:ctr_inc(?S1GW_CTR_GTPU_BYTES_TOTAL(GlobalENBId, UDL),
+                                 BytesTotalDiff),
+            C1
+    end.
+
+
+%% Log and execute a batch of nftables commands via enftables,
+%% passing 'json' as the second argument of run_cmd/2.
+-spec nft_exec(Cmds) -> enftables:result()
+    when Cmds :: [enftables:nft_cmd()].
+nft_exec(Cmds) ->
+    ?LOG_DEBUG("Executing nftables commands: ~p", [Cmds]),
+    Result = enftables:run_cmd(Cmds, json),
+    Result.
+
+
+%% List the given chain and return the handle of its last rule.
+%% Returns the atom 'error' if listing the chain does not yield {ok, _};
+%% crashes if the last listed item is not a rule object.
+nft_chain_last_handle(TName, CName) ->
+    case nft_exec([nft_cmd_list_chain(TName, CName)]) of
+        {ok, Items} ->
+            #{<< "rule" >> := LastRule} = lists:last(Items),
+            maps:get(<< "handle" >>, LastRule);
+        Error ->
+            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
+            error
+    end.
+
+
+%% Build an "add table" command for an inet table with the "owner" flag.
+nft_cmd_add_table(TName) ->
+    #{add => #{table => #{family => << "inet" >>,
+                          name => list_to_binary(TName),
+                          flags => [<< "owner" >>]}}}.
+
+
+%% Build a "delete table" command for the given inet table.
+nft_cmd_del_table(TName) ->
+    #{delete => #{table => #{family => << "inet" >>,
+                             name => list_to_binary(TName)}}}.
+
+
+%% Build an "add chain" command: a filter chain attached to the given hook
+%% with priority 0 and the "accept" policy.
+nft_cmd_add_chain(TName, CName, Hook) ->
+    Table = list_to_binary(TName),
+    Chain = list_to_binary(CName),
+    #{add => #{chain => #{family => << "inet" >>,
+                          table => Table,
+                          name => Chain,
+                          type => << "filter" >>,
+                          hook => list_to_binary(Hook),
+                          prio => 0,
+                          policy => << "accept" >>}}}.
+
+
+%% Build an "add rule" command adding a rule made of the given expression
+%% list to chain CName of table TName.
+nft_cmd_add_rule(TName, CName, Expr) ->
+    #{add => #{rule => #{family => << "inet" >>,
+                         table => list_to_binary(TName),
+                         chain => list_to_binary(CName),
+                         expr => Expr}}}.
+
+
+%% Build a "delete rule" command; the rule is identified by its handle.
+nft_cmd_del_rule(TName, CName, Handle) ->
+    #{delete => #{rule => #{family => << "inet" >>,
+                            table => list_to_binary(TName),
+                            chain => list_to_binary(CName),
+                            handle => Handle}}}.
+
+
+%% Describe a named counter object within the given inet table.
+nft_counter(TName, Name) ->
+    Table = list_to_binary(TName),
+    Counter = list_to_binary(Name),
+    #{family => << "inet" >>,
+      table => Table,
+      name => Counter}.
+
+%% Build an "add counter" command.
+nft_cmd_add_counter(TName, Name) ->
+    Counter = nft_counter(TName, Name),
+    #{add => #{counter => Counter}}.
+
+%% Build a "delete counter" command.
+nft_cmd_del_counter(TName, Name) ->
+    Counter = nft_counter(TName, Name),
+    #{delete => #{counter => Counter}}.
+
+
+%% Build a "match" expression comparing a protocol header field (the left
+%% operand) against Value (the right operand) using the operator Op.
+-spec nft_expr_match_payload({Proto, Field}, Value, Op) -> map()
+    when Proto :: string(),
+         Field :: string(),
+         Value :: term(),
+         Op :: string().
+nft_expr_match_payload({Proto, Field}, Value, Op) ->
+    Payload = #{protocol => list_to_binary(Proto),
+                field => list_to_binary(Field)},
+    Match = #{left => #{payload => Payload},
+              right => Value,
+              op => list_to_binary(Op)},
+    #{match => Match}.
+
+
+%% Match on the protocol field of the IP header (e.g. "udp").
+nft_expr_match_ip_proto(Proto, Op) ->
+    Value = list_to_binary(Proto),
+    nft_expr_match_payload({"ip", "protocol"}, Value, Op).
+
+%% Match on the source address of the IP header.
+nft_expr_match_ip_saddr(Addr, Op) ->
+    Value = list_to_binary(Addr),
+    nft_expr_match_payload({"ip", "saddr"}, Value, Op).
+
+%% Match on the destination address of the IP header.
+nft_expr_match_ip_daddr(Addr, Op) ->
+    Value = list_to_binary(Addr),
+    nft_expr_match_payload({"ip", "daddr"}, Value, Op).
+
+%% Match on the UDP destination port (the port is passed through as is).
+nft_expr_match_udp_dport(Port, Op) ->
+    nft_expr_match_payload({"udp", "dport"}, Port, Op).
+
+
+%% Build an "accept" verdict expression (it takes no argument, hence null).
+nft_expr_accept() ->
+    Verdict = #{accept => null},
+    Verdict.
+
+
+%% Build a "counter" expression referencing a named counter.
+nft_expr_counter(Name) ->
+    CounterName = list_to_binary(Name),
+    #{counter => CounterName}.
+
+
+%% Build a "list chain" command for chain CName of the inet table TName.
+nft_cmd_list_chain(TName, CName) ->
+    #{list => #{chain => #{family => << "inet" >>,
+                           table => list_to_binary(TName),
+                           name => list_to_binary(CName)}}}.
+
+
+%% Build a "list counters" command restricted to the inet table TName.
+nft_cmd_list_counters(TName) ->
+    #{list => #{counters => #{table => #{family => << "inet" >>,
+                                         name => list_to_binary(TName)}}}}.
+
+
+%% Endless loop of the helper process spawned from init/1: wait Tval
+%% milliseconds, ask the server to report the NFT counters, repeat.
+-spec heartbeat(timeout()) -> no_return().
+heartbeat(Tval) ->
+    %% plain delay: no message is ever consumed here
+    receive after Tval -> ok end,
+    gen_server:cast(?MODULE, report_nft_counters),
+    heartbeat(Tval). %% keep going
+
+
+%% vim:set ts=4 sw=4 et:
diff --git a/src/osmo_s1gw.app.src b/src/osmo_s1gw.app.src
index 743a5d5..e64fb93 100644
--- a/src/osmo_s1gw.app.src
+++ b/src/osmo_s1gw.app.src
@@ -9,6 +9,7 @@
stdlib,
logger_color_formatter,
pfcplib,
+ enftables,
exometer_core,
exometer_report_statsd
]},
diff --git a/src/osmo_s1gw_sup.erl b/src/osmo_s1gw_sup.erl
index 279ad02..c746d7c 100644
--- a/src/osmo_s1gw_sup.erl
+++ b/src/osmo_s1gw_sup.erl
@@ -50,6 +50,9 @@
-define(ENV_DEFAULT_MME_REM_PORT, ?S1AP_PORT).
-define(ENV_DEFAULT_PFCP_LOC_ADDR, "127.0.1.1").
-define(ENV_DEFAULT_PFCP_REM_ADDR, "127.0.1.2").
+-define(ENV_DEFAULT_ENFT_KPI_ENABLE, false).
+-define(ENV_DEFAULT_ENFT_KPI_TABLE_NAME, "osmo-s1gw").
+-define(ENV_DEFAULT_ENFT_KPI_INTERVAL, 3000).
%% ------------------------------------------------------------------
%% supervisor API
@@ -74,9 +77,14 @@
5000,
worker,
[pfcp_peer]},
+ EnftKpi = {enft_kpi, {enft_kpi, start_link, [enft_kpi_cfg()]},
+ permanent,
+ 5000,
+ worker,
+ [enft_kpi]},
s1gw_metrics:init(),
- {ok, {{one_for_one, 5, 10}, [SctpServer, PfcpPeer]}}.
+ {ok, {{one_for_one, 5, 10}, [SctpServer, PfcpPeer, EnftKpi]}}.
%% ------------------------------------------------------------------
@@ -106,4 +114,11 @@
handler => sctp_proxy,
priv => client_cfg()}.
+
+%% Assemble the enft_kpi configuration map from the application environment.
+%% Each lookup passes the matching ?ENV_DEFAULT_ENFT_KPI_* macro as second
+%% argument (presumably the value used when the parameter is not configured).
+-spec enft_kpi_cfg() -> enft_kpi:cfg().
+enft_kpi_cfg() ->
+    Enable = osmo_s1gw:get_env(enft_kpi_enable,
+                               ?ENV_DEFAULT_ENFT_KPI_ENABLE),
+    TName = osmo_s1gw:get_env(enft_kpi_table_name,
+                              ?ENV_DEFAULT_ENFT_KPI_TABLE_NAME),
+    Interval = osmo_s1gw:get_env(enft_kpi_interval,
+                                 ?ENV_DEFAULT_ENFT_KPI_INTERVAL),
+    #{enable => Enable,
+      table_name => TName,
+      interval => Interval}.
+
%% vim:set ts=4 sw=4 et:
diff --git a/src/s1ap_proxy.erl b/src/s1ap_proxy.erl
index f2fd722..6b75945 100644
--- a/src/s1ap_proxy.erl
+++ b/src/s1ap_proxy.erl
@@ -278,6 +278,12 @@
end.
+%% Convert a binary Transport Layer Address (as found in the E-RAB setup
+%% response items) into its textual form, as expected by enft_kpi.
+%%
+%% A 4-octet TLA is rendered as an IPv4 address, a 16-octet TLA as an IPv6
+%% address.  inet:ntoa/1 expects IPv6 addresses as eight 16-bit groups, so
+%% the octets must not be passed one by one (that yields {error, einval}).
+-spec tla_str(binary()) -> string().
+tla_str(<< A:8, B:8, C:8, D:8 >>) ->
+    inet:ntoa({A, B, C, D});
+tla_str(<< A:16, B:16, C:16, D:16, E:16, F:16, G:16, H:16 >>) ->
+    inet:ntoa({A, B, C, D, E, F, G, H});
+tla_str(TLA0) ->
+    %% any other length: keep the previous behaviour (octet-wise tuple),
+    %% so that callers see no new exception for unexpected TLA sizes
+    TLA1 = list_to_tuple(binary_to_list(TLA0)),
+    inet:ntoa(TLA1).
+
+
%% Encode an S1AP PDU
-spec encode_pdu(s1ap_pdu()) -> {ok, binary()} |
{error, {asn1, tuple()}}.
@@ -354,6 +360,14 @@
C0#'S1SetupRequest'.protocolIEs, S0),
{forward, S1};
+%% 9.1.8.5 S1 SETUP RESPONSE
+handle_pdu({successfulOutcome,
+ #'SuccessfulOutcome'{procedureCode = ?'id-S1Setup'}}, S)
->
+ ?LOG_DEBUG("Processing S1 SETUP RESPONSE"),
+ ok = enft_kpi:enb_register(genb_id_str(S)),
+ %% there's nothing to patch in this PDU, so we forward it as-is
+ {forward, S};
+
%% 9.1.3.1 E-RAB SETUP REQUEST
handle_pdu({Outcome = initiatingMessage,
#'InitiatingMessage'{procedureCode = ?'id-E-RABSetup',
@@ -676,6 +690,8 @@
#'E-RABSetupItemBearerSURes'{'e-RAB-ID' = ERABId,
'transportLayerAddress' = TLA_In,
'gTP-TEID' = << TEID_In:32/big
>>} = C0, S) ->
+ %% indicate eNB's address to the enft_kpi module
+ enft_kpi:enb_set_addr(tla_str(TLA_In)),
%% poke E-RAB FSM
case erab_fsm_find(ERABId, S) of
{ok, Pid} ->
@@ -963,6 +979,8 @@
#'E-RABSetupItemCtxtSURes'{'e-RAB-ID' = ERABId,
'transportLayerAddress' = TLA_In,
'gTP-TEID' = << TEID_In:32/big
>>} = C0, S) ->
+ %% indicate eNB's address to the enft_kpi module
+ enft_kpi:enb_set_addr(tla_str(TLA_In)),
%% poke E-RAB FSM
case erab_fsm_find(ERABId, S) of
{ok, Pid} ->
diff --git a/test/s1ap_proxy_test.erl b/test/s1ap_proxy_test.erl
index 3a73910..dd1638b 100644
--- a/test/s1ap_proxy_test.erl
+++ b/test/s1ap_proxy_test.erl
@@ -23,6 +23,7 @@
pfcp_mock:mock_all(),
exometer:start(),
s1gw_metrics:init(),
+ enft_kpi:start_link(#{enable => false}),
{ok, Pid} = s1ap_proxy:start_link(),
#{handler => Pid}.
@@ -30,6 +31,7 @@
stop(#{handler := Pid}) ->
s1ap_proxy:shutdown(Pid),
exometer:stop(),
+ enft_kpi:shutdown(),
pfcp_mock:unmock_all().
--
To view, visit
https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/40281?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: merged
Gerrit-Project: erlang/osmo-s1gw
Gerrit-Branch: master
Gerrit-Change-Id: I498d2854447a2d53d2abddd38652f3e2bbb1fbdd
Gerrit-Change-Number: 40281
Gerrit-PatchSet: 12
Gerrit-Owner: fixeria <vyanitskiy(a)sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: fixeria <vyanitskiy(a)sysmocom.de>
Gerrit-Reviewer: laforge <laforge(a)osmocom.org>
Gerrit-Reviewer: pespin <pespin(a)sysmocom.de>