fixeria has uploaded this change for review. ( https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/41281?usp=email )
Change subject: mme_registry: the MME registry (pool) implementation ......................................................................
mme_registry: the MME registry (pool) implementation
Change-Id: Id5480222439bf93eca2e994b291c619dff823b01 Related: SYS#7052 --- A src/mme_registry.erl M src/osmo_s1gw_sup.erl A test/mme_registry_test.erl 3 files changed, 424 insertions(+), 1 deletion(-)
git pull ssh://gerrit.osmocom.org:29418/erlang/osmo-s1gw refs/changes/81/41281/1
diff --git a/src/mme_registry.erl b/src/mme_registry.erl new file mode 100644 index 0000000..7867bec --- /dev/null +++ b/src/mme_registry.erl @@ -0,0 +1,280 @@ +%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH info@sysmocom.de +%% Author: Vadim Yanitskiy vyanitskiy@sysmocom.de +%% +%% All Rights Reserved +%% +%% SPDX-License-Identifier: AGPL-3.0-or-later +%% +%% This program is free software; you can redistribute it and/or modify +%% it under the terms of the GNU Affero General Public License as +%% published by the Free Software Foundation; either version 3 of the +%% License, or (at your option) any later version. +%% +%% This program is distributed in the hope that it will be useful, +%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%% GNU General Public License for more details. +%% +%% You should have received a copy of the GNU Affero General Public License +%% along with this program. If not, see https://www.gnu.org/licenses/. +%% +%% Additional Permission under GNU AGPL version 3 section 7: +%% +%% If you modify this Program, or any covered work, by linking or +%% combining it with runtime libraries of Erlang/OTP as released by +%% Ericsson on https://www.erlang.org (or a modified version of these +%% libraries), containing parts covered by the terms of the Erlang Public +%% License (https://www.erlang.org/EPLICENSE), the licensors of this +%% Program grant you additional permission to convey the resulting work +%% without the need to license the runtime libraries of Erlang/OTP under +%% the GNU Affero General Public License. Corresponding Source for a +%% non-source form of such a combination shall include the source code +%% for the parts of the runtime libraries of Erlang/OTP used as well as +%% that of the covered work. + +-module(mme_registry). +-behaviour(gen_server). + +-export([init/1, + handle_info/2, + handle_call/3, + handle_cast/2, + terminate/2]). 
-export([start_link/0,
         mme_register/1,
         mme_unregister/1,
         mme_select/0,
         mme_select/1,
         fetch_mme_info/1,
         fetch_mme_list/0,
         shutdown/0]).

-include_lib("kernel/include/logger.hrl").

-include("s1gw_metrics.hrl").
-include("s1ap.hrl").


-type mme_name() :: string().

-type mme_info() :: #{name := mme_name(), %% unique identifier of this MME
                      laddr => string() | inet:ip_address(),
                      raddr := string() | inet:ip_address(),
                      rport => inet:port_number()
                     }.

-type mme_list() :: [mme_info()].

%% mme_list/0 is exported as well: it is the return type of the
%% exported fetch_mme_list/0, so callers need to be able to refer to it.
-export_type([mme_name/0,
              mme_info/0,
              mme_list/0]).


%% ------------------------------------------------------------------
%% public API
%% ------------------------------------------------------------------

%% Start the registry as a locally registered gen_server named ?MODULE.
%% All other API functions address the process by that name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).


%% Add an MME to the pool.
%% Returns {error, already_registered} if an entry with the same name
%% or the same remote address/port pair is already present.
-spec mme_register(mme_info()) -> ok | {error, term()}.
mme_register(MmeInfo) ->
    %% ?FUNCTION_NAME doubles as the request tag matched in handle_call/3
    gen_server:call(?MODULE, {?FUNCTION_NAME, MmeInfo}).


%% Remove an MME from the pool by name.
%% Returns {error, not_registered} if there is no entry with that name.
-spec mme_unregister(mme_name()) -> ok | {error, term()}.
mme_unregister(MmeName) ->
    gen_server:call(?MODULE, {?FUNCTION_NAME, MmeName}).


%% Select an MME when no previously used MME is known: yields the first
%% pool entry, or 'error' if the pool is empty.
-spec mme_select() -> {ok, mme_info()} | error.
mme_select() ->
    gen_server:call(?MODULE, {?FUNCTION_NAME, undefined}).


%% Select the MME following OldMmeName in the pool, wrapping around to
%% the first entry after the last one.  An unknown OldMmeName yields the
%% first entry; an empty pool yields 'error'.
-spec mme_select(mme_name()) -> {ok, mme_info()} | error.
mme_select(OldMmeName) ->
    gen_server:call(?MODULE, {?FUNCTION_NAME, OldMmeName}).


%% Look up a single MME by name; 'error' if it is not registered.
-spec fetch_mme_info(mme_name()) -> {ok, mme_info()} | error.
fetch_mme_info(MmeName) ->
    gen_server:call(?MODULE, {?FUNCTION_NAME, MmeName}).


%% Return all registered MMEs, in registration order.
-spec fetch_mme_list() -> mme_list().
fetch_mme_list() ->
    gen_server:call(?MODULE, ?FUNCTION_NAME).


%% Stop the registry process.
-spec shutdown() -> ok.
shutdown() ->
    gen_server:stop(?MODULE).


%% ------------------------------------------------------------------
%% gen_server API
%% ------------------------------------------------------------------

%% The server state is simply the list of registered MMEs; it is seeded
%% from the application environment.
init([]) ->
    {ok, mme_list_from_env()}.


handle_call({mme_register, MmeInfo}, _From, Pool) ->
    pool_update_reply(mme_add(MmeInfo, Pool), Pool);

handle_call({mme_unregister, MmeName}, _From, Pool) ->
    %% TODO: kill all eNB connections?
    pool_update_reply(mme_del(MmeName, Pool), Pool);

handle_call({mme_select, OldMmeName}, _From, Pool) ->
    {reply, mme_select(OldMmeName, Pool), Pool};

handle_call({fetch_mme_info, MmeName}, _From, Pool) ->
    {reply, mme_find(MmeName, Pool), Pool};

handle_call(fetch_mme_list, _From, Pool) ->
    {reply, Pool, Pool};

handle_call(Info, From, Pool) ->
    ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]),
    {reply, {error, not_implemented}, Pool}.


%% Turn the result of a pool modification into a gen_server reply:
%% on success the new pool becomes the state, on failure the old pool
%% is kept and the error is passed on to the caller.
pool_update_reply({ok, NewPool}, _OldPool) ->
    {reply, ok, NewPool};
pool_update_reply({error, Error}, OldPool) ->
    {reply, {error, Error}, OldPool}.


handle_cast(Info, Pool) ->
    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
    {noreply, Pool}.


handle_info(Info, Pool) ->
    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
    {noreply, Pool}.


terminate(Reason, _Pool) ->
    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
    ok.


%% ------------------------------------------------------------------
%% private API
%% ------------------------------------------------------------------

%% Two entries match if they share the name, or if both carry the same
%% remote address *and* remote port.
-spec mme_match(mme_info(), map()) -> true | false.
mme_match(Lhs, Rhs) ->
    is_same_name(Lhs, Rhs) orelse is_same_peer(Lhs, Rhs).

is_same_name(#{name := Name}, #{name := Name}) -> true;
is_same_name(_, _) -> false.

is_same_peer(#{raddr := Addr, rport := Port},
             #{raddr := Addr, rport := Port}) -> true;
is_same_peer(_, _) -> false.


%% Add a new MME if it does not already exist
-spec mme_add(mme_info(), mme_list()) -> {ok, mme_list()} | {error, term()}.
mme_add(Info, Pool) ->
    %% normalise the entry first: addresses are parsed and the optional
    %% fields get their defaults, so duplicates are detected on the
    %% normalised values
    MmeName = maps:get(name, Info),
    LAddr = sctp_common:parse_addr(maps:get(laddr, Info, "::")),
    RAddr = sctp_common:parse_addr(maps:get(raddr, Info)),
    RPort = maps:get(rport, Info, ?S1AP_PORT),
    Entry = Info#{laddr => LAddr, raddr => RAddr, rport => RPort},
    %% collect the already registered entries clashing with the new one
    case [E || E <- Pool, mme_match(E, Entry)] of
        [] ->
            ?LOG_INFO("MME (name=~p, ~p:~p) registered",
                      [MmeName, RAddr, RPort]),
            %% append, so that the pool keeps the registration order
            {ok, Pool ++ [Entry]};
        [_ | _] ->
            ?LOG_ERROR("MME (name=~p / ~p:~p) is *already* registered",
                       [MmeName, RAddr, RPort]),
            {error, already_registered}
    end.


%% Remove an MME by name
-spec mme_del(mme_name(), mme_list()) -> {ok, mme_list()} | {error, term()}.
mme_del(MmeName, Pool) ->
    IsTarget = fun(E) -> mme_match(E, #{name => MmeName}) end,
    case lists:partition(IsTarget, Pool) of
        {[], _Unchanged} ->
            %% nothing matched the given name
            ?LOG_ERROR("MME (name=~p) is *not* registered", [MmeName]),
            {error, not_registered};
        {[_ | _], Remaining} ->
            ?LOG_INFO("MME (name=~p) unregistered", [MmeName]),
            {ok, Remaining}
    end.


%% Select an MME from the pool
-spec mme_select(OldMmeName, MMEs) -> Result
    when OldMmeName :: undefined | mme_name(),
         MMEs :: mme_list(),
         Result :: {ok, mme_info()} | error.
mme_select(_OldMmeName, []) ->
    %% nothing to select from
    error;

mme_select(_OldMmeName, [Single]) ->
    %% a pool of one leaves no choice
    {ok, Single};

mme_select(undefined, [First | _]) ->
    %% no previous MME given => start with the first entry
    {ok, First};

mme_select(OldMmeName, [First | _] = Pool) ->
    IsOther = fun(E) -> maps:get(name, E) =/= OldMmeName end,
    %% split the pool right in front of the old MME (if present)
    {_Before, FromOld} = lists:splitwith(IsOther, Pool),
    case FromOld of
        [_Old, Next | _] ->
            %% the entry right after the old MME
            {ok, Next};
        _ ->
            %% old MME is either unknown or the last entry:
            %% fall back to / wrap around to the first entry
            {ok, First}
    end.


%% Find an MME by name
-spec mme_find(mme_name(), mme_list()) -> {ok, mme_info()} | error.
mme_find(MmeName, Pool) ->
    case [E || E <- Pool, mme_match(E, #{name => MmeName})] of
        [] -> error;
        [MmeInfo] -> {ok, MmeInfo}
    end.


%% Build the initial pool from the 'mme_pool' environment parameter
%% (an empty pool if the parameter is not set).
-spec mme_list_from_env() -> mme_list().
mme_list_from_env() ->
    lists:foldl(fun mme_add_from_env/2, [],
                osmo_s1gw:get_env(mme_pool, [])).


%% Fold step for mme_list_from_env/0: the match on {ok, _} is deliberate,
%% an entry rejected by mme_add/2 makes this function fail with badmatch.
-spec mme_add_from_env(mme_info(), mme_list()) -> mme_list().
mme_add_from_env(MmeInfo, Acc0) ->
    {ok, Acc1} = mme_add(MmeInfo, Acc0),
    Acc1.


%% vim:set ts=4 sw=4 et:
diff --git a/src/osmo_s1gw_sup.erl b/src/osmo_s1gw_sup.erl index 1616fe5..86ecbda 100644 --- a/src/osmo_s1gw_sup.erl +++ b/src/osmo_s1gw_sup.erl @@ -73,6 +73,11 @@ 5000, worker, [enb_registry]}, + MmeRegistry = {mme_registry, {mme_registry, start_link, []}, + permanent, + 5000, + worker, + [mme_registry]}, SctpServer = {sctp_server, {sctp_server, start_link, [server_cfg()]}, permanent, 5000, @@ -96,7 +101,13 @@ [erf]},
s1gw_metrics:init(), - {ok, {{one_for_one, 5, 10}, [EnbRegistry, SctpServer, PfcpPeer, GtpuKpi, RestServer]}}. + {ok, {{one_for_one, 5, 10}, + [EnbRegistry, + MmeRegistry, + SctpServer, + PfcpPeer, + GtpuKpi, + RestServer]}}.
%% ------------------------------------------------------------------ diff --git a/test/mme_registry_test.erl b/test/mme_registry_test.erl new file mode 100644 index 0000000..a327233 --- /dev/null +++ b/test/mme_registry_test.erl @@ -0,0 +1,132 @@
-module(mme_registry_test).

-include_lib("eunit/include/eunit.hrl").


%% ------------------------------------------------------------------
%% setup functions
%% ------------------------------------------------------------------

%% Wrap a test instantiator into an EUnit 'setup' fixture, so that every
%% testcase runs against its own registry process: start/0 is executed
%% before and stop/1 after the tests produced by Fun.
-define(TC(Fun), {setup,
                  fun start/0,
                  fun stop/1,
                  Fun}).


%% Fixture setup: put two MMEs ("mme0", "mme1") into the 'mme_pool'
%% environment parameter and start the registry, which reads that
%% parameter on init.  Only raddr is given, so laddr/rport get the
%% defaults assigned by the registry.
start() ->
    MMEs = [#{name => "mme0", raddr => "127.0.0.10"},
            #{name => "mme1", raddr => "127.0.0.11"}],
    osmo_s1gw:set_env(mme_pool, MMEs),
    {ok, _} = mme_registry:start_link().


%% Fixture cleanup: stop the registry; the value returned by start/0
%% is not needed for that.
stop(_) ->
    mme_registry:shutdown().


%% ------------------------------------------------------------------
%% testcase descriptions
%% ------------------------------------------------------------------

%% Test generator: each entry is a {Title, Fixture} pair, the fixture
%% being built by ?TC from one of the instantiators below.
mme_registry_test_() ->
    [{"Test adding and deleting MMEs",
      ?TC(fun test_add_del/1)},
     {"Test fetching MME info",
      ?TC(fun test_fetch/1)},
     {"Test MME selection",
      ?TC(fun test_select/1)},
     {"Test MME selection (empty pool)",
      ?TC(fun test_select_empty/1)}].


%% ------------------------------------------------------------------
%% actual testcases
%% ------------------------------------------------------------------

%% Registration and removal, including the rejection of duplicates.
%% The assertions are order-dependent: each one operates on the registry
%% state left behind by the previous ones.  The fixture value passed in
%% by EUnit is ignored.
test_add_del(_) ->
    [%% "mme0" and "mme1" come pre-registered, register "mme2"
     ?_assertEqual(ok, mme_registry:mme_register(#{name => "mme2",
                                                   raddr => "127.0.0.12"})),
     %% duplicate name
     ?_assertEqual({error, already_registered},
                   mme_registry:mme_register(#{name => "mme2",
                                               raddr => "127.0.0.13"})),
     %% duplicate raddr:rport
     %% (same raddr as "mme2"; rport is omitted for both entries, so
     %% they end up with the same default port)
     ?_assertEqual({error, already_registered},
                   mme_registry:mme_register(#{name => "mme3",
                                               raddr => "127.0.0.12"})),
     %% deleting previously registered MMEs
     ?_assertEqual(ok, mme_registry:mme_unregister("mme0")),
     ?_assertEqual(ok, mme_registry:mme_unregister("mme1")),
     ?_assertEqual(ok, mme_registry:mme_unregister("mme2")),
     %% deleting non-existent MMEs
     %% ("mme0" and "mme1" were just removed, "mme42" never existed)
     ?_assertEqual({error, not_registered},
                   mme_registry:mme_unregister("mme0")),
     ?_assertEqual({error, not_registered},
                   mme_registry:mme_unregister("mme1")),
     ?_assertEqual({error, not_registered},
                   mme_registry:mme_unregister("mme42"))].


%% Fetching the whole pool and single entries.  Checks that addresses
%% come back parsed into tuples and that omitted fields carry the
%% defaults.  The fixture value passed in by EUnit is ignored.
test_fetch(_) ->
    [%% "mme0" and "mme1" come pre-registered, register "mme2"
     %% (this one specifies all optional fields explicitly)
     ?_assertEqual(ok, mme_registry:mme_register(#{name => "mme2",
                                                   laddr => "192.168.1.100",
                                                   raddr => "192.168.1.101",
                                                   rport => 1337})),
     %% test fetching a list of MMEs
     %% (expected in registration order)
     ?_assertMatch([#{name := "mme0", raddr := {127,0,0,10}},
                    #{name := "mme1", raddr := {127,0,0,11}},
                    #{name := "mme2", raddr := {192,168,1,101}}],
                   mme_registry:fetch_mme_list()),
     %% test fetching MMEs by name
     %% (36412 is the port expected when rport is omitted --
     %% presumably the value of ?S1AP_PORT, which is defined outside
     %% of this file)
     ?_assertMatch({ok, #{name := "mme0",
                          laddr := {0,0,0,0,0,0,0,0}, %% "::"
                          raddr := {127,0,0,10},
                          rport := 36412}},
                   mme_registry:fetch_mme_info("mme0")),
     ?_assertMatch({ok, #{name := "mme1",
                          laddr := {0,0,0,0,0,0,0,0}, %% "::"
                          raddr := {127,0,0,11},
                          rport := 36412}},
                   mme_registry:fetch_mme_info("mme1")),
     ?_assertMatch({ok, #{name := "mme2",
                          laddr := {192,168,1,100},
                          raddr := {192,168,1,101},
                          rport := 1337}},
                   mme_registry:fetch_mme_info("mme2")),
     %% "mme3" is not registered
     ?_assertEqual(error,
                   mme_registry:fetch_mme_info("mme3"))].


%% MME selection on a pool of three: fallback to the first entry,
%% selection of the successor, and wrap-around after the last entry.
%% The fixture value passed in by EUnit is ignored.
test_select(_) ->
    [%% "mme0" and "mme1" come pre-registered, register "mme2"
     ?_assertEqual(ok, mme_registry:mme_register(#{name => "mme2",
                                                   raddr => "127.0.0.12"})),
     %% old MME unknown, fall back to the first entry ("mme0")
     ?_assertMatch({ok, #{name := "mme0"}},
                   mme_registry:mme_select()),
     ?_assertMatch({ok, #{name := "mme0"}},
                   mme_registry:mme_select("mme3")),
     %% old MME known, expect the next MME to be selected
     %% (the last assertion covers the wrap-around "mme2" -> "mme0")
     ?_assertMatch({ok, #{name := "mme1"}},
                   mme_registry:mme_select("mme0")),
     ?_assertMatch({ok, #{name := "mme2"}},
                   mme_registry:mme_select("mme1")),
     ?_assertMatch({ok, #{name := "mme0"}},
                   mme_registry:mme_select("mme2"))].


%% MME selection on an empty pool: once both pre-registered MMEs are
%% removed, every selection must fail, no matter which old MME name
%% (if any) is given.  The fixture value passed in by EUnit is ignored.
test_select_empty(_) ->
    [%% "mme0" and "mme1" come pre-registered, unregister them
     ?_assertEqual(ok, mme_registry:mme_unregister("mme0")),
     ?_assertEqual(ok, mme_registry:mme_unregister("mme1")),
     %% the MME pool is now empty, expect an error
     ?_assertEqual(error, mme_registry:mme_select()),
     ?_assertEqual(error, mme_registry:mme_select("mme0")),
     ?_assertEqual(error, mme_registry:mme_select("mme1")),
     ?_assertEqual(error, mme_registry:mme_select("mme2"))].


%% vim:set ts=4 sw=4 et: