How can I use the receive clause in an Erlang gen_server effectively to resolve timeout errors?

Posted by cgyqldqp on 2022-12-20 in Erlang

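Sometimes my loop returns ok because of a timeout. How should I write this code correctly? I assume that when a timeout occurs it simply returns ok instead of my actual value. In handle_call I call a function loop(), and inside loop() I receive a message with a receive clause. I then pass this data to the database through the loop2 function. The database returns a response saying whether the data was saved successfully or not, and that response is returned to loop. But if there is a timeout, my loop function returns ok rather than the actual value.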

% @Author: ZEESHAN AHMAD
% @Date:   2020-12-22 05:06:12
% @Last Modified by:   ZEESHAN AHMAD
% @Last Modified time: 2021-01-10 04:42:59

-module(getAccDataCons).

-behaviour(gen_server).

-include_lib("deps/amqp_client/include/amqp_client.hrl").

-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3,
         terminate/2]).
-export([get_account/0]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

stop() ->
    gen_server:cast(?MODULE, stop).

get_account() ->
    gen_server:call(?MODULE, {get_account}).

init(_Args) ->
    {ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    {ok, Channel}.

handle_call({get_account}, _From, State) ->
    amqp_channel:call(State, #'exchange.declare'{exchange = <<"get">>, type = <<"topic">>}),
    amqp_channel:call(State, #'queue.declare'{queue = <<"get_account">>}),
    Binding =
        #'queue.bind'{exchange = <<"get">>,
                      routing_key = <<"get.account">>,
                      queue = <<"get_account">>},
    #'queue.bind_ok'{} = amqp_channel:call(State, Binding),
    io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),
    amqp_channel:call(State,#'basic.consume'{queue = <<"get_account">>, no_ack = true}),
    Returned =loop(),
    io:format("~nReti=~p",[Returned]),
    {reply, Returned, State};
    

handle_call(Message, _From, State) ->
    io:format("received other handle_call message: ~p~n", [Message]),
    {reply, ok, State}.

handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(Message, State) ->
    io:format("received other handle_cast call : ~p~n", [Message]),
    {noreply, State}.

handle_info(Message, State) ->
    io:format("received handle_info message : ~p~n", [Message]),
    {noreply, State}.

code_change(_OldVer, State, _Extra) ->
    {ok, State}.

terminate(Reason, _State) ->
    io:format("server is terminating with reason :~p~n", [Reason]).

loop() ->
    receive
        #'basic.consume_ok'{} -> ok
    end,
    receive
        {#'basic.deliver'{}, Msg} ->
            #amqp_msg{payload = Payload} = Msg,
            Value = loop2(Payload),
            Value
    after 2000 ->
        io:format("Server timeout")
    end.

loop2(Payload) ->
    Result = jiffy:decode(Payload),
    {[{<<"account_id">>, AccountId}]} = Result,
    Doc = {[{<<"account_id">>, AccountId}]},
    getAccDataDb:create_AccountId_view(),
    Returned = case getAccDataDb:getAccountNameDetails(Doc) of
        success ->
            Respo = getAccDataDb:getAccountNameDetails1(Doc),
            Respo;
        details_not_matched ->
            user_not_exist
    end,
    Returned.

Answer #1 (66bbxpm5)

This was too long for an edit, so I am putting it in a new answer.

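The reason you receive ok when a timeout occurs is in the loop() code: in the second receive block, after 2000 ms, the function returns right after the io:format/1 call.
io:format returns ok, and the value of the last expression in the after clause becomes the value of the whole receive, so ok is what you get in the Returned variable.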

loop()->
    ok = receive
        #'basic.consume_ok'{} -> ok
    end,
    receive
        {#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> {ok,loop2(Payload)}
    after 2000 ->
        io:format("Server timeout"),
        {error,timeout}
    end.

With this code, your client receives either {ok,Value} or {error,timeout} and can react accordingly.
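
As an illustration of what reacting to the tagged result could look like, here is a minimal sketch that is not part of the original answer. The module retry_demo and the function with_retry/2 are hypothetical names; the sketch only assumes that the wrapped function returns {ok,Value} or {error,timeout}, as the rewritten loop() does. Whether retrying is appropriate at all depends on the application.

```erlang
-module(retry_demo).
-export([with_retry/2]).

%% Hypothetical helper, not from the original answer.
%% Fun is any zero-arity fun returning {ok, Value} | {error, timeout},
%% for example a wrapper around the get_account request once loop()
%% returns tagged tuples.
%% Retries is the number of additional attempts after the first one.
with_retry(Fun, Retries) when is_function(Fun, 0), Retries >= 0 ->
    case Fun() of
        {ok, Value} ->
            {ok, Value};
        {error, timeout} when Retries > 0 ->
            %% The timeout is now visible as data, so the caller can decide
            %% to try again instead of silently receiving ok.
            with_retry(Fun, Retries - 1);
        {error, timeout} ->
            {error, timeout}
    end.
```

The point is only that a tagged tuple can be pattern matched, whereas a bare ok cannot be told apart from a legitimate value.
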
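However, this version still has some problems:

  • the 2-second timeout may be too short, so you miss a valid answer;
  • because you pattern match in the receive blocks and do not check the result of each amqp_channel:call, many different failures can occur and all of them show up as a timeout.

Let's look at the timeout first. It is possible that the 4 calls to amqp_channel need more than 2 seconds in total to complete successfully. The simple solution is to increase the timeout, changing after 2000 to after 3000 or more. But then you have two problems: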

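  • During all this time the gen_server is blocked. If it is not dedicated to a single client, it cannot serve any other request while it waits for the response.
  • If you need to raise the timeout above 5 seconds, you hit another timeout, managed internally by gen_server:call: by default, a request must be answered within 5 seconds.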
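
To make the second point concrete, here is a minimal sketch that is not taken from the original answer. The module name call_timeout_demo and the {sleep, Ms} request are made up for illustration. It uses gen_server:call/3, whose third argument replaces the default 5000 ms limit, and converts the exit raised on expiry into {error, timeout}.

```erlang
-module(call_timeout_demo).
-behaviour(gen_server).

%% Hypothetical demo module, not part of the original answer.
-export([start/0, slow_call/3]).
-export([init/1, handle_call/3, handle_cast/2]).

start() ->
    gen_server:start(?MODULE, [], []).

%% Ask the server to sleep for SleepMs, waiting at most TimeoutMs for the reply.
%% gen_server:call/2 is equivalent to gen_server:call/3 with a 5000 ms timeout.
slow_call(Pid, SleepMs, TimeoutMs) ->
    try gen_server:call(Pid, {sleep, SleepMs}, TimeoutMs) of
        Reply -> {ok, Reply}
    catch
        %% On expiry the calling process exits with {timeout, _}.
        exit:{timeout, _} -> {error, timeout}
    end.

init([]) ->
    {ok, nostate}.

handle_call({sleep, Ms}, _From, State) ->
    timer:sleep(Ms),            % the server is blocked during this time
    {reply, done, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

Passing a larger timeout only moves the limit; the server still handles one request at a time. Also note that, according to the gen_server documentation, on releases before OTP 24 a late reply may still arrive in the caller's mailbox after the timeout was caught.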

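gen_server provides some interface functions to address this kind of problem: send_request, wait_response and reply. Below is a basic gen_server that handles 3 kinds of requests:

  • stop: stops the server, useful when updating the code.
  • {blocking,Time,Value}: the server sleeps for Time ms and then returns Value. This simulates your case, and you can adjust how long it takes to get an answer.
  • {non_blocking,Time,Value}: the server delegates the job to another process and returns immediately without replying (so it is available for another request). The new process sleeps for Time ms and then returns Value using gen_server:reply.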

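The server module exposes several API functions:

  • the standard start() and stop();
  • blocking(Time,Value): calls the server with the request {blocking,Time,Value} using gen_server:call;
  • blocking_catch(Time,Value): same as the previous one, but catches the result of gen_server:call to reveal the hidden timeout;
  • non_blocking(Time,Value,Wait): calls the server with the request {non_blocking,Time,Value} using gen_server:send_request and waits for the response for at most Wait ms.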

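Finally, it includes 2 test functions:

  • test([Type,Time,Value,OptionalWait]): spawns a process that sends a request of the given Type with the corresponding parameters. The answer is sent back to the calling process and can be retrieved in the shell with flush().
  • parallel_test([Type,Time,NbRequests,OptionalWait]): calls test NbRequests times with the corresponding parameters, then collects all the answers and prints them using the local function collect_answers(NbRequests,Timeout).

The code is as follows: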

-module (server_test).

-behaviour(gen_server).

%% API
-export([start/0,stop/0,blocking/2,blocking_catch/2,non_blocking/3,test/1,parallel_test/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-define(SERVER, ?MODULE). 

%%%===================================================================
%%% API
%%%===================================================================
start() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

stop() ->
    gen_server:cast(?SERVER, stop).

blocking(Time,Value) ->
    gen_server:call(?SERVER, {blocking,Time,Value}).

blocking_catch(Time,Value) ->
    catch {ok,gen_server:call(?SERVER, {blocking,Time,Value})}.

non_blocking(Time,Value,Wait) ->
    ReqId = gen_server:send_request(?SERVER,{non_blocking,Time,Value}),
    gen_server:wait_response(ReqId,Wait).

test([Type,Time,Value]) -> test([Type,Time,Value,5000]);
test([Type,Time,Value,Wait]) ->
    Start = erlang:monotonic_time(microsecond), % explicit unit: the native unit is platform dependent
    From = self(),
    F = fun() -> 
        R = case Type of 
            non_blocking -> ?MODULE:Type(Time,Value,Wait);
            _ -> ?MODULE:Type(Time,Value)
        end,
        From ! {request,Type,Time,Value,got_answer,R,after_microsec,erlang:monotonic_time(microsecond) - Start} 
    end,
    spawn(F).

parallel_test([Type,Time,NbRequests]) -> parallel_test([Type,Time,NbRequests,5000]);
parallel_test([Type,Time,NbRequests,Wait]) ->
    case Type of
        non_blocking -> [server_test:test([Type,Time,X,Wait]) || X <- lists:seq(1,NbRequests)];
        _ -> [server_test:test([Type,Time,X]) || X <- lists:seq(1,NbRequests)]
    end,
    collect_answers(NbRequests,Time + 1000).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
    {ok, #{}}.

handle_call({blocking,Time,Value}, _From, State) ->
    timer:sleep(Time),
    Reply = {ok,Value},
    {reply, Reply, State};
handle_call({non_blocking,Time,Value}, From, State) ->
    F = fun() ->
        do_answer(From,Time,Value)
    end,
    spawn(F),
    {noreply, State};
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.

handle_cast(stop, State) ->
    {stop, normal, State}; % reason 'normal' avoids a crash report and does not take down the linked caller
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(OldVsn, State, _Extra) ->
    io:format("changing code replacing version ~p~n",[OldVsn]),
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

do_answer(From,Time,Value) ->
    timer:sleep(Time),
    gen_server:reply(From, Value).

collect_answers(0,_Timeout) ->
    got_all_answers;
collect_answers(NbRequests,Timeout) ->
    receive 
        A -> io:format("~p~n",[A]),
            collect_answers(NbRequests - 1, Timeout)
    after Timeout ->
        missing_answers
    end.

A session in the shell:

44> c(server_test).                                    
{ok,server_test}
45> server_test:start().                               
{ok,<0.338.0>}
46> server_test:parallel_test([blocking,200,3]).
{request,blocking,200,1,got_answer,{ok,1},after_microsec,207872}
{request,blocking,200,2,got_answer,{ok,2},after_microsec,415743}
{request,blocking,200,3,got_answer,{ok,3},after_microsec,623615}
got_all_answers
47> % 3 blocking requests in parallel, each lasting 200ms; they are executed in sequence but no timeout is reached
47> % All the clients get their answers
47> server_test:parallel_test([blocking,2000,3]).                                                                                                       
{request,blocking,2000,1,got_answer,{ok,1},after_microsec,2063358}
{request,blocking,2000,2,got_answer,{ok,2},after_microsec,4127740}
missing_answers
48> % 3 blocking requests in parallel, each lasting 2000ms, they are executed in sequence and the last answer exceeds the gen_server timeout.       
48> % The client for this request doesn't receive an answer. The client should also manage its own timeout to handle this case
48> server_test:parallel_test([blocking_catch,2000,3]).                                                                                             
{request,blocking_catch,2000,1,got_answer,{ok,1},after_microsec,2063358}
{request,blocking_catch,2000,2,got_answer,{ok,2},after_microsec,4127740}
{request,blocking_catch,2000,3,got_answer,
         {'EXIT',{timeout,{gen_server,call,[server_test,{blocking,2000,3}]}}},
         after_microsec,5135355}
got_all_answers
49> % same thing but catching the exception. After 5 seconds the gen_server call throws a timeout exception.
49> % The information can be forwarded to the client
49> server_test:parallel_test([non_blocking,200,3]).                                                       
{request,non_blocking,200,1,got_answer,{reply,1},after_microsec,207872}
{request,non_blocking,200,2,got_answer,{reply,2},after_microsec,207872}
{request,non_blocking,200,3,got_answer,{reply,3},after_microsec,207872}
got_all_answers
50> % using non blocking mechanism, we can see that all the requests were managed in parallel 
50> server_test:parallel_test([non_blocking,5100,3]).                                        
{request,non_blocking,5100,1,got_answer,timeout,after_microsec,5136379}
{request,non_blocking,5100,2,got_answer,timeout,after_microsec,5136379}
{request,non_blocking,5100,3,got_answer,timeout,after_microsec,5136379}
got_all_answers
51> % if we increase the answer delay above 5000ms, all requests fail in default timeout
51> server_test:parallel_test([non_blocking,5100,3,6000]).                              
{request,non_blocking,5100,1,got_answer,{reply,1},after_microsec,5231611}
{request,non_blocking,5100,2,got_answer,{reply,2},after_microsec,5231611}
{request,non_blocking,5100,3,got_answer,{reply,3},after_microsec,5231611}
got_all_answers
52> % but thanks to the send_request/wait_response/reply interfaces, the client can adjust the timeout to an accurate value
52> % for each request

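The next reason a request may not complete is that one of the amqp_channel:call invocations fails. Depending on what you want to do, there are several possibilities: do nothing, let it crash, catch the exception, or handle every case. The following proposal uses a global catch: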

handle_call({get_account,Timeout}, From, State) ->
    F = fun() ->
        do_get_account(From,State,Timeout)
    end,
    spawn(F), % delegate the job to another process and free the server
    {noreply, State}; % I don't see any change of State in your code, this should be enough

...

do_get_account(From,State,Timeout) ->
    % This block asserts the expected return record of every amqp_channel call and catches any error,
    % returning it as {error,Class,Exception}. If everything goes well it returns {ok,Answer}.
    % Note that a timeout in wait_account_reply/1 is reported as {ok,server_timeout}.
    Reply = try
        #'exchange.declare_ok'{} = amqp_channel:call(State, #'exchange.declare'{exchange = <<"get">>, type = <<"topic">>}),
        #'queue.declare_ok'{} = amqp_channel:call(State, #'queue.declare'{queue = <<"get_account">>}),
        Binding = #'queue.bind'{exchange = <<"get">>,
                                routing_key = <<"get.account">>,
                                queue = <<"get_account">>},
        #'queue.bind_ok'{} = amqp_channel:call(State, Binding),
        #'basic.consume_ok'{} = amqp_channel:call(State,#'basic.consume'{queue = <<"get_account">>, no_ack = true}),
        {ok,wait_account_reply(Timeout)}
    catch
        Class:Exception -> {error,Class,Exception}
    end,
    gen_server:reply(From, Reply).

wait_account_reply(Timeout) ->
    receive
    % #'basic.consume_ok'{} -> ok % this message is not handled; it can be ignored because the mailbox is discarded when the process dies
        {#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> extract_account(Payload)
    after Timeout->
       server_timeout
    end.

extract_account(Payload)->
        {[{<<"account_id">>, AccountId}]} = jiffy:decode(Payload),
        Doc = {[{<<"account_id">>, AccountId}]},
        getAccDataDb:create_AccountId_view(), % What is the effect of this function, what is the return value?
        case getAccDataDb:getAccountNameDetails(Doc) of
            success ->
                getAccDataDb:getAccountNameDetails1(Doc);
            details_not_matched ->
                user_not_exist
        end.

The client should look like this:

get_account() ->
    ReqId = gen_server:send_request(server_name,{get_account,2000}),
    gen_server:wait_response(ReqId,2200).
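
gen_server:wait_response/2 returns {reply,Reply}, timeout, or {error,{Reason,ServerRef}} when the server dies or does not exist, so a client usually has to handle three shapes. The following is a minimal sketch of one way to normalize them, not part of the original answer; the module await_demo, the function await/3 and the server_down tag are hypothetical names chosen for illustration.

```erlang
-module(await_demo).
-export([await/3]).

%% Hypothetical helper, not from the original answer.
%% Sends Request asynchronously and waits at most WaitMs for the reply.
%% Requires OTP 23 or later (gen_server:send_request/2, wait_response/2).
await(ServerRef, Request, WaitMs) ->
    ReqId = gen_server:send_request(ServerRef, Request),
    case gen_server:wait_response(ReqId, WaitMs) of
        {reply, Reply} ->
            {ok, Reply};
        timeout ->
            %% No answer within WaitMs; the server may still reply later.
            {error, timeout};
        {error, {Reason, _ServerRef}} ->
            %% The server terminated (or never existed) before replying.
            {error, {server_down, Reason}}
    end.
```

With such a helper, get_account() could be written as await(server_name, {get_account,2000}, 2200), keeping the client-side wait slightly longer than the timeout passed to the server, as in the snippet above.
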

Answer #2 (webghufk)

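Without the code of loop and loop2 it is hard to give an answer. If one of these two functions detects a timeout, you must first change their behaviour to avoid any timeout, or increase it to a workable value. If a timeout is necessary, make sure the return value is explicit, for example {error,RequestRef,timeout} rather than ok.
That said, a gen_server should not wait too long for an answer. You can modify your code as follows:
In the client process, instead of gen_server:call(ServerRef,Request), you can use: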

RequestId = gen_server:send_request(ServerRef, Request),
Result = gen_server:wait_response(RequestId, Timeout),

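Remove the timeout in loop and/or loop2. That way you control the timeout on the client side, and you can even set it to infinity (which is not a good idea!).
You can also split the function into two parts: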

gen_server:cast(ServerRef,{Request,RequestRef}), 
% this will not wait for any answer, RequestRef is a tag to identify later 
% if the request was fulfilled, you can use make_ref() to generate it

and later, or in another client process (which requires at least passing RequestRef to that process), check the result of the request:

Answer = gen_server:call(ServerRef,{get_answer,RequestRef}),
case Answer of
    no_reply -> ... % no answer yet
    {ok,Reply} -> ... % handle the answer
end,

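Finally, you must modify the loop code to handle RequestRef: it sends a message back to the server (again using gen_server:cast) containing the result and the RequestRef, and the server stores this result in its state.
I don't think this second solution is worthwhile, because it is more or less the same as the first one but made by hand. It also leaves you to manage many error cases (such as the death of the client), which may end up as a kind of memory leak.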
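
For readers who want to see what the hand-made variant involves, here is a minimal sketch under stated assumptions; it is not the answerer's code. The module poll_server_demo, the functions submit/2 and get_answer/2, and the {sleep_then, Ms, Value} request (which stands in for the real work) are all hypothetical. The server state is a map from RequestRef to result.

```erlang
-module(poll_server_demo).
-behaviour(gen_server).

%% Hypothetical demo module, not part of the original answer.
-export([start/0, submit/2, get_answer/2]).
-export([init/1, handle_call/3, handle_cast/2]).

start() ->
    gen_server:start(?MODULE, [], []).

%% Fire and forget: returns the reference used later to fetch the result.
submit(Server, Request) ->
    RequestRef = make_ref(),
    gen_server:cast(Server, {Request, RequestRef}),
    RequestRef.

%% Returns no_reply while the result is not available, {ok, Reply} once it is.
get_answer(Server, RequestRef) ->
    gen_server:call(Server, {get_answer, RequestRef}).

init([]) ->
    {ok, #{}}.   % state: map from RequestRef to Result

handle_call({get_answer, RequestRef}, _From, Results) ->
    case maps:take(RequestRef, Results) of
        {Result, Rest} -> {reply, {ok, Result}, Rest};   % delivered once, then dropped
        error          -> {reply, no_reply, Results}
    end.

%% A worker reports its result back to the server.
handle_cast({result, RequestRef, Result}, Results) ->
    {noreply, Results#{RequestRef => Result}};
%% A new request: delegate the work so the server stays available.
handle_cast({{sleep_then, Ms, Value}, RequestRef}, Results) ->
    Server = self(),
    spawn(fun() ->
        timer:sleep(Ms),
        gen_server:cast(Server, {result, RequestRef, Value})
    end),
    {noreply, Results}.
```

The sketch also shows the leak mentioned above: a result whose client never calls get_answer/2 stays in the map forever unless some expiry is added.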
