Создание Неблокирующего Tcp-Сервера С Использованием Принципов Otp



Введение Предполагается, что читатель данного руководства уже знаком с gen_server И gen_fsm поведение, взаимодействие через TCP-сокеты с использованием модуля gen_tcp , активный и пассивный режимы сокетов, а также принцип «OTP Supervisor».

OTP предоставляет удобные инструменты для создания надежных приложений.

Частично это достигается путем абстрагирования общих функций в такие поведения, как gen_server И gen_fsm , которые связаны иерархией супервизоров OTP. Существует несколько хорошо известных шаблонов TCP-серверов.

Тот, который мы собираемся рассмотреть, включает в себя один процесс прослушивания и процесс создания нового процесса FSM для каждого подключающегося клиента.

Хотя в OTP есть поддержка TCP-соединений через gen_tcp модуля не существует стандартного поведения для создания неблокирующего TCP-сервера на основе принципов OTP. Под неблокирующим сервером мы подразумеваем, что процесс прослушивания и процесс FSM не должны выполнять никаких блокирующих вызовов и быстро реагировать на входящие сообщения (например, изменения конфигурации, перезапуски и т. д.), не вызывая таймаутов.

Обратите внимание, что блокировка в контексте Erlang означает блокировку процесса Erlang, а не процесса операционной системы.

В этом уроке мы покажем, как создать неблокирующий TCP-сервер, используя gen_server И gen_fsm , которые обеспечивают контроль над поведением приложений и полностью удовлетворяют принципам OTP. Читателю, не знакомому с OTP, рекомендуется взглянуть на руководство Джо Армстронга о том, как построить отказоустойчивые серверы с помощью блокировки вызовов.

gen_tcp:connect/3 И gen_tcp: принять/1 без использования OTP.



Структура сервера

Проект нашего сервера будет включать в себя основной процесс управления приложениями.

tcp_server_app со стратегией перезапуска один за один и два дочерних процесса.

Первым из них является процесс прослушивания, реализованный как gen_server , который будет ждать асинхронных уведомлений о клиентских подключениях.

Второй — еще один супервизор приложений.

tcp_client_sup и отвечает за запуск процесса FSM для обработки клиентских запросов и регистрации аномальных сбоев с использованием стандартных отчетов об ошибках SASL. Чтобы упростить этот материал, обработчик клиентских запросов (tcp_echo_fsm) предоставит сервер «Эxo», который будет возвращать клиентские запросы обратно.



Поведение приложения и супервизора

Чтобы создать наше приложение, нам необходимо написать модули, реализующие функции обратного вызова поведений «Супервизор» и «Приложение».

Традиционно эти функции реализуются в отдельных модулях, учитывая их краткость, мы объединим их в один.

В качестве дополнительного бонуса мы реализуем get_app_env функция, которая показывает, как обрабатывать параметры конфигурации, а также параметры командной строки эмулятора при запуске.

Два экземпляра функции инициализация/1 необходимы для двух уровней иерархии супервизора.

Поскольку существует две разные стратегии перезапуска, мы реализуем их на разных уровнях.

После запуска приложения функция обратного вызова tcp_server_app:start/2 вызывает функцию супервайзер: start_link/2 , который создает главный супервизор приложения, вызывая tcp_server_app: init([Порт, Модуль]) .

Это руководитель, создающий процесс tcp_listener и детский воспитатель tcp_client_sup отвечает за обработку клиентских соединений.

Аргумент Модуль в функции в этом это имя обработчика FSM для клиентских подключений (в данном случае tcp_echo_fsm ).

Приложение TCP-сервера (tcp_server_app.erl):

  
  
  
  
  
  
  
  
  
  
   

-module(tcp_server_app).

-author('[email protected]').

-behaviour(application).

%% Internal API -export([start_client/0]).

%% Application and Supervisor callbacks -export([start/2, stop/1, init/1]).

-define(MAX_RESTART, 5).

-define(MAX_TIME, 60).

-define(DEF_PORT, 2222).

%% A startup function for spawning new client connection handling FSM. %% To be called by the TCP listener process. start_client() -> supervisor:start_child(tcp_client_sup, []).

%%---------------------------------------------------------------------- %% Application behaviour callbacks %%---------------------------------------------------------------------- start(_Type, _Args) -> ListenPort = get_app_env(listen_port, ЭDEF_PORT), supervisor:start_link({local, ЭMODULE}, ЭMODULE, [ListenPort, tcp_echo_fsm]).

stop(_S) -> ok. %%---------------------------------------------------------------------- %% Supervisor behaviour callbacks %%---------------------------------------------------------------------- init([Port, Module]) -> {ok, {_SupFlags = {one_for_one, ЭMAX_RESTART, ЭMAX_TIME}, [ % TCP Listener { tcp_server_sup, % Id = internal id {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A} permanent, % Restart = permanent | transient | temporary 2000, % Shutdown = brutal_kill | int() >= 0 | infinity worker, % Type = worker | supervisor [tcp_listener] % Modules = [Module] | dynamic }, % Client instance supervisor { tcp_client_sup, {supervisor,start_link,[{local, tcp_client_sup}, ЭMODULE, [Module]]}, permanent, % Restart = permanent | transient | temporary infinity, % Shutdown = brutal_kill | int() >= 0 | infinity supervisor, % Type = worker | supervisor [] % Modules = [Module] | dynamic } ] } }; init([Module]) -> {ok, {_SupFlags = {simple_one_for_one, ЭMAX_RESTART, ЭMAX_TIME}, [ % TCP Client { undefined, % Id = internal id {Module,start_link,[]}, % StartFun = {M, F, A} temporary, % Restart = permanent | transient | temporary 2000, % Shutdown = brutal_kill | int() >= 0 | infinity worker, % Type = worker | supervisor [] % Modules = [Module] | dynamic } ] } }.

%%---------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------- get_app_env(Opt, Default) -> case application:get_env(application:get_application(), Opt) of {ok, Val} -> Val; _ -> case init:get_argument(Opt) of [[Val | _]] -> Val; error -> Default end end.



Процесс прослушивания

Один из недостатков gen_tcp заключается в том, что он предоставляет интерфейс только для блокировки соединений.

Тестирование модуля prim_inet показал интересный факт: команда сетевому драйверу принять клиентское соединение является асинхронной.

Хотя это не задокументировано, а это означает, что команда OTP может изменить это в любое время, мы будем использовать эту функцию при создании нашего сервера.

Процесс прослушивания реализован как gen_server .

Процесс TCP-прослушивателя (tcp_listener.erl):

-module(tcp_listener).

-author('[email protected]').

-behaviour(gen_server).

%% External API -export([start_link/2]).

%% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, { listener, % Listening socket acceptor, % Asynchronous acceptor's internal reference module % FSM handling module }).

%%-------------------------------------------------------------------- %% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason} % %% @doc Called by a supervisor to start the listening process. %% @end %%---------------------------------------------------------------------- start_link(Port, Module) when is_integer(Port), is_atom(Module) -> gen_server:start_link({local, ЭMODULE}, ЭMODULE, [Port, Module], []).

%%%------------------------------------------------------------------------ %%% Callback functions from gen_server %%%------------------------------------------------------------------------ %%---------------------------------------------------------------------- %% @spec (Port::integer()) -> {ok, State} | %% {ok, State, Timeout} | %% ignore | %% {stop, Reason} %% %% @doc Called by gen_server framework at process startup. %% Create listening socket. %% @end %%---------------------------------------------------------------------- init([Port, Module]) -> process_flag(trap_exit, true), Opts = [binary, {packet, 2}, {reuseaddr, true}, {keepalive, true}, {backlog, 30}, {active, false}], case gen_tcp:listen(Port, Opts) of {ok, Listen_socket} -> %%Create first accepting process {ok, Ref} = prim_inet:async_accept(Listen_socket, -1), {ok, #state{listener = Listen_socket, acceptor = Ref, module = Module}}; {error, Reason} -> {stop, Reason} end. %%------------------------------------------------------------------------- %% @spec (Request, From, State) -> {reply, Reply, State} | %% {reply, Reply, State, Timeout} | %% {noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, Reply, State} | %% {stop, Reason, State} %% @doc Callback for synchronous server calls. If `{stop, .

}' tuple %% is returned, the server is stopped and `terminate/2' is called. %% @end %% @private %%------------------------------------------------------------------------- handle_call(Request, _From, State) -> {stop, {unknown_call, Request}, State}.

%%------------------------------------------------------------------------- %% @spec (Msg, State) ->{noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% @doc Callback for asyncrous server calls. If `{stop, .

}' tuple %% is returned, the server is stopped and `terminate/2' is called. %% @end %% @private %%------------------------------------------------------------------------- handle_cast(_Msg, State) -> {noreply, State}.

%%------------------------------------------------------------------------- %% @spec (Msg, State) ->{noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% @doc Callback for messages sent directly to server's mailbox. %% If `{stop, .

}' tuple is returned, the server is stopped and %% `terminate/2' is called. %% @end %% @private %%------------------------------------------------------------------------- handle_info({inet_async, ListSock, Ref, {ok, CliSocket}}, #state{listener=ListSock, acceptor=Ref, module=Module} = State) -> try case set_sockopt(ListSock, CliSocket) of ok -> ok; {error, Reason} -> exit({set_sockopt, Reason}) end, %% New client connected - spawn a new process using the simple_one_for_one %% supervisor. {ok, Pid} = tcp_server_app:start_client(), gen_tcp:controlling_process(CliSocket, Pid), %% Instruct the new FSM that it owns the socket. Module:set_socket(Pid, CliSocket), %% Signal the network driver that we are ready to accept another connection case prim_inet:async_accept(ListSock, -1) of {ok, NewRef} -> ok; {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)}) end, {noreply, State#state{acceptor=NewRef}} catch exit:Why -> error_logger:error_msg("Error in async accept: ~p.\n", [Why]), {stop, Why, State} end; handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) -> error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]), {stop, Error, State}; handle_info(_Info, State) -> {noreply, State}.

%%------------------------------------------------------------------------- %% @spec (Reason, State) -> any %% @doc Callback executed on server shutdown. It is only invoked if %% `process_flag(trap_exit, true)' is set by the server process. %% The return value is ignored. %% @end %% @private %%------------------------------------------------------------------------- terminate(_Reason, State) -> gen_tcp:close(State#state.listener), ok. %%------------------------------------------------------------------------- %% @spec (OldVsn, State, Extra) -> {ok, NewState} %% @doc Convert process state when code is changed. %% @end %% @private %%------------------------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}.

%%%------------------------------------------------------------------------ %%% Internal functions %%%------------------------------------------------------------------------ %% Taken from prim_inet. We are merely copying some socket options from the %% listening socket to the new client socket. set_sockopt(ListSock, CliSocket) -> true = inet_db:register_socket(CliSocket, inet_tcp), case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of {ok, Opts} -> case prim_inet:setopts(CliSocket, Opts) of ok -> ok; Error -> gen_tcp:close(CliSocket), Error end; Error -> gen_tcp:close(CliSocket), Error end.

В этом модуле инициализация/1 принимает два параметра — номер порта, который должен открыть прослушивающий процесс, и имя обработчика клиентского соединения.

Функция инициализации открывает сокет в пассивном режиме.

Это сделано для того, чтобы мы могли контролировать поток данных, получаемых от клиента.

Самая интересная часть этого кода — вызов prim_inet:async_accept/2 .

Чтобы это работало, нам нужно скопировать часть внутреннего OTP-кода из функции.

set_sockopt/2 , который обрабатывает регистрацию сокета и копирование некоторых параметров клиентского сокета.

Как только клиентский сокет будет подключен, сетевой драйвер уведомит процесс прослушивания сообщением.

{inet_async, ListSock, Ref, {ОК, CliSocket}} .

На этом этапе мы запускаем экземпляр процесса обработки клиентского запроса и передаем ему право владения CliSocket.

Процесс обработки клиентских сообщений

Пока tcp_listener является обобщенной реализацией, tcp_echo_fsm нет ничего, кроме заглушки FSM, описывающей, как создать TCP-сервер.

Из этого модуля вам нужно экспортировать две функции: start_link/0 для руководителя tcp_client_sup И set_socket/2 чтобы процесс прослушивания уведомил процесс обработки сообщений клиента о том, что он становится владельцем сокета и может начать получать сообщения, установив {активен, один раз} или {активный, правда} вариант. Мы хотели бы подчеркнуть шаблон синхронизации, используемый между процессом прослушивания и процессами клиента, чтобы избежать возможной потери сообщений из-за их передачи неправильному (прослушивающему) процессу.

Процесс, владеющий сокетом, сохраняет его открытым в пассивном режиме.

Далее клиентский процесс принимает сокет, который наследует параметры (включая пассивный режим) от прослушивающего процесса.

Владение сокетами передается клиентскому процессу с помощью вызовов.

gen_tcp:controlling_process/2 И set_socket/2 , который уведомит клиентский процесс о том, что он может начать получать сообщения из сокета.

Пока сокет не будет переведен в активный режим, все полученные данные будут храниться в буфере сокета.

Когда владение сокетом передается клиентскому FSM-процессору в состоянии "WAIT_FOR_SOCKET" , установлен {активен, один раз} режим, позволяющий сетевому драйверу передавать по одному сообщению за раз.

Это принцип OTP, используемый для сохранения контроля над потоком данных и предотвращения смешивания сообщений и TCP-трафика в очереди процесса.

Состояния автомата реализуются с помощью специальных функций в модуле tcp_echo_fsm , который использует соглашение об именах.

ФШМ состоит из двух государств.

WAIT_FOR_SOCKET - это начальное состояние, в котором FSM ожидает владения сокетом, и WAIT_FOR_DATA , что представляет собой состояние ожидания TCP-сообщения от клиента.

В этом состоянии FSM также обрабатывает специальное сообщение «тайм-аут», означающее отсутствие активности со стороны клиента, и вызывает процесс для закрытия соединения с клиентом.



-module(tcp_echo_fsm).

-author('[email protected]').

-behaviour(gen_fsm).

-export([start_link/0, set_socket/2]).

%% gen_fsm callbacks -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).

%% FSM States -export([ 'WAIT_FOR_SOCKET'/2, 'WAIT_FOR_DATA'/2 ]).

-record(state, { socket, % client socket addr % client address }).

-define(TIMEOUT, 120000).

%%%------------------------------------------------------------------------ %%% API %%%------------------------------------------------------------------------ %%------------------------------------------------------------------------- %% @spec (Socket) -> {ok,Pid} | ignore | {error,Error} %% @doc To be called by the supervisor in order to start the server. %% If init/1 fails with Reason, the function returns {error,Reason}.

%% If init/1 returns {stop,Reason} or ignore, the process is %% terminated and the function returns {error,Reason} or ignore, %% respectively. %% @end %%------------------------------------------------------------------------- start_link() -> gen_fsm:start_link(ЭMODULE, [], []).

set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) -> gen_fsm:send_event(Pid, {socket_ready, Socket}).

%%%------------------------------------------------------------------------ %%% Callback functions from gen_server %%%------------------------------------------------------------------------ %%------------------------------------------------------------------------- %% Func: init/1 %% Returns: {ok, StateName, StateData} | %% {ok, StateName, StateData, Timeout} | %% ignore | %% {stop, StopReason} %% @private %%------------------------------------------------------------------------- init([]) -> process_flag(trap_exit, true), {ok, 'WAIT_FOR_SOCKET', #state{}}.

%%------------------------------------------------------------------------- %% Func: StateName/2 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%------------------------------------------------------------------------- 'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) -> % Now we own the socket inet:setopts(Socket, [{active, once}, {packet, 2}, binary]), {ok, {IP, _Port}} = inet:peername(Socket), {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ЭTIMEOUT}; 'WAIT_FOR_SOCKET'(Other, State) -> error_logger:error_msg("State: 'WAIT_FOR_SOCKET'.

Unexpected message: ~p\n", [Other]), %% Allow to receive async messages {next_state, 'WAIT_FOR_SOCKET', State}.

%% Notification event coming from client 'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) -> ok = gen_tcp:send(S, Data), {next_state, 'WAIT_FOR_DATA', State, ЭTIMEOUT}; 'WAIT_FOR_DATA'(timeout, State) -> error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]), {stop, normal, State}; 'WAIT_FOR_DATA'(Data, State) -> io:format("~p Ignoring data: ~p\n", [self(), Data]), {next_state, 'WAIT_FOR_DATA', State, ЭTIMEOUT}.

%%------------------------------------------------------------------------- %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%------------------------------------------------------------------------- handle_event(Event, StateName, StateData) -> {stop, {StateName, undefined_event, Event}, StateData}.

%%------------------------------------------------------------------------- %% Func: handle_sync_event/4 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {reply, Reply, NextStateName, NextStateData} | %% {reply, Reply, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} | %% {stop, Reason, Reply, NewStateData} %% @private %%------------------------------------------------------------------------- handle_sync_event(Event, _From, StateName, StateData) -> {stop, {StateName, undefined_event, Event}, StateData}.

%%------------------------------------------------------------------------- %% Func: handle_info/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%------------------------------------------------------------------------- handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) -> % Flow control: enable forwarding of next TCP message inet:setopts(Socket, [{active, once}]), ЭMODULE:StateName({data, Bin}, StateData); handle_info({tcp_closed, Socket}, _StateName, #state{socket=Socket, addr=Addr} = StateData) -> error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]), {stop, normal, StateData}; handle_info(_Info, StateName, StateData) -> {noreply, StateName, StateData}.

%%------------------------------------------------------------------------- %% Func: terminate/3 %% Purpose: Shutdown the fsm %% Returns: any %% @private %%------------------------------------------------------------------------- terminate(_Reason, _StateName, #state{socket=Socket}) -> (catch gen_tcp:close(Socket)), ok. %%------------------------------------------------------------------------- %% Func: code_change/4 %% Purpose: Convert process state when code is changed %% Returns: {ok, NewState, NewStateData} %% @private %%------------------------------------------------------------------------- code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}.





Описание приложения

Другой необходимой частью создания приложения OTP является создание файла конфигурации, содержащего имя приложения, версию, модуль запуска и переменные среды.

Файл приложения (tcp_server.app):

{application, tcp_server, [ {description, "Demo TCP server"}, {vsn, "1.0"}, {id, "tcp_server"}, {modules, [tcp_listener, tcp_echo_fsm]}, {registered, [tcp_server_sup, tcp_listener]}, {applications, [kernel, stdlib]}, %% %% mod: Specify the module name to start the application, plus args %% {mod, {tcp_server_app, []}}, {env, []} ] }.





Сборник

Создайте следующую структуру каталогов для этого приложения:

.

/tcp_server .

/tcp_server/ebin/ .

/tcp_server/ebin/tcp_server.app .

/tcp_server/src/tcp_server_app.erl .

/tcp_server/src/tcp_listener.erl .

/tcp_server/src/tcp_echo_fsm.erl



$ cd tcp_server/src $ for f in tcp*.

erl ; do erlc +debug_info -o .

/ebin $f



Запуск

Мы собираемся запустить оболочку Erlang с поддержкой SASL, чтобы видеть состояние процесса и отчеты об ошибках для нашего приложения.

Кроме того, мы собираемся запустить приложение приложениемон с целью наглядного изучения иерархии руководителей.



$ cd .

/ebin $ erl -boot start_sasl .

1> appmon:start().

{ok,<0.44.0>} 2> application:start(tcp_server).

ok

Теперь нажмите на кнопку tcp_server в приложении appmon для отображения иерархии супервизоров приложения.



3> {ok,S} = gen_tcp:connect({127,0,0,1},2222,[{packet,2}]).

{ok,#Port<0.150>}

Мы только что инициировали новое клиентское соединение Echo Server.

4> gen_tcp:send(S,<<"hello">>).

ok 5> f(M), receive M -> M end. {tcp,#Port<0.150>,"hello"}

Мы убедились, что эхо-сервер работает должным образом.

Теперь попробуем «поставить» клиентское соединение на сервер и посмотрим, как генерируется сообщение об ошибке.



6> [{_,Pid,_,_}] = supervisor:which_children(tcp_client_sup).

[{undefined,<0.64.0>,worker,[]}] 7> exit(Pid,kill).

true =SUPERVISOR REPORT==== 31-Jul-2007::14:33:49 === Supervisor: {local,tcp_client_sup} Context: child_terminated Reason: killed Offender: [{pid,<0.77.0>}, {name,undefined}, {mfa,{tcp_echo_fsm,start_link,[]}}, {restart_type,temporary}, {shutdown,2000}, {child_type,worker}]

Обратите внимание: если ваш сервер подвергается нагрузке из-за большого количества подключений, процесс прослушивания может не принимать новые подключения после определенного ограничения, установленного в операционной системе.

В этом случае вы увидите сообщение об ошибке:

"too many open files"



Заключение

OTP предоставляет стандартные блоки для создания неблокирующих TCP-серверов.

В этом руководстве показано, как создать простой сервер, используя стандартное поведение OTP. Теги: #erlang #web #trapexit #Erlang/OTP

Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.