fixeria has uploaded this change for review.

View Change

[WIP] enft_kpi: retrieve per-eNB traffic counters

Change-Id: I498d2854447a2d53d2abddd38652f3e2bbb1fbdd
Related: SYS#7307
---
M rebar.config
M rebar.lock
A src/enft_kpi.erl
3 files changed, 441 insertions(+), 0 deletions(-)

git pull ssh://gerrit.osmocom.org:29418/erlang/osmo-s1gw refs/changes/81/40281/1
diff --git a/rebar.config b/rebar.config
index 0100f17..31dd511 100644
--- a/rebar.config
+++ b/rebar.config
@@ -9,6 +9,8 @@
{git, "https://github.com/rlipscombe/logger_color_formatter.git", {tag, "0.5.0"}}},
{pfcplib,
{git, "https://github.com/travelping/pfcplib.git", {branch, "master"}}},
+ {enftables,
+ {git, "https://gitea.osmocom.org/vyanitskiy/enftables.git", {branch, "master"}}},
{exometer_core,
{git, "https://github.com/Feuerlabs/exometer_core.git", {branch, "master"}}},
{exometer_report_statsd,
diff --git a/rebar.lock b/rebar.lock
index c52d677..0d8cc0d 100644
--- a/rebar.lock
+++ b/rebar.lock
@@ -1,5 +1,9 @@
{"1.2.0",
[{<<"cut">>,{pkg,<<"cut">>,<<"1.0.3">>},1},
+ {<<"enftables">>,
+ {git,"https://gitea.osmocom.org/vyanitskiy/enftables.git",
+ {ref,"44cbec3cdc792781ee35eda3f1b586b41519c5bd"}},
+ 0},
{<<"exometer_core">>,
{git,"https://github.com/Feuerlabs/exometer_core.git",
{ref,"f9c7abc095edc893c9354a3d5f061715de1d9e79"}},
diff --git a/src/enft_kpi.erl b/src/enft_kpi.erl
new file mode 100644
index 0000000..8d6dcc4
--- /dev/null
+++ b/src/enft_kpi.erl
@@ -0,0 +1,435 @@
+%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+%% Author: Vadim Yanitskiy <vyanitskiy@sysmocom.de>
+%%
+%% All Rights Reserved
+%%
+%% SPDX-License-Identifier: AGPL-3.0-or-later
+%%
+%% This program is free software; you can redistribute it and/or modify
+%% it under the terms of the GNU Affero General Public License as
+%% published by the Free Software Foundation; either version 3 of the
+%% License, or (at your option) any later version.
+%%
+%% This program is distributed in the hope that it will be useful,
+%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%% GNU Affero General Public License for more details.
+%%
+%% You should have received a copy of the GNU Affero General Public License
+%% along with this program. If not, see <https://www.gnu.org/licenses/>.
+%%
+%% Additional Permission under GNU AGPL version 3 section 7:
+%%
+%% If you modify this Program, or any covered work, by linking or
+%% combining it with runtime libraries of Erlang/OTP as released by
+%% Ericsson on https://www.erlang.org (or a modified version of these
+%% libraries), containing parts covered by the terms of the Erlang Public
+%% License (https://www.erlang.org/EPLICENSE), the licensors of this
+%% Program grant you additional permission to convey the resulting work
+%% without the need to license the runtime libraries of Erlang/OTP under
+%% the GNU Affero General Public License. Corresponding Source for a
+%% non-source form of such a combination shall include the source code
+%% for the parts of the runtime libraries of Erlang/OTP used as well as
+%% that of the covered work.
+
+-module(enft_kpi).
+-behaviour(gen_server).
+
+-export([init/1,
+ handle_info/2,
+ handle_call/3,
+ handle_cast/2,
+ terminate/2]).
+-export([start_link/1,
+ enb_register/2,
+ enb_unregister/0,
+ fetch_counters/0,
+ shutdown/0]).
+
+-include_lib("kernel/include/logger.hrl").
+
+%% UDP destination port used to tell GTP-U traffic from everything else
+-define(GTPU_PORT, 2152).
+
+%% comparison operators for the "op" field of nftables match expressions
+-define(OP_EQ, "==").
+-define(OP_NEQ, "!=").
+
+
+%% Server configuration (passed to start_link/1):
+%%   enable     - init/1 sets up nftables only when this is true
+%%   table_name - name of the nftables table; init/1 falls back to
+%%                "osmo-s1gw" when the key is absent
+-type cfg() :: #{enable => boolean(),
+ table_name => string()
+ }.
+
+
+%% gen_server state:
+%%   cfg      - configuration (with table_name filled in when enabled)
+%%   registry - dict mapping the pid of a registered eNB process
+%%              to its #enb_state{}
+-record(state, {cfg :: cfg(),
+ registry :: dict:dict()
+ }).
+
+%% Per-eNB bookkeeping:
+%%   addr      - IP address matched as source (UL) / destination (DL)
+%%   genb_id   - Global-eNB-ID, used as the suffix of the counter names
+%%   mon_ref   - monitor reference of the registered process
+%%   handle_ul - handle of the counting rule in the "gtpu-ul" chain
+%%   handle_dl - handle of the counting rule in the "gtpu-dl" chain
+-record(enb_state, {addr :: string(),
+ genb_id :: string(),
+ mon_ref :: reference(),
+ handle_ul :: integer(),
+ handle_dl :: integer()
+ }).
+
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+%% Start the KPI server linked to the caller and registered locally
+%% under the module name.  Cfg reaches init/1 wrapped in a list.
+-spec start_link(cfg()) -> gen_server:start_ret().
+start_link(Cfg) ->
+    ServerName = {local, ?MODULE},
+    InitArgs = [Cfg],
+    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
+
+
+%% Ask the server to install UL/DL traffic counters for the eNB at Addr.
+%% The server keys the registration on (and monitors) the calling
+%% process, so this has to be called from the process owning the eNB.
+-spec enb_register(Addr, GlobalENBId) -> ok | {error, term()}
+    when Addr :: string(),
+         GlobalENBId :: string().
+enb_register(Addr, GlobalENBId) ->
+    Request = {enb_register, Addr, GlobalENBId},
+    gen_server:call(?MODULE, Request).
+
+
+%% Remove the counters previously installed for the calling process.
+-spec enb_unregister() -> ok | {error, term()}.
+enb_unregister() ->
+    Request = enb_unregister,
+    gen_server:call(?MODULE, Request).
+
+
+%% Make the server list all counters of its nftables table.
+%% The values are only logged by the server; the reply carries no data.
+-spec fetch_counters() -> ok | {error, term()}.
+fetch_counters() ->
+    Request = fetch_counters,
+    gen_server:call(?MODULE, Request).
+
+
+%% Stop the server (reason `normal', waiting without a time limit).
+-spec shutdown() -> ok.
+shutdown() ->
+    gen_server:stop(?MODULE, normal, infinity).
+
+
+%% ------------------------------------------------------------------
+%% gen_server API
+%% ------------------------------------------------------------------
+
+%% gen_server init callback.
+%%
+%% With `enable := true' the nftables table (name taken from the
+%% `table_name' config key, default "osmo-s1gw") is created together with
+%% two chains: "gtpu-ul" (prerouting hook) and "gtpu-dl" (postrouting
+%% hook).  Both chains start with two rules accepting anything that is
+%% not GTP-U, so rules appended later only ever see GTP-U packets.
+%%
+%% With `enable := false' nothing is configured at all.
+%%
+%% Returns {ok, State}, or {error, Reason} if nftables setup failed.
+init([#{enable := true} = Cfg]) ->
+    TName = maps:get(table_name, Cfg, "osmo-s1gw"),
+    %% ignore (accept) anything but GTPU @ udp/2152
+    R1 = [nft_expr_match_ip_proto("udp", ?OP_NEQ), nft_expr_accept()],
+    R2 = [nft_expr_match_udp_dport(?GTPU_PORT, ?OP_NEQ), nft_expr_accept()],
+    Cmds = [nft_cmd_add_table(TName),
+            nft_cmd_add_chain(TName, "gtpu-ul", "prerouting"),
+            nft_cmd_add_chain(TName, "gtpu-dl", "postrouting"),
+            nft_cmd_add_rule(TName, "gtpu-ul", R1),
+            nft_cmd_add_rule(TName, "gtpu-dl", R1),
+            nft_cmd_add_rule(TName, "gtpu-ul", R2),
+            nft_cmd_add_rule(TName, "gtpu-dl", R2)
+           ],
+    case nft_exec(Cmds) of
+        ok ->
+            ?LOG_INFO("NFT table ~p has been initialized", [TName]),
+            {ok, #state{cfg = Cfg#{table_name => TName},
+                        registry = dict:new()}};
+        {error, Reason} = Error ->
+            ?LOG_ERROR("NFT table ~p init failed: ~p", [TName, Error]),
+            %% already tagged: do not wrap it into another {error, _}
+            {error, Reason};
+        Unexpected ->
+            %% e.g. {ok, Output} where no output was expected
+            ?LOG_ERROR("NFT table ~p init failed: ~p", [TName, Unexpected]),
+            {error, Unexpected}
+    end;
+
+init([#{enable := false} = Cfg]) ->
+    {ok, #state{cfg = Cfg,
+                registry = dict:new()}}.
+
+
+%% gen_server call handler.
+%%
+%% Requests (the caller's pid, taken from From, identifies the eNB):
+%%   {enb_register, Addr, GlobalENBId} -> ok | {error, Reason}
+%%   enb_unregister                    -> ok | {error, enb_not_registered}
+%%   fetch_counters                    -> ok | {error, Reason}
+%% Anything else is answered with {error, not_implemented}.
+%%
+%% When the module is disabled (`enable := false') every request is
+%% acknowledged with `ok' and nftables is never touched.
+handle_call(Info, From,
+            %% the state is a #state{} record, so it has to be matched as
+            %% one: a map pattern would never match and this clause would
+            %% be dead code
+            #state{cfg = #{enable := false}} = S) ->
+    ?LOG_DEBUG("ignore ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]),
+    {reply, ok, S};
+
+
+handle_call({enb_register, Addr, GlobalENBId}, {Pid, _Ref},
+            #state{registry = R0} = S) ->
+    case dict:is_key(Pid, R0) of
+        true ->
+            ?LOG_DEBUG("eNB (pid ~p) is already registered", [Pid]),
+            {reply, ok, S};
+        false ->
+            enb_register_new(Pid, Addr, GlobalENBId, S)
+    end;
+
+handle_call(enb_unregister, {Pid, _Ref},
+            #state{cfg = Cfg, registry = R0} = S) ->
+    case dict:find(Pid, R0) of
+        {ok, EnbState} ->
+            %% flush: make sure no stale 'DOWN' message is left behind
+            erlang:demonitor(EnbState#enb_state.mon_ref, [flush]),
+            enb_del_counters(EnbState, Cfg),
+            R1 = dict:erase(Pid, R0),
+            ?LOG_INFO("eNB (pid ~p) has been unregistered", [Pid]),
+            {reply, ok, S#state{registry = R1}};
+        error ->
+            ?LOG_ERROR("eNB (pid ~p) is not registered", [Pid]),
+            {reply, {error, enb_not_registered}, S}
+    end;
+
+handle_call(fetch_counters, _From,
+            #state{cfg = #{table_name := TName}} = S) ->
+    Cmds = [nft_cmd_list_counters(TName)],
+    case nft_exec(Cmds) of
+        {ok, Res} ->
+            parse_counters(Res),
+            {reply, ok, S};
+        ok ->
+            %% nft_exec/1 returns a bare `ok' for empty output:
+            %% there is nothing to report, but it is not a failure
+            {reply, ok, S};
+        {error, Error} ->
+            ?LOG_ERROR("Failed to fetch counters: ~p", [Error]),
+            {reply, {error, Error}, S}
+    end;
+
+handle_call(Info, From, S) ->
+    ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]),
+    {reply, {error, not_implemented}, S}.
+
+
+%% Install counters for a not yet registered eNB process; on success
+%% start monitoring the process and store its state in the registry.
+enb_register_new(Pid, Addr, GlobalENBId,
+                 #state{cfg = Cfg, registry = R0} = S) ->
+    case enb_add_counters({Addr, GlobalENBId}, Cfg) of
+        {ok, ULH, DLH} ->
+            MonRef = erlang:monitor(process, Pid),
+            EnbState = #enb_state{addr = Addr,
+                                  genb_id = GlobalENBId,
+                                  mon_ref = MonRef,
+                                  handle_ul = ULH,
+                                  handle_dl = DLH},
+            R1 = dict:store(Pid, EnbState, R0),
+            ?LOG_INFO("eNB (pid ~p, ~p) has been registered", [Pid, GlobalENBId]),
+            {reply, ok, S#state{registry = R1}};
+        {error, Error} ->
+            ?LOG_ERROR("eNB (pid ~p) register failed: ~p", [Pid, Error]),
+            {reply, {error, Error}, S}
+    end.
+
+
+%% gen_server cast handler: no casts are defined, so every message is
+%% logged as an error and dropped.
+handle_cast(Msg, State) ->
+    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Msg]),
+    {noreply, State}.
+
+
+%% gen_server info handler.
+%%
+%% A 'DOWN' message means a monitored eNB process has terminated: its
+%% counters are removed and its registry entry is dropped.  Any other
+%% message is logged as an error and ignored.
+handle_info({'DOWN', _MonRef, process, Pid, Reason},
+            #state{cfg = Cfg, registry = Registry} = S) ->
+    ?LOG_INFO("eNB process ~p terminated with reason ~p", [Pid, Reason]),
+    case dict:is_key(Pid, Registry) of
+        false ->
+            ?LOG_ERROR("eNB (pid ~p) is not registered", [Pid]),
+            {noreply, S};
+        true ->
+            enb_del_counters(dict:fetch(Pid, Registry), Cfg),
+            ?LOG_INFO("eNB (pid ~p) has been unregistered", [Pid]),
+            {noreply, S#state{registry = dict:erase(Pid, Registry)}}
+    end;
+
+handle_info(Msg, S) ->
+    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Msg]),
+    {noreply, S}.
+
+
+%% gen_server terminate callback.
+%%
+%% If the module is enabled, the nftables table set up by init/1 is
+%% deleted (a failure to do so is logged, not raised).  If it is
+%% disabled, no table was ever created, so nothing is deleted; the
+%% config may not even contain `table_name' in that case.
+terminate(Reason,
+          #state{cfg = #{enable := true, table_name := TName}}) ->
+    case nft_exec([nft_cmd_del_table(TName)]) of %% delete the table
+        ok -> ok;
+        Error ->
+            ?LOG_ERROR("NFT table ~p deletion failed: ~p", [TName, Error])
+    end,
+    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
+    ok;
+
+terminate(Reason, _State) ->
+    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
+    ok.
+
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+%% Create the named counters "ul-<GlobalENBId>" / "dl-<GlobalENBId>" and
+%% append one counting rule per direction: packets with source address
+%% Addr in "gtpu-ul", packets with destination address Addr in "gtpu-dl".
+%%
+%% Returns {ok, ULHandle, DLHandle} with the handles of the two new
+%% rules (needed to delete them later), or {error, Reason}.
+enb_add_counters({Addr, GlobalENBId},
+                 #{table_name := TName}) ->
+    CtrUL = "ul-" ++ GlobalENBId,
+    CtrDL = "dl-" ++ GlobalENBId,
+    RUL = [nft_expr_match_ip_saddr(Addr, ?OP_EQ),
+           nft_expr_counter(CtrUL)],
+    RDL = [nft_expr_match_ip_daddr(Addr, ?OP_EQ),
+           nft_expr_counter(CtrDL)],
+    Cmds = [nft_cmd_add_counter(TName, CtrUL),
+            nft_cmd_add_counter(TName, CtrDL),
+            nft_cmd_add_rule(TName, "gtpu-ul", RUL),
+            nft_cmd_add_rule(TName, "gtpu-dl", RDL)
+           ],
+    case nft_exec(Cmds) of
+        ok ->
+            ULH = nft_chain_last_handle(TName, "gtpu-ul"),
+            DLH = nft_chain_last_handle(TName, "gtpu-dl"),
+            %% nft_chain_last_handle/2 yields the bare atom `error' on
+            %% failure; never hand that out as if it were a rule handle
+            case is_integer(ULH) andalso is_integer(DLH) of
+                true ->
+                    {ok, ULH, DLH};
+                false ->
+                    ?LOG_ERROR("~p() failed: ~p",
+                               [?FUNCTION_NAME, {rule_handle_lookup, ULH, DLH}]),
+                    {error, rule_handle_lookup}
+            end;
+        {error, Error} ->
+            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
+            {error, Error}
+    end.
+
+
+%% Undo enb_add_counters/2: delete both counting rules (by handle)
+%% first, then the two named counters they referenced.
+%% Returns ok or {error, Reason}; a failure is also logged.
+enb_del_counters(#enb_state{genb_id = GlobalENBId,
+                            handle_ul = ULH,
+                            handle_dl = DLH},
+                 #{table_name := TName}) ->
+    DelRules = [nft_cmd_del_rule(TName, Chain, Handle)
+                || {Chain, Handle} <- [{"gtpu-ul", ULH}, {"gtpu-dl", DLH}]],
+    DelCounters = [nft_cmd_del_counter(TName, Prefix ++ GlobalENBId)
+                   || Prefix <- ["ul-", "dl-"]],
+    case nft_exec(DelRules ++ DelCounters) of
+        {error, Error} ->
+            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Error]),
+            {error, Error};
+        ok ->
+            ok
+    end.
+
+
+%% Walk the items of a "list counters" result and log each counter
+%% object (name, packets, bytes).  Items of any other kind are skipped.
+%% Always returns ok.
+parse_counters([]) ->
+    ok;
+
+parse_counters([#{<< "counter" >> := Counter} | Rest]) ->
+    Name = binary_to_list(maps:get(<< "name" >>, Counter)),
+    Bytes = maps:get(<< "bytes" >>, Counter),
+    Packets = maps:get(<< "packets" >>, Counter),
+    ?LOG_INFO("Counter ~p: packets ~p, bytes ~p",
+              [Name, Packets, Bytes]),
+    parse_counters(Rest);
+
+parse_counters([_Other | Rest]) ->
+    %% ignore anything else
+    parse_counters(Rest).
+
+
+%% Run a list of nftables JSON commands.
+%%
+%% Cmds is wrapped into the top-level `nftables' object, JSON-encoded
+%% and passed (as a string) to enftables:nft_run_cmd/1; the outcome is
+%% normalized by nft_result/1.
+-spec nft_exec(Cmds) -> ok | {ok, Result} | {error, Error}
+    when Cmds :: [map()],
+         Result :: [map()],
+         Error :: term().
+nft_exec(Cmds) ->
+    ?LOG_DEBUG("Executing nftables commands: ~p", [Cmds]),
+    Payload = #{nftables => Cmds},
+    JsonBin = iolist_to_binary(json:encode(Payload)), %% XXX: json API
+    Outcome = enftables:nft_run_cmd(binary_to_list(JsonBin)),
+    nft_result(Outcome).
+
+
+%% Normalize the outcome of enftables:nft_run_cmd/1:
+%%   {ok, ""}     -> ok (the command produced no output)
+%%   {ok, Output} -> {ok, Items}, the list under the "nftables" key of the
+%%                   decoded JSON, or {error, {json_decode, _}}
+%%   anything else is passed through unchanged.
+nft_result(Outcome) ->
+    case Outcome of
+        {ok, []} -> ok;
+        {ok, Output} when is_list(Output) -> nft_decode_output(Output);
+        Other -> Other
+    end.
+
+%% Decode the JSON text printed by nft and unwrap its "nftables" list.
+nft_decode_output(Output) ->
+    try json:decode(list_to_binary(Output)) of %% XXX: json API
+        #{<< "nftables" >> := Items} -> {ok, Items};
+        _ -> {error, {json_decode, unexpected}}
+    catch
+        Class:Reason:StackTrace ->
+            ?LOG_ERROR("An exception occurred: ~p, ~p, ~p", [Class, Reason, StackTrace]),
+            {error, {json_decode, Reason}}
+    end.
+
+
+%% Return the handle of the last rule in chain CName of table TName.
+%%
+%% The chain is listed and the "handle" of the final item of the result
+%% is taken; that item is expected to be a rule (otherwise the match
+%% below crashes).  If listing fails, the failure is logged and the bare
+%% atom `error' is returned.
+nft_chain_last_handle(TName, CName) ->
+    ListCmd = nft_cmd_list_chain(TName, CName),
+    case nft_exec([ListCmd]) of
+        {ok, Items} ->
+            #{<< "rule" >> := LastRule} = lists:last(Items),
+            maps:get(<< "handle" >>, LastRule);
+        Failure ->
+            ?LOG_ERROR("~p() failed: ~p", [?FUNCTION_NAME, Failure]),
+            error
+    end.
+
+
+%% Build the command adding inet-family table TName, created with the
+%% "owner" flag.
+nft_cmd_add_table(TName) ->
+    #{add => #{table => #{family => << "inet" >>,
+                          name => list_to_binary(TName),
+                          flags => [<< "owner" >>]}}}.
+
+
+%% Build the command deleting inet-family table TName.
+nft_cmd_del_table(TName) ->
+    #{delete => #{table => #{family => << "inet" >>,
+                             name => list_to_binary(TName)}}}.
+
+
+%% Build the command adding base chain CName to table TName: a filter
+%% chain attached to the given Hook with priority 0 and an "accept"
+%% policy.
+nft_cmd_add_chain(TName, CName, Hook) ->
+    Table = list_to_binary(TName),
+    Chain = list_to_binary(CName),
+    HookBin = list_to_binary(Hook),
+    #{add => #{chain => #{family => << "inet" >>,
+                          table => Table,
+                          name => Chain,
+                          type => << "filter" >>,
+                          hook => HookBin,
+                          prio => 0,
+                          policy => << "accept" >>}}}.
+
+
+%% Build the command appending a rule made of the expression list Expr
+%% to chain CName of table TName.
+nft_cmd_add_rule(TName, CName, Expr) ->
+    Rule = #{family => << "inet" >>,
+             table => list_to_binary(TName),
+             chain => list_to_binary(CName),
+             expr => Expr},
+    #{add => #{rule => Rule}}.
+
+
+%% Build the command deleting the rule identified by Handle from chain
+%% CName of table TName.
+nft_cmd_del_rule(TName, CName, Handle) ->
+    Rule = #{family => << "inet" >>,
+             table => list_to_binary(TName),
+             chain => list_to_binary(CName),
+             handle => Handle},
+    #{delete => #{rule => Rule}}.
+
+
+%% Describe the named counter Name living in inet-family table TName
+%% (shared by the add/delete counter commands).
+nft_counter(TName, Name) ->
+    Table = list_to_binary(TName),
+    Counter = list_to_binary(Name),
+    #{family => << "inet" >>, table => Table, name => Counter}.
+
+%% Build the command adding named counter Name to table TName.
+nft_cmd_add_counter(TName, Name) ->
+    Counter = nft_counter(TName, Name),
+    #{add => #{counter => Counter}}.
+
+%% Build the command deleting named counter Name from table TName.
+nft_cmd_del_counter(TName, Name) ->
+    Counter = nft_counter(TName, Name),
+    #{delete => #{counter => Counter}}.
+
+
+%% Build a match expression comparing header field Field of protocol
+%% Proto (left-hand side) with Value (right-hand side) using operator Op.
+%% Value is inserted as is; the caller converts it where needed.
+-spec nft_expr_match_payload({Proto, Field}, Value, Op) -> map()
+    when Proto :: string(),
+         Field :: string(),
+         Value :: term(),
+         Op :: string().
+nft_expr_match_payload({Proto, Field}, Value, Op) ->
+    Payload = #{protocol => list_to_binary(Proto),
+                field => list_to_binary(Field)},
+    Match = #{left => #{payload => Payload},
+              right => Value,
+              op => list_to_binary(Op)},
+    #{match => Match}.
+
+
+%% Match on the IP "protocol" field, e.g. "udp".
+nft_expr_match_ip_proto(Proto, Op) ->
+    ProtoBin = list_to_binary(Proto),
+    nft_expr_match_payload({"ip", "protocol"}, ProtoBin, Op).
+
+%% Match on the IP source address (Addr given as a string).
+nft_expr_match_ip_saddr(Addr, Op) ->
+    AddrBin = list_to_binary(Addr),
+    nft_expr_match_payload({"ip", "saddr"}, AddrBin, Op).
+
+%% Match on the IP destination address (Addr given as a string).
+nft_expr_match_ip_daddr(Addr, Op) ->
+    AddrBin = list_to_binary(Addr),
+    nft_expr_match_payload({"ip", "daddr"}, AddrBin, Op).
+
+%% Match on the UDP destination port; Port is passed through unchanged.
+nft_expr_match_udp_dport(Port, Op) ->
+    Field = {"udp", "dport"},
+    nft_expr_match_payload(Field, Port, Op).
+
+
+%% Build the "accept" verdict statement (it takes no argument, hence
+%% the `null' value).
+nft_expr_accept() ->
+    Verdict = accept,
+    #{Verdict => null}.
+
+
+%% Build a statement referencing the named counter Name.
+nft_expr_counter(Name) ->
+    CounterName = list_to_binary(Name),
+    #{counter => CounterName}.
+
+
+%% Build the command listing chain CName of inet-family table TName.
+nft_cmd_list_chain(TName, CName) ->
+    #{list => #{chain => #{family => << "inet" >>,
+                           table => list_to_binary(TName),
+                           name => list_to_binary(CName)}}}.
+
+
+%% Build the command listing all counters of inet-family table TName.
+nft_cmd_list_counters(TName) ->
+    Table = #{family => << "inet" >>,
+              name => list_to_binary(TName)},
+    #{list => #{counters => #{table => Table}}}.
+
+
+%% vim:set ts=4 sw=4 et:

To view, visit change 40281. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: newchange
Gerrit-Project: erlang/osmo-s1gw
Gerrit-Branch: master
Gerrit-Change-Id: I498d2854447a2d53d2abddd38652f3e2bbb1fbdd
Gerrit-Change-Number: 40281
Gerrit-PatchSet: 1
Gerrit-Owner: fixeria <vyanitskiy@sysmocom.de>