osmith submitted this change.
Initial SCTP proxy (server/client) implementation
Change-Id: Ia317f58f7dcbec42930165fdcd42d0ddd23e289c
---
A .gitignore
A config/sys.config
A rebar.config
A rebar.lock
A src/osmo_s1gw.app.src
A src/osmo_s1gw_app.erl
A src/sctp_client.erl
A src/sctp_proxy.erl
A src/sctp_server.erl
9 files changed, 480 insertions(+), 0 deletions(-)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..83aa88b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+_build
+*.beam
diff --git a/config/sys.config b/config/sys.config
new file mode 100644
index 0000000..072f183
--- /dev/null
+++ b/config/sys.config
@@ -0,0 +1,8 @@
+%% -*- erlang -*-
+
+[{kernel,
+ [{logger_level, info},
+ {logger,
+ [{handler, default, logger_std_h,
+ #{ level => debug, formatter => {logger_formatter,
+ #{ template => ["[", level, "] ", pid, " ", file, ":", line, " ", msg, "\n"] }}}}]}]}].
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..7d7b6f2
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,15 @@
+%% -*- erlang -*-
+
+{erl_opts, [debug_info, {parse_transform}]}.
+
+{minimum_otp_vsn, "25.2.3"}.
+
+{deps, []}.
+
+{xref_checks, [undefined_function_calls, undefined_functions,
+ deprecated_function_calls, deprecated_functions]}.
+
+{dialyzer, [
+ {plt_extra_apps, [kernel, stdlib, erts, tools, inets, compiler]},
+ {warnings, [no_improper_lists]}
+]}.
diff --git a/rebar.lock b/rebar.lock
new file mode 100644
index 0000000..57afcca
--- /dev/null
+++ b/rebar.lock
@@ -0,0 +1 @@
+[].
diff --git a/src/osmo_s1gw.app.src b/src/osmo_s1gw.app.src
new file mode 100644
index 0000000..bc2a8d7
--- /dev/null
+++ b/src/osmo_s1gw.app.src
@@ -0,0 +1,14 @@
+%-*- mode: erlang -*-
+
+{application, osmo_s1gw, [
+ {description, "Osmocom S1AP Gateway"},
+ {vsn, "1"},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib
+ ]},
+ {modules, []},
+ {mod, {osmo_s1gw_app, []}},
+ {env, []}
+]}.
diff --git a/src/osmo_s1gw_app.erl b/src/osmo_s1gw_app.erl
new file mode 100644
index 0000000..596b3e8
--- /dev/null
+++ b/src/osmo_s1gw_app.erl
@@ -0,0 +1,11 @@
+-module(osmo_s1gw_app).
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->
+ sctp_server:start_link().
+ %% TODO: osmo_s1gw_sup:start_link().
+
+stop(_State) ->
+ ok.
diff --git a/src/sctp_client.erl b/src/sctp_client.erl
new file mode 100644
index 0000000..67c7069
--- /dev/null
+++ b/src/sctp_client.erl
@@ -0,0 +1,42 @@
+-module(sctp_client).
+
+-export([connect/0,
+ connect/1,
+ connect/2,
+ send_data/2,
+ disconnect/1]).
+
+-include_lib("kernel/include/inet.hrl").
+-include_lib("kernel/include/inet_sctp.hrl").
+
+-define(S1AP_PORT, 36412).
+-define(S1AP_PPID, 18).
+-define(SCTP_STREAM, 0).
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+connect() ->
+ connect(localhost).
+
+connect(Host) ->
+ connect(Host, ?S1AP_PORT).
+
+connect(Host, Port) ->
+ {ok, Sock} = gen_sctp:open([{type, seqpacket},
+ {active, true}]),
+ gen_sctp:connect_init(Sock, Host, Port, []),
+ {ok, Sock}.
+
+
+send_data({Sock, Aid}, Data) ->
+ gen_sctp:send(Sock, #sctp_sndrcvinfo{stream = ?SCTP_STREAM,
+ ppid = ?S1AP_PPID,
+ assoc_id = Aid}, Data).
+
+
+disconnect({Sock, Aid}) ->
+ gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Aid}).
+
+%% vim:set ts=4 sw=4 et:
diff --git a/src/sctp_proxy.erl b/src/sctp_proxy.erl
new file mode 100644
index 0000000..e09720c
--- /dev/null
+++ b/src/sctp_proxy.erl
@@ -0,0 +1,177 @@
+-module(sctp_proxy).
+-behaviour(gen_statem).
+
+-export([init/1,
+ callback_mode/0,
+ connecting/3,
+ connected/3,
+ code_change/4,
+ terminate/3]).
+-export([start_link/3,
+ send_data/2,
+ shutdown/1]).
+
+-include_lib("kernel/include/inet.hrl").
+-include_lib("kernel/include/inet_sctp.hrl").
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+start_link(Aid, MmeAddr, MmePort) ->
+ gen_statem:start_link(?MODULE, [Aid, MmeAddr, MmePort], []).
+
+
+send_data(Pid, Data) ->
+ gen_statem:cast(Pid, {send_data, Data}).
+
+
+shutdown(Pid) ->
+ gen_statem:stop(Pid).
+
+
+%% ------------------------------------------------------------------
+%% gen_statem API
+%% ------------------------------------------------------------------
+
+init([Aid, MmeAddr, MmePort]) ->
+ {ok, connecting,
+ #{enb_aid => Aid,
+ mme_addr => MmeAddr,
+ mmr_port => MmePort,
+ tx_queue => []}}.
+
+
+callback_mode() ->
+ [state_functions, state_enter].
+
+
+%% CONNECTING state
+connecting(enter, OldState,
+ #{mme_addr := MmeAddr, mmr_port := MmePort} = S) ->
+ logger:info("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]),
+ %% Initiate connection establishment with the MME
+ {ok, Sock} = sctp_client:connect(MmeAddr, MmePort),
+ {keep_state, S#{sock => Sock},
+ [{state_timeout, 2_000_000, conn_est_timeout}]};
+
+%% Handle connection establishment timeout
+connecting(state_timeout, conn_est_timeout, _S) ->
+ {stop, {shutdown, conn_est_timeout}};
+
+%% Handle an eNB -> MME data forwarding request (queue)
+connecting(cast, {send_data, Data},
+ #{tx_queue := Pending} = S) ->
+ {keep_state, S#{tx_queue := [Data | Pending]}};
+
+%% Handle an #sctp_assoc_change event (connection state)
+connecting(info, {sctp, _Socket, MmeAddr, MmePort,
+ {[], #sctp_assoc_change{state = ConnState,
+ assoc_id = Aid}}}, S) ->
+ case ConnState of
+ comm_up ->
+ logger:notice("MME connection (id=~p, ~p:~p) established",
+ [Aid, MmeAddr, MmePort]),
+ {next_state, connected, S#{mme_aid => Aid}};
+ _ ->
+ logger:notice("MME connection establishment failed: ~p", [ConnState]),
+ {stop, {shutdown, conn_est_fail}}
+ end;
+
+%% Catch-all for other kinds of SCTP events
+connecting(info, {sctp, _Socket, MmeAddr, MmePort,
+ {AncData, Data}}, S) ->
+ logger:debug("Unhandled SCTP event (~p:~p): ~p, ~p",
+ [MmeAddr, MmePort, AncData, Data]),
+ {keep_state, S};
+
+connecting(Event, EventData, S) ->
+ logger:error("Unexpected event ~p: ~p", [Event, EventData]),
+ {keep_state, S}.
+
+
+%% CONNECTED state
+connected(enter, OldState, S) ->
+ logger:info("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]),
+ %% Send pending eNB -> MME messages (if any)
+ ok = sctp_send_pending(S),
+ {keep_state, maps:remove(tx_queue, S)};
+
+%% Handle an eNB -> MME data forwarding request (forward)
+connected(cast, {send_data, Data}, S) ->
+ ok = sctp_send(S, Data),
+ {keep_state, S};
+
+%% Handle an #sctp_assoc_change event (MME connection state)
+connected(info, {sctp, _Socket, MmeAddr, MmePort,
+ {[], #sctp_assoc_change{state = ConnState,
+ assoc_id = Aid}}}, S) ->
+ case ConnState of
+ comm_up ->
+ logger:notice("MME connection (id=~p, ~p:~p) is already established?!?",
+ [Aid, MmeAddr, MmePort]),
+ {keep_state, S};
+ _ ->
+ logger:notice("MME connection state: ~p", [ConnState]),
+ {stop, {shutdown, conn_fail}}
+ end;
+
+%% Handle an #sctp_sndrcvinfo event (MME -> eNB data)
+connected(info, {sctp, _Socket, MmeAddr, MmePort,
+ {[#sctp_sndrcvinfo{assoc_id = Aid}], Data}}, S) ->
+ logger:info("MME connection (id=~p, ~p:~p) Rx ~p",
+ [Aid, MmeAddr, MmePort, Data]),
+ sctp_server:send_data(maps:get(enb_aid, S), Data),
+ {keep_state, S};
+
+%% Catch-all for other kinds of SCTP events
+connected(info, {sctp, _Socket, MmeAddr, MmePort,
+ {AncData, Data}}, S) ->
+ logger:debug("Unhandled SCTP event (~p:~p): ~p, ~p",
+ [MmeAddr, MmePort, AncData, Data]),
+ {keep_state, S};
+
+%% Catch-all handler for this state
+connected(Event, EventData, S) ->
+ logger:error("Unexpected event ~p: ~p", [Event, EventData]),
+ {keep_state, S}.
+
+
+
+code_change(_Vsn, State, S, _Extra) ->
+ {ok, State, S}.
+
+
+terminate(Reason, State, S) ->
+ logger:notice("Terminating in state ~p, reason ~p", [State, Reason]),
+ case S of
+ #{sock := Sock, mme_aid := Aid} ->
+ sctp_client:disconnect({Sock, Aid}),
+ gen_sctp:close(Sock);
+ #{sock := Sock} ->
+ gen_sctp:close(Sock);
+ _ -> ok
+ end.
+
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+%% Send a single message to the MME
+sctp_send(#{sock := Sock, mme_aid := Aid}, Data) ->
+ sctp_client:send_data({Sock, Aid}, Data).
+
+
+%% Send pending messages to the MME
+sctp_send_pending(#{tx_queue := Pending} = S) ->
+ sctp_send_pending(S, lists:reverse(Pending)).
+
+sctp_send_pending(S, [Data | Pending]) ->
+ sctp_send(S, Data),
+ sctp_send_pending(S, Pending);
+
+sctp_send_pending(_S, []) ->
+ ok.
+
+%% vim:set ts=4 sw=4 et:
diff --git a/src/sctp_server.erl b/src/sctp_server.erl
new file mode 100644
index 0000000..883991a
--- /dev/null
+++ b/src/sctp_server.erl
@@ -0,0 +1,202 @@
+-module(sctp_server).
+-behaviour(gen_server).
+
+-export([init/1,
+ handle_info/2,
+ handle_call/3,
+ handle_cast/2,
+ terminate/2]).
+-export([start_link/0,
+ start_link/2]).
+-export([send_data/2]).
+
+-include_lib("kernel/include/inet.hrl").
+-include_lib("kernel/include/inet_sctp.hrl").
+
+-define(S1AP_PORT, 36412).
+-define(S1AP_PPID, 18).
+-define(SCTP_STREAM, 0).
+
+-record(server_state, {sock, clients}).
+-record(client_state, {addr_port, pid}).
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+start_link() ->
+ start_link({127,0,0,1}, ?S1AP_PORT).
+
+start_link(BindAddr, BindPort) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE,
+ [BindAddr, BindPort], []).
+
+
+send_data(Aid, Data) ->
+ gen_server:cast(?MODULE, {send_data, Aid, Data}).
+
+
+%% ------------------------------------------------------------------
+%% gen_server API
+%% ------------------------------------------------------------------
+
+init([BindAddr, BindPort]) ->
+ process_flag(trap_exit, true),
+ {ok, Sock} = gen_sctp:open([{ip, BindAddr},
+ {port, BindPort},
+ {type, seqpacket},
+ {reuseaddr, true},
+ {active, true}]),
+ logger:info("SCTP server listening on ~w:~w", [BindAddr, BindPort]),
+ ok = gen_sctp:listen(Sock, true),
+ {ok, #server_state{sock = Sock,
+ clients = dict:new()}}.
+
+
+handle_call(Info, _From, State) ->
+ error_logger:error_report(["unknown handle_call()",
+ {module, ?MODULE}, {info, Info}, {state, State}]),
+ {reply, error, not_implemented}.
+
+
+handle_cast({send_data, Aid, Data}, State) ->
+ gen_sctp:send(State#server_state.sock,
+ #sctp_sndrcvinfo{stream = ?SCTP_STREAM,
+ ppid = ?S1AP_PPID,
+ assoc_id = Aid}, Data),
+ {noreply, State};
+
+handle_cast(Info, State) ->
+ error_logger:error_report(["unknown handle_cast()",
+ {module, ?MODULE}, {info, Info}, {state, State}]),
+ {noreply, State}.
+
+
+%% Handle SCTP events coming from gen_sctp module
+handle_info({sctp, _Socket, FromAddr, FromPort, {AncData, Data}}, State) ->
+ NewState = sctp_recv(State, {FromAddr, FromPort, AncData, Data}),
+ {noreply, NewState};
+
+%% Handle termination events of the child processes
+handle_info({'EXIT', Pid, Reason},
+ #server_state{sock = Sock, clients = Clients} = State) ->
+ logger:debug("Child process ~p terminated with reason ~p", [Pid, Reason]),
+ case client_find(State, Pid) of
+ {ok, {Aid, _Client}} ->
+ %% gracefully close the eNB connection
+ gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Aid}),
+ {noreply, State#server_state{clients = dict:erase(Aid, Clients)}};
+ error ->
+ {noreply, State}
+ end;
+
+%% Catch-all for unknown messages
+handle_info(Info, State) ->
+ error_logger:error_report(["unknown handle_info()",
+ {module, ?MODULE}, {info, Info}, {state, State}]),
+ {noreply, State}.
+
+
+terminate(shutdown, State) ->
+ logger:notice("Terminating ~p", [?MODULE]),
+ close_conns(State),
+ gen_sctp:close(State#server_state.sock),
+ ok.
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+%% Handle an #sctp_assoc_change event (connection state)
+sctp_recv(State, {FromAddr, FromPort, [],
+ #sctp_assoc_change{state = ConnState,
+ assoc_id = Aid}}) ->
+ case ConnState of
+ comm_up ->
+ logger:notice("Connection (id=~p, ~p:~p) established", [Aid, FromAddr, FromPort]),
+ Clients = client_add(State#server_state.clients, Aid, FromAddr, FromPort);
+ shutdown_comp ->
+ logger:notice("Connection (id=~p, ~p:~p) closed", [Aid, FromAddr, FromPort]),
+ Clients = client_del(State#server_state.clients, Aid);
+ comm_lost ->
+ logger:notice("Connection (id=~p, ~p:~p) lost", [Aid, FromAddr, FromPort]),
+ Clients = client_del(State#server_state.clients, Aid);
+ _ ->
+ logger:notice("Connection (id=~p, ~p:~p) state ~p",
+ [Aid, FromAddr, FromPort, ConnState]),
+ Clients = State#server_state.clients
+ end,
+ State#server_state{clients = Clients};
+
+%% Handle an #sctp_sndrcvinfo event (incoming data)
+sctp_recv(State, {FromAddr, FromPort,
+ [#sctp_sndrcvinfo{assoc_id = Aid}], Data}) ->
+ logger:info("Connection (id=~p, ~p:~p) Rx ~p", [Aid, FromAddr, FromPort, Data]),
+ case dict:find(Aid, State#server_state.clients) of
+ {ok, #client_state{pid = Pid}} ->
+ sctp_proxy:send_data(Pid, Data);
+ error ->
+ logger:error("Connection (id=~p, ~p:~p) is not known to us?!?",
+ [Aid, FromAddr, FromPort])
+ end,
+ State;
+
+%% Catch-all for other kinds of SCTP events
+sctp_recv(State, {FromAddr, FromPort, AncData, Data}) ->
+ logger:debug("Unhandled SCTP event (~p:~p): ~p, ~p",
+ [FromAddr, FromPort, AncData, Data]),
+ State.
+
+
+%% Add a new client to the list, spawning a proxy process
+client_add(Clients, Aid, FromAddr, FromPort) ->
+ {ok, Pid} = sctp_proxy:start_link(Aid, {127,0,1,1}, ?S1AP_PORT), %% XXX!
+ NewClient = #client_state{addr_port = {FromAddr, FromPort}, pid = Pid},
+ dict:store(Aid, NewClient, Clients).
+
+
+%% Delete an existing client from the list, stopping the proxy process
+client_del(Clients, Aid) ->
+ case dict:find(Aid, Clients) of
+ {ok, Client} ->
+ sctp_proxy:shutdown(Client#client_state.pid),
+ dict:erase(Aid, Clients);
+ error ->
+ Clients
+ end.
+
+
+%% Find a client by process ID
+client_find(#server_state{clients = Clients}, Pid) ->
+ client_find(dict:to_list(Clients), Pid);
+
+client_find([{Aid, Client} | Clients], Pid) ->
+ case Client of
+ #client_state{pid = Pid} ->
+ {ok, {Aid, Client}};
+ _ ->
+ client_find(Clients, Pid)
+ end;
+
+client_find([], _Pid) ->
+ error.
+
+
+%% Gracefully terminate client connections
+close_conns(#server_state{sock = Sock, clients = Clients}) ->
+ close_conns(Sock, dict:to_list(Clients)).
+
+close_conns(Sock, [{Aid, Client} | Clients]) ->
+ {FromAddr, FromPort} = Client#client_state.addr_port,
+ logger:notice("Terminating connection (id=~p, ~p:~p)", [Aid, FromAddr, FromPort]),
+ %% request to terminate an MME connection
+ sctp_proxy:shutdown(Client#client_state.pid),
+ %% gracefully close an eNB connection
+ gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Aid}),
+ %% ... and so for the remaining clients
+ close_conns(Sock, Clients);
+
+close_conns(_Sock, []) ->
+ ok.
+
+%% vim:set ts=4 sw=4 et:
To view, visit change 37036. To unsubscribe, or for help writing mail filters, visit settings.