fixeria has uploaded this change for review. ( https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/40281?usp=email )
Change subject: [WIP] enft_kpi: retrieve per-eNB traffic counters
[WIP] enft_kpi: retrieve per-eNB traffic counters
Change-Id: I498d2854447a2d53d2abddd38652f3e2bbb1fbdd Related: SYS#7307 --- M rebar.config M rebar.lock A src/enft_kpi.erl 3 files changed, 441 insertions(+), 0 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/erlang/osmo-s1gw refs/changes/81/40281/1
diff --git a/rebar.config b/rebar.config index 0100f17..31dd511 100644 --- a/rebar.config +++ b/rebar.config @@ -9,6 +9,8 @@ {git, "https://github.com/rlipscombe/logger_color_formatter.git", {tag, "0.5.0"}}}, {pfcplib, {git, "https://github.com/travelping/pfcplib.git", {branch, "master"}}}, + {enftables, + {git, "https://gitea.osmocom.org/vyanitskiy/enftables.git", {branch, "master"}}}, {exometer_core, {git, "https://github.com/Feuerlabs/exometer_core.git", {branch, "master"}}}, {exometer_report_statsd, diff --git a/rebar.lock b/rebar.lock index c52d677..0d8cc0d 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,5 +1,9 @@ {"1.2.0", [{<<"cut">>,{pkg,<<"cut">>,<<"1.0.3">>},1}, + {<<"enftables">>, + {git,"https://gitea.osmocom.org/vyanitskiy/enftables.git", + {ref,"44cbec3cdc792781ee35eda3f1b586b41519c5bd"}}, + 0}, {<<"exometer_core">>, {git,"https://github.com/Feuerlabs/exometer_core.git", {ref,"f9c7abc095edc893c9354a3d5f061715de1d9e79"}}, diff --git a/src/enft_kpi.erl b/src/enft_kpi.erl new file mode 100644 index 0000000..8d6dcc4 --- /dev/null +++ b/src/enft_kpi.erl @@ -0,0 +1,435 @@ +%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH info@sysmocom.de +%% Author: Vadim Yanitskiy vyanitskiy@sysmocom.de +%% +%% All Rights Reserved +%% +%% SPDX-License-Identifier: AGPL-3.0-or-later +%% +%% This program is free software; you can redistribute it and/or modify +%% it under the terms of the GNU Affero General Public License as +%% published by the Free Software Foundation; either version 3 of the +%% License, or (at your option) any later version. +%% +%% This program is distributed in the hope that it will be useful, +%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%% GNU General Public License for more details. +%% +%% You should have received a copy of the GNU Affero General Public License +%% along with this program. If not, see https://www.gnu.org/licenses/. 
%%
%% Additional Permission under GNU AGPL version 3 section 7:
%%
%% If you modify this Program, or any covered work, by linking or
%% combining it with runtime libraries of Erlang/OTP as released by
%% Ericsson on https://www.erlang.org (or a modified version of these
%% libraries), containing parts covered by the terms of the Erlang Public
%% License (https://www.erlang.org/EPLICENSE), the licensors of this
%% Program grant you additional permission to convey the resulting work
%% without the need to license the runtime libraries of Erlang/OTP under
%% the GNU Affero General Public License. Corresponding Source for a
%% non-source form of such a combination shall include the source code
%% for the parts of the runtime libraries of Erlang/OTP used as well as
%% that of the covered work.

-module(enft_kpi).
-behaviour(gen_server).

%% gen_server callbacks
-export([init/1,
         handle_info/2,
         handle_call/3,
         handle_cast/2,
         terminate/2]).
%% public API
-export([start_link/1,
         enb_register/2,
         enb_unregister/0,
         fetch_counters/0,
         shutdown/0]).

-include_lib("kernel/include/logger.hrl").

%% UDP destination port used to tell GTP-U traffic from everything else
-define(GTPU_PORT, 2152).

%% comparison operators used in nftables match expressions
-define(OP_EQ, "==").
-define(OP_NEQ, "!=").


%% Server configuration:
%%   enable     - whether nftables counters are maintained at all
%%   table_name - name of the nftables table to create
-type cfg() :: #{enable => boolean(),
                 table_name => string()
                }.


%% Server state: the configuration plus a registry of eNBs, which is
%% a dict keyed by the pid of the registering process.
-record(state, {cfg :: cfg(),
                registry :: dict:dict()
               }).

%% Per-eNB registry entry:
%%   addr      - IP address matched as source (UL) / destination (DL)
%%   genb_id   - Global-eNB-ID, used as suffix of the counter names
%%   mon_ref   - monitor on the registering process
%%   handle_ul - handle of the counting rule in the "gtpu-ul" chain
%%   handle_dl - handle of the counting rule in the "gtpu-dl" chain
-record(enb_state, {addr :: string(),
                    genb_id :: string(),
                    mon_ref :: reference(),
                    handle_ul :: integer(),
                    handle_dl :: integer()
                   }).


%% ------------------------------------------------------------------
%% public API
%% ------------------------------------------------------------------

%% Start the server and register it locally under the module name.
-spec start_link(cfg()) -> gen_server:start_ret().
start_link(Cfg) ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [Cfg], []).


%% Register the calling process as an eNB with the given address and
%% Global-eNB-ID, so that UL/DL counters get created for it.
-spec enb_register(Addr, GlobalENBId) -> ok | {error, term()}
    when Addr :: string(),
         GlobalENBId :: string().
enb_register(Addr, GlobalENBId) ->
    Request = {?FUNCTION_NAME, Addr, GlobalENBId},
    gen_server:call(?MODULE, Request).
%% Unregister the calling process: the server identifies the eNB by the
%% caller's pid, removes its rules/counters and drops the monitor.
-spec enb_unregister() -> ok | {error, term()}.
enb_unregister() ->
    gen_server:call(?MODULE, ?FUNCTION_NAME).


%% Ask the server to list the counters of its nftables table.  The values
%% are only written to the log; the call itself returns ok | {error, _}.
-spec fetch_counters() -> ok | {error, term()}.
fetch_counters() ->
    gen_server:call(?MODULE, ?FUNCTION_NAME).


%% Stop the locally registered server.
-spec shutdown() -> ok.
shutdown() ->
    gen_server:stop(?MODULE).


%% ------------------------------------------------------------------
%% gen_server API
%% ------------------------------------------------------------------

%% Enabled: create the table with the "gtpu-ul" (prerouting) and "gtpu-dl"
%% (postrouting) chains, each starting with two rules accepting anything
%% that is not GTP-U.  The effective table name is stored in the state.
init([#{enable := true} = Cfg]) ->
    TName = maps:get(table_name, Cfg, "osmo-s1gw"),
    %% ignore (accept) anything but GTPU @ udp/2152
    R1 = [nft_expr_match_ip_proto("udp", ?OP_NEQ), nft_expr_accept()],
    R2 = [nft_expr_match_udp_dport(?GTPU_PORT, ?OP_NEQ), nft_expr_accept()],
    Cmds = [nft_cmd_add_table(TName),
            nft_cmd_add_chain(TName, "gtpu-ul", "prerouting"),
            nft_cmd_add_chain(TName, "gtpu-dl", "postrouting"),
            nft_cmd_add_rule(TName, "gtpu-ul", R1),
            nft_cmd_add_rule(TName, "gtpu-dl", R1),
            nft_cmd_add_rule(TName, "gtpu-ul", R2),
            nft_cmd_add_rule(TName, "gtpu-dl", R2)
           ],
    case nft_exec(Cmds) of
        ok ->
            ?LOG_INFO("NFT table ~p has been initialized", [TName]),
            {ok, #state{cfg = Cfg#{table_name => TName},
                        registry = dict:new()}};
        {error, Reason} = Error ->
            ?LOG_ERROR("NFT table ~p init failed: ~p", [TName, Error]),
            %% pass the reason through as-is instead of wrapping the
            %% already tagged tuple into {error, {error, Reason}}
            {error, Reason};
        Other ->
            %% e.g. {ok, Output}: nothing is expected back from 'add'
            ?LOG_ERROR("NFT table ~p init failed: ~p", [TName, Other]),
            {error, {unexpected_result, Other}}
    end;

%% Disabled: nftables is not touched at all, only the state is set up.
init([#{enable := false} = Cfg]) ->
    {ok, #state{cfg = Cfg,
                registry = dict:new()}};

%% 'enable' is declared optional in cfg(); instead of crashing with
%% function_clause treat its absence as "disabled".
%% NOTE(review): default chosen conservatively - confirm with the caller.
init([Cfg]) when is_map(Cfg), not is_map_key(enable, Cfg) ->
    init([Cfg#{enable => false}]).
%% Counters are disabled: acknowledge every call without doing anything.
%% The state is a #state{} record, so it has to be matched as a record;
%% a map pattern never matches it and would let the calls fall through
%% to the clauses below.
handle_call(Info, From,
            #state{cfg = #{enable := false}} = S) ->
    ?LOG_DEBUG("ignore ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]),
    {reply, ok, S};


%% Register the calling process as an eNB: create its UL/DL counters and
%% rules, monitor the process and remember the rule handles.  Registering
%% an already known pid is a no-op.
handle_call({enb_register, Addr, GlobalENBId}, {Pid, _Ref},
            #state{cfg = Cfg, registry = R0} = S) ->
    case dict:find(Pid, R0) of
        {ok, _} ->
            ?LOG_DEBUG("eNB (pid ~p) is already registered", [Pid]),
            {reply, ok, S};
        error ->
            case enb_add_counters({Addr, GlobalENBId}, Cfg) of
                {ok, ULH, DLH} ->
                    MonRef = erlang:monitor(process, Pid),
                    EnbState = #enb_state{addr = Addr,
                                          genb_id = GlobalENBId,
                                          mon_ref = MonRef,
                                          handle_ul = ULH,
                                          handle_dl = DLH},
                    R1 = dict:store(Pid, EnbState, R0),
                    ?LOG_INFO("eNB (pid ~p, ~p) has been registered", [Pid, GlobalENBId]),
                    {reply, ok, S#state{registry = R1}};
                {error, Error} ->
                    ?LOG_ERROR("eNB (pid ~p) register failed: ~p", [Pid, Error]),
                    {reply, {error, Error}, S}
            end
    end;

%% Unregister the calling process: drop the monitor (flushing a possibly
%% queued 'DOWN'), delete its rules/counters and forget the registry entry.
%% A failure to delete the nftables objects is logged by enb_del_counters/2
%% and does not prevent the unregistration.
handle_call(enb_unregister, {Pid, _Ref},
            #state{cfg = Cfg, registry = R0} = S) ->
    case dict:find(Pid, R0) of
        {ok, EnbState} ->
            erlang:demonitor(EnbState#enb_state.mon_ref, [flush]),
            enb_del_counters(EnbState, Cfg),
            R1 = dict:erase(Pid, R0),
            ?LOG_INFO("eNB (pid ~p) has been unregistered", [Pid]),
            {reply, ok, S#state{registry = R1}};
        error ->
            ?LOG_ERROR("eNB (pid ~p) is not registered", [Pid]),
            {reply, {error, enb_not_registered}, S}
    end;

%% List all counters of the table and log them.
handle_call(fetch_counters, _From,
            #state{cfg = #{table_name := TName}} = S) ->
    Cmds = [nft_cmd_list_counters(TName)],
    case nft_exec(Cmds) of
        {ok, Res} ->
            parse_counters(Res),
            {reply, ok, S};
        ok ->
            %% nft_exec/1 returns a bare 'ok' for empty output:
            %% there is nothing to parse then
            {reply, ok, S};
        {error, Error} ->
            ?LOG_ERROR("Failed to fetch counters: ~p", [Error]),
            {reply, {error, Error}, S}
    end;

handle_call(Info, From, S) ->
    ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]),
    {reply, {error, not_implemented}, S}.


%% No casts are part of the protocol: log and ignore.
handle_cast(Info, S) ->
    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
    {noreply, S}.
%% A monitored eNB process went down: remove its rules/counters and its
%% registry entry, exactly as an explicit unregistration would.
handle_info({'DOWN', _MonRef, process, Pid, Reason},
            #state{cfg = Cfg, registry = R0} = S) ->
    ?LOG_INFO("eNB process ~p terminated with reason ~p", [Pid, Reason]),
    case dict:find(Pid, R0) of
        {ok, EnbState} ->
            enb_del_counters(EnbState, Cfg),
            R1 = dict:erase(Pid, R0),
            ?LOG_INFO("eNB (pid ~p) has been unregistered", [Pid]),
            {noreply, S#state{registry = R1}};
        error ->
            ?LOG_ERROR("eNB (pid ~p) is not registered", [Pid]),
            {noreply, S}
    end;

handle_info(Info, S) ->
    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
    {noreply, S}.


%% Delete the nftables table, but only if this server created one, i.e.
%% counters were enabled.  In disabled mode the config need not contain
%% 'table_name' at all, and a table that was never created by init/1 must
%% not be deleted.  The result of the deletion is ignored (best effort).
terminate(Reason,
          #state{cfg = #{enable := true, table_name := TName}}) ->
    nft_exec([nft_cmd_del_table(TName)]), %% delete the table
    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
    ok;

terminate(Reason, _S) ->
    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
    ok.


%% ------------------------------------------------------------------
%% private API
%% ------------------------------------------------------------------

%% Create the named counters "ul-<GlobalENBId>" / "dl-<GlobalENBId>" and
%% append one rule per direction that matches the eNB address (source for
%% UL, destination for DL) and updates the respective counter.
%% Returns {ok, ULHandle, DLHandle} with the handles of the two new rules,
%% or {error, Reason}.
enb_add_counters({Addr, GlobalENBId},
                 #{table_name := TName}) ->
    RUL = [nft_expr_match_ip_saddr(Addr, ?OP_EQ),
           nft_expr_counter("ul-" ++ GlobalENBId)],
    RDL = [nft_expr_match_ip_daddr(Addr, ?OP_EQ),
           nft_expr_counter("dl-" ++ GlobalENBId)],
    Cmds = [nft_cmd_add_counter(TName, "ul-" ++ GlobalENBId),
            nft_cmd_add_counter(TName, "dl-" ++ GlobalENBId),
            nft_cmd_add_rule(TName, "gtpu-ul", RUL),
            nft_cmd_add_rule(TName, "gtpu-dl", RDL)
           ],
    case nft_exec(Cmds) of
        ok ->
            enb_rule_handles(TName);
        {error, Error} ->
            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
            {error, Error}
    end.


%% Look up the handles of the rules just appended to both chains.
%% nft_chain_last_handle/2 signals a failure with the atom 'error'; that
%% must not be handed out as if it were a handle, because it would be
%% stored in the registry and later used in a 'delete rule' command.
enb_rule_handles(TName) ->
    ULH = nft_chain_last_handle(TName, "gtpu-ul"),
    DLH = nft_chain_last_handle(TName, "gtpu-dl"),
    case is_integer(ULH) andalso is_integer(DLH) of
        true ->
            {ok, ULH, DLH};
        false ->
            ?LOG_ERROR("~p() failed: UL handle ~p, DL handle ~p",
                       [?FUNCTION_NAME, ULH, DLH]),
            {error, {rule_handle_lookup, ULH, DLH}}
    end.
%% Delete the UL/DL rules of an eNB (by handle) and then its two named
%% counters.  Returns ok or {error, Reason}; the failure is logged.
enb_del_counters(#enb_state{genb_id = GlobalENBId,
                            handle_ul = ULH,
                            handle_dl = DLH},
                 #{table_name := TName}) ->
    Cmds = [nft_cmd_del_rule(TName, "gtpu-ul", ULH),
            nft_cmd_del_rule(TName, "gtpu-dl", DLH),
            nft_cmd_del_counter(TName, "ul-" ++ GlobalENBId),
            nft_cmd_del_counter(TName, "dl-" ++ GlobalENBId)
           ],
    case nft_exec(Cmds) of
        ok -> ok;
        {error, Error} ->
            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
            {error, Error}
    end.


%% Log name, packets and bytes of every "counter" object in a decoded
%% nftables listing; all other items are skipped.  Always returns ok.
parse_counters(Items) when is_list(Items) ->
    lists:foreach(fun log_counter/1, Items).


%% Log a single listing item if it is a counter object.
log_counter(#{<< "counter" >> := C}) ->
    Name = binary_to_list(maps:get(<< "name" >>, C)),
    Bytes = maps:get(<< "bytes" >>, C),
    Packets = maps:get(<< "packets" >>, C),
    ?LOG_INFO("Counter ~p: packets ~p, bytes ~p",
              [Name, Packets, Bytes]);
log_counter(_) ->
    ok. %% ignore anything else


%% Encode the commands as {"nftables": Cmds}, hand the JSON string to
%% enftables:nft_run_cmd/1 and normalize its result via nft_result/1.
-spec nft_exec(Cmds) -> ok | {ok, Result} | {error, Error}
    when Cmds :: [map()],
         Result :: [map()],
         Error :: term().
nft_exec(Cmds) ->
    ?LOG_DEBUG("Executing nftables commands: ~p", [Cmds]),
    Req0 = json:encode(#{nftables => Cmds}), %% XXX: json API
    Req1 = binary_to_list(iolist_to_binary(Req0)),
    nft_result(enftables:nft_run_cmd(Req1)).


%% Normalize the result of enftables:nft_run_cmd/1:
%%   {ok, []}     -> ok (empty output)
%%   {ok, Output} -> {ok, Items}, the value of the top-level "nftables" key,
%%                   or {error, {json_decode, _}} if the output cannot be
%%                   decoded or has no such key
%%   anything else is returned unchanged
nft_result({ok, []}) -> ok;

nft_result({ok, Res0}) when is_list(Res0) ->
    try json:decode(list_to_binary(Res0)) of %% XXX: json API
        #{<< "nftables" >> := Res1} -> {ok, Res1};
        _ -> {error, {json_decode, unexpected}}
    catch
        Exception:Reason:StackTrace ->
            ?LOG_ERROR("An exception occurred: ~p, ~p, ~p", [Exception, Reason, StackTrace]),
            {error, {json_decode, Reason}}
    end;

nft_result(Error) -> Error.


%% Return the handle of the last rule in the given chain, or the atom
%% 'error'.  'error' is also returned (rather than crashing the server
%% with badmatch/function_clause) when the listing is empty or its last
%% item is not a rule carrying a handle.
nft_chain_last_handle(TName, CName) ->
    Cmds = [nft_cmd_list_chain(TName, CName)],
    case nft_exec(Cmds) of
        {ok, [_ | _] = Res} ->
            case lists:last(Res) of
                #{<< "rule" >> := #{<< "handle" >> := Handle}} ->
                    Handle;
                Other ->
                    ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, {no_rule, Other}]),
                    error
            end;
        Error ->
            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
            error
    end.
%% 'add table' command for an inet table carrying the "owner" flag.
nft_cmd_add_table(TName) ->
    #{add => #{table => #{family => << "inet" >>,
                          name => list_to_binary(TName),
                          flags => [<< "owner" >>]}}}.


%% 'delete table' command for an inet table.
nft_cmd_del_table(TName) ->
    #{delete => #{table => #{family => << "inet" >>,
                             name => list_to_binary(TName)}}}.


%% 'add chain' command: a filter base chain attached to Hook with
%% priority 0 and policy "accept".
nft_cmd_add_chain(TName, CName, Hook) ->
    Chain = #{family => << "inet" >>,
              table => list_to_binary(TName),
              name => list_to_binary(CName),
              type => << "filter" >>,
              hook => list_to_binary(Hook),
              prio => 0,
              policy => << "accept" >>},
    #{add => #{chain => Chain}}.


%% 'add rule' command; Expr is the list of rule expressions.
nft_cmd_add_rule(TName, CName, Expr) ->
    Rule = #{family => << "inet" >>,
             table => list_to_binary(TName),
             chain => list_to_binary(CName),
             expr => Expr},
    #{add => #{rule => Rule}}.


%% 'delete rule' command; the rule is identified by its handle.
nft_cmd_del_rule(TName, CName, Handle) ->
    Rule = #{family => << "inet" >>,
             table => list_to_binary(TName),
             chain => list_to_binary(CName),
             handle => Handle},
    #{delete => #{rule => Rule}}.


%% Description of a named counter, shared by the add/delete commands.
nft_counter(TName, Name) ->
    #{family => << "inet" >>,
      table => list_to_binary(TName),
      name => list_to_binary(Name)}.

%% 'add counter' command.
nft_cmd_add_counter(TName, Name) ->
    Counter = nft_counter(TName, Name),
    #{add => #{counter => Counter}}.

%% 'delete counter' command.
nft_cmd_del_counter(TName, Name) ->
    Counter = nft_counter(TName, Name),
    #{delete => #{counter => Counter}}.


%% Match expression comparing a payload field (Proto/Field) against Value
%% using the operator Op ("==" or "!=").
-spec nft_expr_match_payload({Proto, Field}, Value, Op) -> map()
    when Proto :: string(),
         Field :: string(),
         Value :: term(),
         Op :: string().
nft_expr_match_payload({Proto, Field}, Value, Op) ->
    Payload = #{protocol => list_to_binary(Proto),
                field => list_to_binary(Field)},
    Match = #{left => #{payload => Payload},
              right => Value,
              op => list_to_binary(Op)},
    #{match => Match}.


%% Match on the IP protocol field, e.g. "udp".
nft_expr_match_ip_proto(Proto, Op) ->
    Value = list_to_binary(Proto),
    nft_expr_match_payload({"ip", "protocol"}, Value, Op).

%% Match on the IP source address.
nft_expr_match_ip_saddr(Addr, Op) ->
    Value = list_to_binary(Addr),
    nft_expr_match_payload({"ip", "saddr"}, Value, Op).

%% Match on the IP destination address.
nft_expr_match_ip_daddr(Addr, Op) ->
    Value = list_to_binary(Addr),
    nft_expr_match_payload({"ip", "daddr"}, Value, Op).
%% Match on the UDP destination port (Port is passed through unchanged).
nft_expr_match_udp_dport(Port, Op) ->
    Field = {"udp", "dport"},
    nft_expr_match_payload(Field, Port, Op).


%% Verdict expression: accept the packet.
nft_expr_accept() ->
    #{accept => null}.


%% Expression referencing the named counter Name.
nft_expr_counter(Name) ->
    CounterName = list_to_binary(Name),
    #{counter => CounterName}.


%% 'list chain' command for chain CName of inet table TName.
nft_cmd_list_chain(TName, CName) ->
    #{list => #{chain => #{family => << "inet" >>,
                           table => list_to_binary(TName),
                           name => list_to_binary(CName)}}}.


%% 'list counters' command restricted to inet table TName.
nft_cmd_list_counters(TName) ->
    Table = #{family => << "inet" >>,
              name => list_to_binary(TName)},
    #{list => #{counters => #{table => Table}}}.


%% vim:set ts=4 sw=4 et: