我在分布式模式下使用kafka connect(spooldir连接器)。启动worker之后,当我发送restapi请求时,会收到一个带有eofexception的响应。我检查了连接器在独立模式下是否正常工作。
http响应:
{"error_code":500,"message":"IO Error trying to forward REST request:
java.io.EOFException: HttpConnectionOverHTTP@7801635a::SocketChannelEndPoint@45ad3c42{broker/168.183.37.116:8083<->/10.96.134.132:65453,ISHUT,fill=-,flush=-,to=522046/0}{io=0/0,kio=0,kro=1}
>HttpConnectionOverHTTP@7801635a(l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083,closed=false)=>HttpChannelOverHTTP@2991a896(exchange=HttpExchange@45b72f1c req=TERMINATED/null@null res=PENDING/null@null
[send=HttpSenderOverHTTP@1b783029(req=QUEUED,snd=COMPLETED,failure=null[HttpGenerator@379b5028{s=START}],recv=HttpReceiverOverHTTP@13ea8b55(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0 of -1}]]"}
服务器日志显示以下错误:
ERROR IO error forwarding REST request: (org.apache.kafka.connect.runtime.rest.RestClient:143)
java.util.concurrent.ExecutionException: java.io.EOFException: HttpConnectionOverHTTP@7801635a::SocketChannelEndPoint@45ad3c42{broker/168.183.37.116:8083<->/10.96.134.132:65453,ISHUT,fill=-,flush=-,to=522046/0}{io=0/0,kio=0,kro=1}->HttpConnectionOverHTTP@7801635a(l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083,closed=false)=>HttpChannelOverHTTP@2991a896(exchange=HttpExchange@45b72f1c req=TERMINATED/null@null res=PENDING/null@null)[send=HttpSenderOverHTTP@1b783029(req=QUEUED,snd=COMPLETED,failure=null)[HttpGenerator@379b5028{s=START}],recv=HttpReceiverOverHTTP@13ea8b55(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0 of -1}]]
at org.eclipse.jetty.client.util.FutureResponseListener.getResult(FutureResponseListener.java:118)
at org.eclipse.jetty.client.util.FutureResponseListener.get(FutureResponseListener.java:101)
at org.eclipse.jetty.client.HttpRequest.send(HttpRequest.java:711)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:125)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:65)
at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:369)
at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:164)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
...
以下是属性文件:
bootstrap.servers=kaas-test:443
security.protocol=SSL
ssl.truststore.location=<truststore-location>
ssl.truststore.password=<truststore-password>
ssl.keystore.location=<keystore-location>
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
producer.security.protocol=SSL
producer.ssl.truststore.location=<truststore-location>
producer.ssl.keystore.location=<keystore-location>
producer.ssl.truststore.password=<truststore-password>
producer.ssl.keystore.password=<keystore-password>
producer.ssl.key.password=<key-password>
group.id=tenant
schema.generation.enabled=true
schema.generation.value.name=schemavalue
schema.generation.key.name=schemakey
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
rest.port=28082
rest.hostname=localhost
plugin.path=connect-jars/confluentinc-connect-transforms-1.3.2/,connect-jars/jcustenborder-kafka-connect-spooldir-2.0.46/
我该怎么解决?
暂无答案!
目前还没有任何答案,快来回答吧!