fixeria has uploaded this change for review.
mme_registry: the MME registry (pool) implementation
Change-Id: Id5480222439bf93eca2e994b291c619dff823b01
Related: SYS#7052
---
A src/mme_registry.erl
M src/osmo_s1gw_sup.erl
A test/mme_registry_test.erl
3 files changed, 424 insertions(+), 1 deletion(-)
git pull ssh://gerrit.osmocom.org:29418/erlang/osmo-s1gw refs/changes/81/41281/1
diff --git a/src/mme_registry.erl b/src/mme_registry.erl
new file mode 100644
index 0000000..7867bec
--- /dev/null
+++ b/src/mme_registry.erl
@@ -0,0 +1,280 @@
+%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+%% Author: Vadim Yanitskiy <vyanitskiy@sysmocom.de>
+%%
+%% All Rights Reserved
+%%
+%% SPDX-License-Identifier: AGPL-3.0-or-later
+%%
+%% This program is free software; you can redistribute it and/or modify
+%% it under the terms of the GNU Affero General Public License as
+%% published by the Free Software Foundation; either version 3 of the
+%% License, or (at your option) any later version.
+%%
+%% This program is distributed in the hope that it will be useful,
+%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%% GNU General Public License for more details.
+%%
+%% You should have received a copy of the GNU Affero General Public License
+%% along with this program. If not, see <https://www.gnu.org/licenses/>.
+%%
+%% Additional Permission under GNU AGPL version 3 section 7:
+%%
+%% If you modify this Program, or any covered work, by linking or
+%% combining it with runtime libraries of Erlang/OTP as released by
+%% Ericsson on https://www.erlang.org (or a modified version of these
+%% libraries), containing parts covered by the terms of the Erlang Public
+%% License (https://www.erlang.org/EPLICENSE), the licensors of this
+%% Program grant you additional permission to convey the resulting work
+%% without the need to license the runtime libraries of Erlang/OTP under
+%% the GNU Affero General Public License. Corresponding Source for a
+%% non-source form of such a combination shall include the source code
+%% for the parts of the runtime libraries of Erlang/OTP used as well as
+%% that of the covered work.
+
+-module(mme_registry).
+-behaviour(gen_server).
+
+-export([init/1,
+ handle_info/2,
+ handle_call/3,
+ handle_cast/2,
+ terminate/2]).
+-export([start_link/0,
+ mme_register/1,
+ mme_unregister/1,
+ mme_select/0,
+ mme_select/1,
+ fetch_mme_info/1,
+ fetch_mme_list/0,
+ shutdown/0]).
+
+-include_lib("kernel/include/logger.hrl").
+
+-include("s1gw_metrics.hrl").
+-include("s1ap.hrl").
+
+
+-type mme_name() :: string().
+
+-type mme_info() :: #{name := mme_name(), %% unique identifier of this MME
+ laddr => string() | inet:ip_address(),
+ raddr := string() | inet:ip_address(),
+ rport => inet:port_number()
+ }.
+
+-type mme_list() :: [mme_info()].
+
+-export_type([mme_name/0,
+ mme_info/0]).
+
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+-spec start_link() -> {ok, pid()} | term().
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+-spec mme_register(mme_info()) -> ok | {error, term()}.
+mme_register(MmeInfo) ->
+ gen_server:call(?MODULE, {?FUNCTION_NAME, MmeInfo}).
+
+
+-spec mme_unregister(mme_name()) -> ok | {error, term()}.
+mme_unregister(MmeName) ->
+ gen_server:call(?MODULE, {?FUNCTION_NAME, MmeName}).
+
+
+-spec mme_select() -> {ok, mme_info()} | error.
+mme_select() ->
+ gen_server:call(?MODULE, {?FUNCTION_NAME, undefined}).
+
+
+-spec mme_select(mme_name()) -> {ok, mme_info()} | error.
+mme_select(OldMmeName) ->
+ gen_server:call(?MODULE, {?FUNCTION_NAME, OldMmeName}).
+
+
+-spec fetch_mme_info(mme_name()) -> {ok, mme_info()} | error.
+fetch_mme_info(MmeName) ->
+ gen_server:call(?MODULE, {?FUNCTION_NAME, MmeName}).
+
+
+-spec fetch_mme_list() -> mme_list().
+fetch_mme_list() ->
+ gen_server:call(?MODULE, ?FUNCTION_NAME).
+
+
+-spec shutdown() -> ok.
+shutdown() ->
+ gen_server:stop(?MODULE).
+
+
+%% ------------------------------------------------------------------
+%% gen_server API
+%% ------------------------------------------------------------------
+
+init([]) ->
+ %% parse MMEs from the environment
+ MMEs = mme_list_from_env(),
+ {ok, MMEs}.
+
+
+handle_call({mme_register, MmeInfo}, _From, MMEs0) ->
+ case mme_add(MmeInfo, MMEs0) of
+ {ok, MMEs1} ->
+ {reply, ok, MMEs1};
+ {error, Error} ->
+ {reply, {error, Error}, MMEs0}
+ end;
+
+handle_call({mme_unregister, MmeName}, _From, MMEs0) ->
+ case mme_del(MmeName, MMEs0) of
+ {ok, MMEs1} ->
+ %% TODO: kill all eNB connections?
+ {reply, ok, MMEs1};
+ {error, Error} ->
+ {reply, {error, Error}, MMEs0}
+ end;
+
+handle_call({mme_select, OldMmeName}, _From, MMEs) ->
+ Reply = mme_select(OldMmeName, MMEs),
+ {reply, Reply, MMEs};
+
+handle_call({fetch_mme_info, MmeName}, _From, MMEs) ->
+ Reply = mme_find(MmeName, MMEs),
+ {reply, Reply, MMEs};
+
+handle_call(fetch_mme_list, _From, MMEs) ->
+ {reply, MMEs, MMEs};
+
+handle_call(Info, From, MMEs) ->
+ ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]),
+ {reply, {error, not_implemented}, MMEs}.
+
+
+handle_cast(Info, MMEs) ->
+ ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
+ {noreply, MMEs}.
+
+
+handle_info(Info, MMEs) ->
+ ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
+ {noreply, MMEs}.
+
+
+terminate(Reason, _MMEs) ->
+ ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
+ ok.
+
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+-spec mme_match(mme_info(), map()) -> true | false.
+mme_match(#{name := Name},
+ #{name := Name}) -> true;
+
+mme_match(#{raddr := Addr, rport := Port},
+ #{raddr := Addr, rport := Port}) -> true;
+
+mme_match(_, _) -> false.
+
+
+%% Add a new MME if it does not already exist
+-spec mme_add(mme_info(), mme_list()) -> {ok, mme_list()} | {error, term()}.
+mme_add(MmeInfo0, MMEs) ->
+ %% assign defaults, parse local/remote addresses
+ MmeName = maps:get(name, MmeInfo0),
+ LAddr = sctp_common:parse_addr(maps:get(laddr, MmeInfo0, "::")),
+ RAddr = sctp_common:parse_addr(maps:get(raddr, MmeInfo0)),
+ RPort = maps:get(rport, MmeInfo0, ?S1AP_PORT),
+ MmeInfo1 = MmeInfo0#{laddr => LAddr,
+ raddr => RAddr,
+ rport => RPort},
+ %% check for duplicates
+ case lists:any(fun(E) -> mme_match(E, MmeInfo1) end, MMEs) of
+ true ->
+ ?LOG_ERROR("MME (name=~p / ~p:~p) is *already* registered",
+ [MmeName, RAddr, RPort]),
+ {error, already_registered};
+ false ->
+ ?LOG_INFO("MME (name=~p, ~p:~p) registered",
+ [MmeName, RAddr, RPort]),
+ {ok, MMEs ++ [MmeInfo1]}
+ end.
+
+
+%% Remove an MME by name
+-spec mme_del(mme_name(), mme_list()) -> {ok, mme_list()} | {error, term()}.
+mme_del(MmeName, MMEs0) ->
+ Fun = fun(E) -> not mme_match(E, #{name => MmeName}) end,
+ case lists:filter(Fun, MMEs0) of
+ MMEs0 -> %% unchanged list means nothing was filtered out
+ ?LOG_ERROR("MME (name=~p) is *not* registered", [MmeName]),
+ {error, not_registered};
+ MMEs1 ->
+ ?LOG_INFO("MME (name=~p) unregistered", [MmeName]),
+ {ok, MMEs1}
+ end.
+
+
+%% Select an MME from the pool
+-spec mme_select(OldMmeName, MMEs) -> Result
+ when OldMmeName :: undefined | mme_name(),
+ MMEs :: mme_list(),
+ Result :: {ok, mme_info()} | error.
+mme_select(_OldMmeName, []) ->
+ %% empty MME pool
+ error;
+
+mme_select(_OldMmeName, [MmeInfo]) ->
+ %% only one MME in the pool
+ {ok, MmeInfo};
+
+mme_select(undefined, MMEs) ->
+ %% old MME unknown => fall back to the first entry
+ {ok, hd(MMEs)};
+
+mme_select(OldMmeName, MMEs) ->
+ Fun = fun(E) -> maps:get(name, E) =/= OldMmeName end,
+ case lists:dropwhile(Fun, MMEs) of
+ [] ->
+ %% old MME is not registered, fall back to the first entry
+ {ok, hd(MMEs)};
+ [_ | []] ->
+ %% old MME was the last entry, wrap around
+ {ok, hd(MMEs)};
+ [_ | [MmeInfo | _]] ->
+ %% return MME following the old one
+ {ok, MmeInfo}
+ end.
+
+
+%% Find an MME by name
+-spec mme_find(mme_name(), mme_list()) -> {ok, mme_info()} | error.
+mme_find(MmeName, MMEs) ->
+ Fun = fun(E) -> mme_match(E, #{name => MmeName}) end,
+ case lists:filter(Fun, MMEs) of
+ [MmeInfo] -> {ok, MmeInfo};
+ [] -> error
+ end.
+
+
+-spec mme_list_from_env() -> mme_list().
+mme_list_from_env() ->
+ MMEs = osmo_s1gw:get_env(mme_pool, []),
+ lists:foldl(fun mme_add_from_env/2, [], MMEs).
+
+
+-spec mme_add_from_env(mme_info(), mme_list()) -> mme_list().
+mme_add_from_env(MmeInfo, MMEs0) ->
+ {ok, MMEs1} = mme_add(MmeInfo, MMEs0),
+ MMEs1.
+
+
+%% vim:set ts=4 sw=4 et:
diff --git a/src/osmo_s1gw_sup.erl b/src/osmo_s1gw_sup.erl
index 1616fe5..86ecbda 100644
--- a/src/osmo_s1gw_sup.erl
+++ b/src/osmo_s1gw_sup.erl
@@ -73,6 +73,11 @@
5000,
worker,
[enb_registry]},
+ MmeRegistry = {mme_registry, {mme_registry, start_link, []},
+ permanent,
+ 5000,
+ worker,
+ [mme_registry]},
SctpServer = {sctp_server, {sctp_server, start_link, [server_cfg()]},
permanent,
5000,
@@ -96,7 +101,13 @@
[erf]},
s1gw_metrics:init(),
- {ok, {{one_for_one, 5, 10}, [EnbRegistry, SctpServer, PfcpPeer, GtpuKpi, RestServer]}}.
+ {ok, {{one_for_one, 5, 10},
+ [EnbRegistry,
+ MmeRegistry,
+ SctpServer,
+ PfcpPeer,
+ GtpuKpi,
+ RestServer]}}.
%% ------------------------------------------------------------------
diff --git a/test/mme_registry_test.erl b/test/mme_registry_test.erl
new file mode 100644
index 0000000..a327233
--- /dev/null
+++ b/test/mme_registry_test.erl
@@ -0,0 +1,132 @@
+-module(mme_registry_test).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+%% ------------------------------------------------------------------
+%% setup functions
+%% ------------------------------------------------------------------
+
+-define(TC(Fun), {setup,
+ fun start/0,
+ fun stop/1,
+ Fun}).
+
+
+start() ->
+ MMEs = [#{name => "mme0", raddr => "127.0.0.10"},
+ #{name => "mme1", raddr => "127.0.0.11"}],
+ osmo_s1gw:set_env(mme_pool, MMEs),
+ {ok, _} = mme_registry:start_link().
+
+
+stop(_) ->
+ mme_registry:shutdown().
+
+
+%% ------------------------------------------------------------------
+%% testcase descriptions
+%% ------------------------------------------------------------------
+
+mme_registry_test_() ->
+ [{"Test adding and deleting MMEs",
+ ?TC(fun test_add_del/1)},
+ {"Test fetching MME info",
+ ?TC(fun test_fetch/1)},
+ {"Test MME selection",
+ ?TC(fun test_select/1)},
+ {"Test MME selection (empty pool)",
+ ?TC(fun test_select_empty/1)}].
+
+
+%% ------------------------------------------------------------------
+%% actual testcases
+%% ------------------------------------------------------------------
+
+test_add_del(_) ->
+ [%% "mme0" and "mme1" come pre-registered, register "mme2"
+ ?_assertEqual(ok, mme_registry:mme_register(#{name => "mme2",
+ raddr => "127.0.0.12"})),
+ %% duplicate name
+ ?_assertEqual({error, already_registered},
+ mme_registry:mme_register(#{name => "mme2",
+ raddr => "127.0.0.13"})),
+ %% duplicate raddr:rport
+ ?_assertEqual({error, already_registered},
+ mme_registry:mme_register(#{name => "mme3",
+ raddr => "127.0.0.12"})),
+ %% deleting previously registered MMEs
+ ?_assertEqual(ok, mme_registry:mme_unregister("mme0")),
+ ?_assertEqual(ok, mme_registry:mme_unregister("mme1")),
+ ?_assertEqual(ok, mme_registry:mme_unregister("mme2")),
+ %% deleting non-existent MMEs
+ ?_assertEqual({error, not_registered},
+ mme_registry:mme_unregister("mme0")),
+ ?_assertEqual({error, not_registered},
+ mme_registry:mme_unregister("mme1")),
+ ?_assertEqual({error, not_registered},
+ mme_registry:mme_unregister("mme42"))].
+
+
+test_fetch(_) ->
+ [%% "mme0" and "mme1" come pre-registered, register "mme2"
+ ?_assertEqual(ok, mme_registry:mme_register(#{name => "mme2",
+ laddr => "192.168.1.100",
+ raddr => "192.168.1.101",
+ rport => 1337})),
+ %% test fetching a list of MMEs
+ ?_assertMatch([#{name := "mme0", raddr := {127,0,0,10}},
+ #{name := "mme1", raddr := {127,0,0,11}},
+ #{name := "mme2", raddr := {192,168,1,101}}],
+ mme_registry:fetch_mme_list()),
+ %% test fetching MMEs by name
+ ?_assertMatch({ok, #{name := "mme0",
+ laddr := {0,0,0,0,0,0,0,0}, %% "::"
+ raddr := {127,0,0,10},
+ rport := 36412}},
+ mme_registry:fetch_mme_info("mme0")),
+ ?_assertMatch({ok, #{name := "mme1",
+ laddr := {0,0,0,0,0,0,0,0}, %% "::"
+ raddr := {127,0,0,11},
+ rport := 36412}},
+ mme_registry:fetch_mme_info("mme1")),
+ ?_assertMatch({ok, #{name := "mme2",
+ laddr := {192,168,1,100},
+ raddr := {192,168,1,101},
+ rport := 1337}},
+ mme_registry:fetch_mme_info("mme2")),
+ %% "mme3" is not registered
+ ?_assertEqual(error,
+ mme_registry:fetch_mme_info("mme3"))].
+
+
+test_select(_) ->
+ [%% "mme0" and "mme1" come pre-registered, register "mme2"
+ ?_assertEqual(ok, mme_registry:mme_register(#{name => "mme2",
+ raddr => "127.0.0.12"})),
+ %% old MME unknown, fall back to the first entry ("mme0")
+ ?_assertMatch({ok, #{name := "mme0"}},
+ mme_registry:mme_select()),
+ ?_assertMatch({ok, #{name := "mme0"}},
+ mme_registry:mme_select("mme3")),
+ %% old MME known, expect the next MME to be selected
+ ?_assertMatch({ok, #{name := "mme1"}},
+ mme_registry:mme_select("mme0")),
+ ?_assertMatch({ok, #{name := "mme2"}},
+ mme_registry:mme_select("mme1")),
+ ?_assertMatch({ok, #{name := "mme0"}},
+ mme_registry:mme_select("mme2"))].
+
+
+test_select_empty(_) ->
+ [%% "mme0" and "mme1" come pre-registered, unregister them
+ ?_assertEqual(ok, mme_registry:mme_unregister("mme0")),
+ ?_assertEqual(ok, mme_registry:mme_unregister("mme1")),
+ %% the MME pool is now empty, expect an error
+ ?_assertEqual(error, mme_registry:mme_select()),
+ ?_assertEqual(error, mme_registry:mme_select("mme0")),
+ ?_assertEqual(error, mme_registry:mme_select("mme1")),
+ ?_assertEqual(error, mme_registry:mme_select("mme2"))].
+
+
+%% vim:set ts=4 sw=4 et:
To view, visit change 41281. To unsubscribe, or for help writing mail filters, visit settings.