Erlang / Golang端口示例中的缓冲区大小

xjreopfe  于 2022-12-08  发布在  Erlang
关注(0)|答案(3)|浏览(190)

I have a crude Erlang-to-Golang port example, passing data from Erlang to Golang and echoing the response.
Problem is the amount of data I can transfer seems to be limited to 2^8 bytes (see below). I thought the problem was probably on the Golang side (not creating a big enough buffer) but replacing bufio.NewReader with bufio.NewReaderSize didn't work. So am now thinking the problem is maybe on the Erlang side.
What do I need to do to increase the buffer size / be able to echo a message larger than 2^8 bytes ?
TIA

justin@justin-ThinkPad-X240:~/work/erlang_golang_port$ erl -pa ebin
Erlang/OTP 17 [erts-6.4.1] [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]

Eshell V6.4.1  (abort with ^G)
1> port:start("./echo").
<0.35.0>
2> port:ping(65000).
65000
3> port:ping(66000).
** exception error: bad argument
     in function  port:call_port/1 (port.erl, line 20)
4> port:start("./echo").
<0.40.0>
5> port:ping(66000).    
65536

Go

package main

import (
    "bufio"
    "os"
)

const Delimiter = '\n'

func main() {
    // reader := bufio:NewReader(os.Stdin)
    reader := bufio.NewReaderSize(os.Stdin, 1677216) // 2**24;
    bytes, _ := reader.ReadBytes(Delimiter)
    os.Stdout.Write(bytes[:len(bytes)-1])
}

Erlang

-module(port).

-export([start/1, stop/0, init/1]).

-export([ping/1]).

-define(DELIMITER, [10]).

start(ExtPrg) ->
    spawn(?MODULE, init, [ExtPrg]).

stop() ->
    myname ! stop.

ping(N) ->
    Msg=[round(65+26*random:uniform()) || _ <- lists:seq(1, N)],
    call_port(Msg).

call_port(Msg) ->
    myname ! {call, self(), Msg},
    receive
    {myname, Result} ->
        length(Result)
    end.

init(ExtPrg) ->
    register(myname, self()),
    process_flag(trap_exit, true),
    Port = open_port({spawn, ExtPrg}, []),
    loop(Port).

loop(Port) ->
    receive
    {call, Caller, Msg} ->
        Port ! {self(), {command, Msg++?DELIMITER}},
        receive
        {Port, {data, Data}} ->
            Caller ! {myname, Data}
        end,
        loop(Port);
    stop ->
        Port ! {self(), close},
        receive
        {Port, closed} ->
            exit(normal)
        end;
    {'EXIT', Port, _Reason} ->
        exit(port_terminated)
    end.
6g8kf2rb

6g8kf2rb1#

如果使用start_link,您将看到端口在第一个命令之后崩溃:

1> port:start('go run port.go').
<0.118.0>
2> port:ping(65000).
65000
** exception error: port_terminated

如果你把Go代码改为在循环中运行,就可以避免这种崩溃:

func main() {
    for {
        // reader := bufio:NewReader(os.Stdin)
        reader := bufio.NewReaderSize(os.Stdin, 1677216) // 2**24;
        bytes, _ := reader.ReadBytes(Delimiter)
        os.Stdout.Write(bytes[:len(bytes)-1])
    }
}

现在我们可以看到另一个有趣的结果:

33> c(port).
{ok,port}
40> port:ping(66000).
65536
41> port:ping(66000).
464
42> port:ping(66000).
65536
43> port:ping(66000).
464

现在我们可以看到实际上没有数据丢失,数据只是在端口中缓冲。由于您没有指定帧协议(使用{packet, N}{line, N}),您自己负责收集数据。Erlang端口的内部缓冲区大小似乎是64K(尽管我没有找到相关文档,也没有办法更改它)。
如果你改变你的receive以在返回之前获取所有数据,你将每一次每一个字节:

loop(Port) ->
    receive
    {call, Caller, Msg} ->
        Port ! {self(), {command, Msg++?DELIMITER}},
        Caller ! {myname, receive_all(Port, 10)},
        loop(Port);
    stop ->
        Port ! {self(), close},
        receive
        {Port, closed} ->
            exit(normal)
        end;
    {'EXIT', Port, _Reason} ->
        exit(port_terminated)
    end.

receive_all(Port, Timeout) -> receive_all(Port, Timeout, []).

receive_all(Port, Timeout, Data) ->
    receive
    {Port, {data, New}} ->
        receive_all(Port, Timeout, [New|Data])
    after Timeout ->
        lists:flatten(lists:reverse(Data))
    end.

运行该程序,我们得到:

1> c(port).
{ok,port}
2>
3> port:start('go run port.go').
<0.311.0>
4> port:ping(66000).
66000
5> port:ping(66000).
66000
6> port:ping(66000).
66000
3yhwsihp

3yhwsihp2#

  1. 2^8 is 256, not 65536 which is 2^16 (or 2 bytes).
  2. For excluding golang program you can simply replace your echo with GNU cat
  3. Default message max size for port communication is 64k, so when your port receives messages, the first one is leading 64k of the string. You can read port again to gain remaining data but you just drop them in your code.
  4. If you really want to communicate on line-based protocol you should configure your port accordingly:
    {line, L}
    Messages are delivered on a per line basis. Each line (delimited by the OS-dependent newline sequence) is delivered in one single message. The message data format is {Flag, Line}, where Flag is either eol or noeol and Line is the actual data delivered (without the newline sequence).
    L specifies the maximum line length in bytes. Lines longer than this will be delivered in more than one message, with the Flag set to noeol for all but the last message. If end of file is encountered anywhere else than immediately following a newline sequence, the last line will also be delivered with the Flag set to noeol. In all other cases, lines are delivered with Flag set to eol.
    The {packet, N} and {line, L} settings are mutually exclusive.
    So your code would be
Port = open_port({spawn, ExtPrg}, [{line, ?PACKET_SIZE]),
%%...
{call, Caller, Msg} ->
    Port ! {self(), {command, Msg++?DELIMITER}},
    D = read_data(Port, []),
    Caller ! {myname, D},
    loop(Port);
%%...
read_data(Port, Prefix) ->
receive
    {Port, {data, {noeol, Data}}} ->
        read_data(Port, Prefix ++ Data);
    {Port, {data, {eol, Data}}} ->
        Prefix ++ Data
end.
iibxawm4

iibxawm43#

我一直在努力解决类似的问题。这里是管道模块的完整代码。
它允许向端口发送文本数据并读取所有回复。

-module(apr_pipe).

-export([open_pipe/2,send/2,close/1]).

-export([loop/1,status/1,init/1]).

-include_lib("kernel/include/logger.hrl").

-define(MAX_LINE_LEN,4096).

open_pipe(Path,Cmd) ->
   State = #{path => Path, cmd => Cmd},
    Pid = spawn(?MODULE,init,[State]),
    Pid.

init(State) ->
    #{path := Path,cmd := Cmd} = State,
    FullFn = filename:join(Path,Cmd),
    Settings = [{line,?MAX_LINE_LEN},use_stdio,stderr_to_stdout,hide,binary,exit_status],
    Port = erlang:open_port({spawn_executable,FullFn},Settings),
    State2 = State#{port => Port, data => #{}},
    loop(State2).

send(Pid,Data)  -> Pid!{self(),send,Data}.
close(Pid)      -> Pid!{self(),send,close}.
status(Pid)     -> Pid!{self(),status}.

get_eol() -> <<"\n">>.

loop(State) ->
    receive
        {_Pid,send,close} -> 
                    ?LOG(notice,"got cmd: Close",[]),
                    Port = maps:get(port,State),
                    port_close(Port),
                    exit(normal);
        {Pid,send,Data} ->
                    ?LOG(notice,"Send Data ...",[]),
                    Port = maps:get(port,State),
                    port_command(Port,Data),
                    port_command(Port,get_eol()),
                    State2 = State#{status => data_sent, client => Pid},
                    loop(State2);
         {Pid,status} -> 
                    Port = maps:get(port,State),
                    ?LOG(notice,"Status: Port: ~p State: ~p",[Port,State]),
                    Pid!{status,Port,State},
                    loop(State);
        % port messages.
        {Port, {data,{noeol,Data}}} ->
                ?LOG(notice,"Port: ~p Data: ~p",[Port,Data]),
                CurData = maps:get(cur_data,State,[]),
                State2 = State#{cur_data => [Data | CurData]},
                loop(State2);

        {Port, {data, {eol,Data}}} ->
                ?LOG(notice,"Port: ~p Data: ~p",[Port,Data]),
                CurData = [Data | maps:get(cur_data,State,[])],
                CurData2 = lists:reverse(CurData),
                Reply    = list_to_binary(CurData2),
                Client = maps:get(client,State,undefined),
                State2 = State#{cur_data => [], client => undefined},
                case Client of
                    undefined -> ?LOG(error,"can not sent reply. Client: ~p Reply: ~p", [Client,Reply]),
                                 loop(State2);
                    _ -> Client!{reply,Reply},
                         loop(State2)
                 end;
        {_Port, closed} ->
                ?LOG(warning, "Port: ~p closed",[]),
                exit(normal);
        {'EXIT',  Port, Reason} ->
                 ?LOG(notice,"Port: ~p exit. Reason: ~p",[Port,Reason]),
                 exit(Reason);
        _Other -> ?LOG(error,"unexpected message: ~p",[_Other]),
                  exit({error,{unexpected_message,_Other}})
    end.

相关问题