Problem writing data from Apache Kafka to a text file

0ve6wy6x posted on 2021-06-07 in Kafka

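My development environment: Windows 10 Enterprise Edition, 16 GB RAM, 2.81 GHz, 64-bit operating system. I installed VirtualBox and imported an Ubuntu image into it. Inside Ubuntu I installed the Confluent CLI (https://github.com/confluentinc/confluent-cli) to run Kafka, ZooKeeper and the other services.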
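Scenario: I want to write the data from an Apache Kafka topic to a text file. I am using a sink connector and following the link below, so I have not written any code of my own for this.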
The link I am following:
http://bigdatums.net/2017/06/22/writing-data-from-apache-kafka-to-text-file/
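Steps completed successfully so far:
Running the Ubuntu image in the virtual machine.
Running the Confluent CLI.
Starting Confluent Kafka, ZooKeeper and the other services with the bin/confluent start command.
Creating a topic with the Confluent CLI.
I then tried running the following command to read the text messages from the Kafka topic: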

osboxes@osboxes:~/ganesh/confluent-5.1.0$ bin/connect-standalone /home/osboxes/ganesh/confluent-5.1.0/etc/kafka/csx-connect-standalone.properties /home/osboxes/ganesh/confluent-5.1.0/etc/kafka/csx-connect-file-sink.properties

The property configuration details are as follows:

connect-file-sink.properties (details)    
name=local-file-sink    
connector.class=FileStreamSink    
tasks.max=1    
file=/home/osboxes/ganesh/ptc/messages/output/trainstartevent/MBCDTSKB02.json    
topics=TrainStartEvent  
connect-file-source.properties (details)    
name=local-file-source    
connector.class=FileStreamSource    
tasks.max=1    
file=/home/osboxes/ganesh/ptc/messages/input/trainstartevent/MBCDTSKB02.json    
topic=TrainStartEvent    
connect-standalone.properties (details)    
key.converter=org.apache.kafka.connect.storage.StringConverter    
value.converter=org.apache.kafka.connect.storage.StringConverter    
internal.key.converter=org.apache.kafka.connect.json.JsonConverter    
internal.value.converter=org.apache.kafka.connect.json.JsonConverter    
internal.key.converter.schemas.enable=false    
internal.value.converter.schemas.enable=false    
offset.flush.interval.ms=10000 
plugin.path=share/java
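As a quick sanity check on the two connector files above, here is a minimal Python sketch (not part of Kafka) that parses them and confirms two things: the sink subscribes to the topic the source writes to, and the two connectors point at different files. The parser is deliberately simplified. It assumes plain key=value lines and ignores the line continuations, ':' separators and escapes that the full Java .properties format allows. The helper names parse_properties and check_file_pipeline are hypothetical.

```python
def parse_properties(text):
    """Simplified .properties parser (hypothetical helper).

    Assumes plain key=value lines; '#' and '!' start comment lines.
    Line continuations, ':' separators and escapes are NOT handled.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_file_pipeline(source, sink):
    """Return a list of problems found; an empty list means the pair looks consistent."""
    problems = []
    # Sink connectors subscribe via 'topics' (comma-separated); FileStreamSource uses 'topic'.
    sink_topics = [t.strip() for t in sink.get("topics", "").split(",") if t.strip()]
    if source.get("topic") not in sink_topics:
        problems.append("sink 'topics' does not include the source 'topic'")
    # With a shared file, the source would likely re-read what the sink appends.
    if source.get("file") == sink.get("file"):
        problems.append("source and sink use the same file")
    return problems


SINK = """
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/home/osboxes/ganesh/ptc/messages/output/trainstartevent/MBCDTSKB02.json
topics=TrainStartEvent
"""

SOURCE = """
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/home/osboxes/ganesh/ptc/messages/input/trainstartevent/MBCDTSKB02.json
topic=TrainStartEvent
"""

if __name__ == "__main__":
    print(check_file_pipeline(parse_properties(SOURCE), parse_properties(SINK)))
```

For the configuration as posted this prints an empty list, so the connector files themselves look consistent.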

Actual error message:

[2019-01-20 21:14:17,413] INFO Started o.e.j.s.ServletContextHandler@546394ed{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:850)
[2019-01-20 21:14:17,428] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
org.apache.kafka.connect.errors.ConnectException: Unable to start REST server

[2019-01-20 21:13:19,927] INFO Kafka Connect standalone worker
initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
[2019-01-20 21:13:20,021] INFO WorkerInfo values:
jvm.args = -Xms256M, -Xmx2G, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20,
-XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=bin/../logs, -Dlog4j.configuration=file:bin/../etc/kafka/connect-log4j.properties
jvm.spec = Oracle Corporation, Java HotSpot(TM) 64-Bit Server VM,
1.8.0_201, 25.201-b09
[2019-01-20 21:14:13,427] WARN The configuration 'plugin.path' was
supplied but isn't a known config.
(org.apache.kafka.clients.admin.AdminClientConfig:287)
[2019-01-20 21:14:13,431] WARN The configuration 'value.converter' was
supplied but isn't a known config.
(org.apache.kafka.clients.admin.AdminClientConfig:287)
[2019-01-20 21:14:13,431] WARN The configuration
'internal.key.converter.schemas.enable' was supplied but isn't a known
config. (org.apache.kafka.clients.admin.AdminClientConfig:287)
[2019-01-20 21:14:13,432] WARN The configuration 'key.converter' was
supplied but isn't a known config.
(org.apache.kafka.clients.admin.AdminClientConfig:287)
[2019-01-20 21:14:13,433] INFO Kafka version : 2.1.0-cp1
(org.apache.kafka.common.utils.AppInfoParser:109)
[2019-01-20 21:14:13,433] INFO Kafka commitId : 3bce825d5f759863
(org.apache.kafka.common.utils.AppInfoParser:110)
[2019-01-20 21:14:14,047] INFO Kafka cluster ID:
jPHHwv39Riyn1krFQyhYkA (org.apache.kafka.connect.util.ConnectUtils:59)
[2019-01-20 21:14:14,139] INFO Logging initialized @55198ms to
org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193)
[2019-01-20 21:14:14,565] INFO Added connector for http://:8083
(org.apache.kafka.connect.runtime.rest.RestServer:119)
[2019-01-20 21:14:14,681] INFO Advertised URI: http://127.0.1.1:8083/
(org.apache.kafka.connect.runtime.rest.RestServer:267)
[2019-01-20 21:14:14,705] INFO Kafka version : 2.1.0-cp1
(org.apache.kafka.common.utils.AppInfoParser:109)
[2019-01-20 21:14:14,705] INFO Kafka commitId : 3bce825d5f759863
(org.apache.kafka.common.utils.AppInfoParser:110)
[2019-01-20 21:14:15,228] INFO JsonConverterConfig values:
converter.type = key
schemas.cache.size = 1000
schemas.enable = false
(org.apache.kafka.connect.json.JsonConverterConfig:279)
[2019-01-20 21:14:15,238] INFO JsonConverterConfig values:
converter.type = value
schemas.cache.size = 1000
schemas.enable = false
(org.apache.kafka.connect.json.JsonConverterConfig:279)
[2019-01-20 21:14:15,251] INFO Kafka Connect standalone worker
initialization took 55315ms
(org.apache.kafka.connect.cli.ConnectStandalone:92)
[2019-01-20 21:14:15,251] INFO Kafka Connect starting
(org.apache.kafka.connect.runtime.Connect:49)
[2019-01-20 21:14:15,256] INFO Herder starting
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:88)
[2019-01-20 21:14:15,256] INFO Worker starting
(org.apache.kafka.connect.runtime.Worker:172)
[2019-01-20 21:14:15,256] INFO Starting FileOffsetBackingStore with
file /tmp/connect.offsets
(org.apache.kafka.connect.storage.FileOffsetBackingStore:58)
[2019-01-20 21:14:15,258] INFO Worker started
(org.apache.kafka.connect.runtime.Worker:177)
[2019-01-20 21:14:15,259] INFO Herder started
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:90)
[2019-01-20 21:14:15,259] INFO Starting REST server
(org.apache.kafka.connect.runtime.rest.RestServer:163)
[2019-01-20 21:14:15,565] INFO jetty-9.4.12.v20180830; built:
2018-08-30T13:59:14.071Z; git:
27208684755d94a92186989f695db2d7b21ebc51; jvm 1.8.0_201-b09
(org.eclipse.jetty.server.Server:371)
[2019-01-20 21:14:15,733] INFO DefaultSessionIdManager
workerName=node0 (org.eclipse.jetty.server.session:365)
[2019-01-20 21:14:15,746] INFO No SessionScavenger set, using defaults
(org.eclipse.jetty.server.session:370)
[2019-01-20 21:14:15,748] INFO node0 Scavenging every 600000ms
(org.eclipse.jetty.server.session:149)
Jan 20, 2019 9:14:16 PM org.glassfish.jersey.internal.inject.Providers
checkProviderRuntime
WARNING: A provider
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
registered in SERVER runtime does not implement any provider
interfaces applicable in the SERVER runtime. Due to constraint
configuration problems the provider
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
will be ignored.
Jan 20, 2019 9:14:16 PM org.glassfish.jersey.internal.inject.Providers
checkProviderRuntime
WARNING: A provider
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
registered in SERVER runtime does not implement any provider
interfaces applicable in the SERVER runtime. Due to constraint
configuration problems the provider
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
will be ignored.
Jan 20, 2019 9:14:16 PM org.glassfish.jersey.internal.inject.Providers
checkProviderRuntime
WARNING: A provider
org.apache.kafka.connect.runtime.rest.resources.RootResource
registered in SERVER runtime does not implement any provider
interfaces applicable in the SERVER runtime. Due to constraint
configuration problems the provider
org.apache.kafka.connect.runtime.rest.resources.RootResource will be
ignored.
Jan 20, 2019 9:14:17 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The
(sub)resource method listConnectors in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
contains empty path annotation.
WARNING: The (sub)resource method createConnector in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
contains empty path annotation.
WARNING: The (sub)resource method serverInfo in
org.apache.kafka.connect.runtime.rest.resources.RootResource contains
empty path annotation.
[2019-01-20 21:14:17,413] INFO Started
o.e.j.s.ServletContextHandler@546394ed{/,null,AVAILABLE}
(org.eclipse.jetty.server.handler.ContextHandler:850)
[2019-01-20 21:14:17,428] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
org.apache.kafka.connect.errors.ConnectException: Unable to start REST server
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:214)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:53)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:95)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:339)
at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:307)
at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:235)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.server.Server.doStart(Server.java:395)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:212)
... 2 more
[2019-01-20 21:14:17,437] INFO Kafka Connect stopping
(org.apache.kafka.connect.runtime.Connect:65)
[2019-01-20 21:14:17,437] INFO Stopping REST server
(org.apache.kafka.connect.runtime.rest.RestServer:223)
[2019-01-20 21:14:17,442] INFO Stopped
http_8083@1b90fee4{HTTP/1.1,[http/1.1]}{0.0.0.0:8083}
(org.eclipse.jetty.server.AbstractConnector:341)
[2019-01-20 21:14:17,460] INFO node0 Stopped scavenging
(org.eclipse.jetty.server.session:167)
[2019-01-20 21:14:17,493] INFO Stopped
o.e.j.s.ServletContextHandler@546394ed{/,null,UNAVAILABLE}
(org.eclipse.jetty.server.handler.ContextHandler:1040)
[2019-01-20 21:14:17,507] INFO REST server stopped
(org.apache.kafka.connect.runtime.rest.RestServer:241)
[2019-01-20 21:14:17,508] INFO Herder stopping
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:95)
[2019-01-20 21:14:17,509] INFO Worker stopping
(org.apache.kafka.connect.runtime.Worker:184)
[2019-01-20 21:14:17,510] INFO Stopped FileOffsetBackingStore
(org.apache.kafka.connect.storage.FileOffsetBackingStore:66)
[2019-01-20 21:14:17,522] INFO Worker stopped
(org.apache.kafka.connect.runtime.Worker:205)
[2019-01-20 21:14:17,523] INFO Herder stopped
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:112)
[2019-01-20 21:14:17,529] INFO Kafka Connect stopped
(org.apache.kafka.connect.runtime.Connect:70)
Answer from 6fe3ivhb:

Caused by: java.net.BindException: Address already in use
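It sounds like you ran confluent start, so a Kafka Connect worker is already running and listening on port 8083. connect-standalone then tries to start its own REST server on that same port, which fails with the BindException above.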
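The port conflict can be confirmed before launching connect-standalone. Below is a minimal Python sketch, assuming the worker started by confluent start listens on 8083 (the port shown in the log as {0.0.0.0:8083}). It tries to bind the port itself and treats EADDRINUSE, the OS error that Java surfaces as java.net.BindException, as "already taken". The function name port_in_use is a hypothetical helper.

```python
import errno
import socket


def port_in_use(port, host="0.0.0.0"):
    """Return True if host:port cannot be bound because something already holds it.

    EADDRINUSE is the OS error that Java reports as
    java.net.BindException: Address already in use.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError as exc:
            if exc.errno == errno.EADDRINUSE:
                return True
            raise  # other errors (e.g. permissions) are a different problem
    return False


if __name__ == "__main__":
    # 8083 is the Connect REST port in the log ({0.0.0.0:8083}).
    if port_in_use(8083):
        print("8083 is taken: probably a Connect worker, e.g. from `confluent start`")
    else:
        print("8083 is free")
```

On the Ubuntu VM, ss -ltnp | grep 8083 gives the same answer and also names the owning process (showing other users' processes may require sudo).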
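So you need to either use confluent load /home/osboxes/ganesh/confluent-5.1.0/etc/kafka/csx-connect-file-sink.properties to add the connector to the worker that is already running, or convert the properties file to JSON (an object with "name" and "config" fields) and POST it yourself: curl -XPOST -H "Content-Type: application/json" -d @csx-connect-file-sink.json http://localhost:8083/connectors . See the Kafka Connect REST API documentation.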
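The properties-to-JSON conversion can be sketched in Python with only the standard library. The sketch assumes the REST API's create-connector shape: a JSON object with "name" and "config" fields, sent to POST /connectors with a Content-Type: application/json header. It reuses a simplified key=value parser (no continuations or escapes). parse_properties, connector_payload and build_create_request are hypothetical helpers, and the request is only built here, not sent.

```python
import json
import urllib.request


def parse_properties(text):
    """Simplified key=value parser; '#'/'!' comments; no continuations or escapes."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def connector_payload(props):
    """Shape the properties as {"name": ..., "config": {...}} for POST /connectors."""
    return {"name": props["name"], "config": dict(props)}


def build_create_request(payload, base_url="http://localhost:8083"):
    """Build (but do not send) the POST request that would create the connector."""
    return urllib.request.Request(
        base_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


SINK_TEXT = """
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/home/osboxes/ganesh/ptc/messages/output/trainstartevent/MBCDTSKB02.json
topics=TrainStartEvent
"""

if __name__ == "__main__":
    payload = connector_payload(parse_properties(SINK_TEXT))
    print(json.dumps(payload, indent=2))
    # To submit it to the worker started by `confluent start`:
    # with urllib.request.urlopen(build_create_request(payload)) as resp:
    #     print(resp.status, resp.read().decode())
```

All config values stay strings (for example "1" for tasks.max), matching how they appear in the .properties file.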
Note that to write to a file you can also do the whole thing with the console consumer: kafka-console-consumer --from-beginning --property print.key=true --topic x --bootstrap-server localhost:9092 --group to-file >> /tmp/file.txt
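To show what the console-consumer route leaves in the file, here is a small Python sketch of the line format. It assumes kafka-console-consumer's defaults: with print.key=true each record becomes the key, a tab (the default key.separator), then the value, and a missing key is printed as null. Opening the file in append mode mirrors the >> redirection. format_record and append_records are hypothetical helpers; this models the output and does not consume from Kafka.

```python
import os
import tempfile


def format_record(key, value, key_separator="\t"):
    """One output line as printed with print.key=true (None rendered as 'null')."""
    k = "null" if key is None else key
    v = "null" if value is None else value
    return f"{k}{key_separator}{v}\n"


def append_records(path, records):
    """Append (key, value) pairs to path, keeping existing content like `>>` does."""
    with open(path, "a", encoding="utf-8") as out:
        for key, value in records:
            out.write(format_record(key, value))


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "file.txt")
    append_records(path, [(None, '{"event": "start"}')])  # first run
    append_records(path, [("k1", '{"event": "stop"}')])   # second run appends
    with open(path, encoding="utf-8") as f:
        print(f.read(), end="")
```

Because the command also passes --group to-file, the group's committed offsets make a rerun resume where the previous run stopped; --from-beginning only takes effect while that group has no committed offsets.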
