我正在尝试连接kafka并使用erlang/ekaf生成一些消息。
该代码是ekaf的自述文件中解释的一个简单示例,但在 application:start
被称为。
请注意我用过 gen_icmp:ping
以确保此计算机可以访问运行kafka的服务器。
我还运行了python脚本,向这个kafka生成了一些随机消息,它是成功的,所以很可能在我的erlang代码中遗漏了一些东西
资料来源:
-module(kafka).
-compile(export_all).
run_test() ->
io:format("run_test: start.~n"),
pingKafka(),
try init_ekaf() of
_ -> io:format("run_test: ok~n")
catch
error:Msg -> io:format("run_test: error: ~p~n", [Msg]);
throw:Msg -> io:format("run_test: throw: ~p~n", [Msg]);
exit:Msg -> io:format("run_test: exit: ~p~n", [Msg])
end.
init_ekaf() ->
io:format("init_ekaf: start.~n"),
application:load(ekaf),
application:set_env(ekaf, ekaf_bootstrap_broker, {"kafka.dev", 9092}),
ok = application:start(ekaf),
io:format("init_ekaf: started.~n"),
Topic = <<"foobar">>,
ekaf:produce_sync(Topic, <<"some data">>),
io:format("init_ekaf: message sent.~n"),
ok.
pingKafka() ->
Res = gen_icmp:ping("kafka.dev"),
io:format("pingKafka: ~p.~n", [Res]),
ok.
输出:
run_test: start.
pingKafka: [{ok,"kafka.dev",
{192,168,0,51},
{192,168,0,51},
{12343,0,64,130},
<<" !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJK">>}].
init_ekaf: start.
run_test: error: {badmatch,{error,{not_started,gproc}}}
run_test: end.
1条答案
按热度按时间olmpazwi1#
在再次阅读存储库中现有的测试之后,我发现
gproc
在启动ekaf之前,还需要启动应用程序。所以通过添加:
就在之前
application:start(ekaf)
,问题解决了。p、 s:找到了另一个解决问题的方法
application:ensure_all_started(ekaf)
而不是application:start(ekaf)
. 如文件所述,ensure_all_started
相当于对应用程序尚未启动的所有依赖项重复调用start/1,2