Error when forwarding REST request

Posted by 9q78igpj on 2021-06-04 in Kafka

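I am running Kafka Connect (SpoolDir connector) in distributed mode. After starting the worker, when I send a REST API request I receive a response containing an EOFException. I have verified that the connector works correctly in standalone mode.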
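For context, the stack trace further down passes through ConnectorsResource.createConnector, so the failing call appears to be a connector-creation request (POST /connectors). A minimal sketch of such a request follows. The base URL, connector name, and config values are placeholders assumed for illustration, not the values actually used, and the config shown is not a complete SpoolDir configuration.

```python
import json
import urllib.request


def build_create_connector_request(base_url, name, config):
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


# Hypothetical values: the port is taken from rest.port in the properties
# file, everything else is a placeholder.
request = build_create_connector_request(
    "http://localhost:28082",
    "spooldir-example",
    {
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "tasks.max": "1",
    },
)
```

Passing the result to urllib.request.urlopen(request) would actually send it. In distributed mode, a worker that is not the group leader forwards this kind of request to the leader, which is the step that fails here.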
HTTP response:

{"error_code":500,"message":"IO Error trying to forward REST request:
java.io.EOFException: HttpConnectionOverHTTP@7801635a::SocketChannelEndPoint@45ad3c42{broker/168.183.37.116:8083<->/10.96.134.132:65453,ISHUT,fill=-,flush=-,to=522046/0}{io=0/0,kio=0,kro=1}
->HttpConnectionOverHTTP@7801635a(l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083,closed=false)=>HttpChannelOverHTTP@2991a896(exchange=HttpExchange@45b72f1c req=TERMINATED/null@null res=PENDING/null@null)
[send=HttpSenderOverHTTP@1b783029(req=QUEUED,snd=COMPLETED,failure=null)[HttpGenerator@379b5028{s=START}],recv=HttpReceiverOverHTTP@13ea8b55(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0 of -1}]]"}

The server log shows the following error:

ERROR IO error forwarding REST request:  (org.apache.kafka.connect.runtime.rest.RestClient:143)
java.util.concurrent.ExecutionException: java.io.EOFException: HttpConnectionOverHTTP@7801635a::SocketChannelEndPoint@45ad3c42{broker/168.183.37.116:8083<->/10.96.134.132:65453,ISHUT,fill=-,flush=-,to=522046/0}{io=0/0,kio=0,kro=1}->HttpConnectionOverHTTP@7801635a(l:/10.96.134.132:65453 <-> r:broker/168.183.37.116:8083,closed=false)=>HttpChannelOverHTTP@2991a896(exchange=HttpExchange@45b72f1c req=TERMINATED/null@null res=PENDING/null@null)[send=HttpSenderOverHTTP@1b783029(req=QUEUED,snd=COMPLETED,failure=null)[HttpGenerator@379b5028{s=START}],recv=HttpReceiverOverHTTP@13ea8b55(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0 of -1}]]
        at org.eclipse.jetty.client.util.FutureResponseListener.getResult(FutureResponseListener.java:118)
        at org.eclipse.jetty.client.util.FutureResponseListener.get(FutureResponseListener.java:101)
        at org.eclipse.jetty.client.HttpRequest.send(HttpRequest.java:711)
        at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:125)
        at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:65)
        at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:369)
        at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:164)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        ...

Here is the properties file:

bootstrap.servers=kaas-test:443
security.protocol=SSL
ssl.truststore.location=<truststore-location>
ssl.truststore.password=<truststore-password>
ssl.keystore.location=<keystore-location>
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>

producer.security.protocol=SSL
producer.ssl.truststore.location=<truststore-location>
producer.ssl.keystore.location=<keystore-location>
producer.ssl.truststore.password=<truststore-password>
producer.ssl.keystore.password=<keystore-password>
producer.ssl.key.password=<key-password>

group.id=tenant

schema.generation.enabled=true
schema.generation.value.name=schemavalue
schema.generation.key.name=schemakey

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000

rest.port=28082
rest.hostname=localhost

plugin.path=connect-jars/confluentinc-connect-transforms-1.3.2/,connect-jars/jcustenborder-kafka-connect-spooldir-2.0.46/
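Since the failure happens while one worker forwards the request to another, the REST address settings in the file above may be worth a second look. The sketch below only lists which REST-related keys are present. The set of known keys is an assumption based on the documented Kafka Connect worker settings and may not be exhaustive, and the sketch makes no claim about the actual cause of the EOFException.

```python
# REST-related worker settings as documented for Kafka Connect
# (assumed list for this sketch; may not be exhaustive).
KNOWN_REST_KEYS = {
    "listeners",
    "rest.host.name",
    "rest.port",
    "rest.advertised.host.name",
    "rest.advertised.port",
    "rest.advertised.listener",
    "rest.extension.classes",
}


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def rest_report(props):
    """Group REST-related keys: in the known list, not in it, or absent."""
    rest_like = {k for k in props if k == "listeners" or k.startswith("rest.")}
    return {
        "recognised": sorted(rest_like & KNOWN_REST_KEYS),
        "not_in_list": sorted(rest_like - KNOWN_REST_KEYS),
        "absent": sorted(KNOWN_REST_KEYS - rest_like),
    }


# The two REST lines copied from the properties file above.
SAMPLE = """\
rest.port=28082
rest.hostname=localhost
"""

report = rest_report(parse_properties(SAMPLE))
for group, keys in report.items():
    print(group, keys)
```

With the two lines from this file, rest.port falls in the recognised group while rest.hostname does not (the documented key is spelled rest.host.name), and none of the rest.advertised.* settings are present.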

How can I fix this?
