Error deserializing Avro message on sink connector

ct2axkht posted on 2021-06-05 in Kafka

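I am trying to consume Avro messages from a topic and dump them into a Postgres DB, but the sink connector fails while deserializing the Avro messages.
This is my producer config: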

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081

Sink connector config:

{
  "name": "jdbc_source_postgres_avro",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgres://localhost:5432/kafka-test",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.user": "postgres",
    "connection.password": "password",
    "topics": "docker-avro-topic",
    "auto.create": "true",
    "auto.offset.reset": "latest",
    "name": "jdbc_source_postgres_avro"
  }
}

Docker Compose service for Kafka Connect:

connect:
  image: cnfldemos/kafka-connect-datagen:0.1.3-5.3.1
  hostname: connect
  container_name: connect
  depends_on:
    - zookeeper
    - broker
    - schema-registry
  ports:
    - "8083:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

And finally the error log:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic docker-avro-topic to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 101
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)

uxhixvfz1#

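You have configured Kafka Connect to look for Schema Registry in the same container that Kafka Connect itself runs in (inside a container, localhost refers to that container):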

"value.converter.schema.registry.url": "http://localhost:8081",

Schema Registry is not running there, hence:

java.net.ConnectException: Connection refused (Connection refused)

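Since you have already specified the Schema Registry config in the Kafka Connect Docker environment variables, and assuming that Schema Registry really is running in a container that resolves as schema-registry from the Kafka Connect container, simply remove the schema.registry.url lines (key.converter.schema.registry.url and value.converter.schema.registry.url) from the connector config itself.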
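
A minimal Python sketch of how that change could be applied to the connector config from the question. The helper names and the trimmed config dictionary are illustrative only and are not part of any Kafka or Confluent library. The first helper removes the schema.registry.url lines as suggested above. The second is an alternative that keeps the lines but points them at http://schema-registry:8081, the value already used in the worker's environment variables. That may be the safer choice if the key.converter and value.converter classes stay in the connector config, but how connector-level converter overrides are handled is an assumption to verify against your Connect version.

```python
import json

# Trimmed copy of the connector config from the question (illustrative).
connector_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "topics": "docker-avro-topic",
}

REGISTRY_SUFFIX = "schema.registry.url"


def remove_registry_urls(config):
    """Return a copy of config without any *.schema.registry.url entries."""
    return {k: v for k, v in config.items() if not k.endswith(REGISTRY_SUFFIX)}


def repoint_registry_urls(config, url="http://schema-registry:8081"):
    """Return a copy of config with every *.schema.registry.url set to url."""
    return {k: (url if k.endswith(REGISTRY_SUFFIX) else v) for k, v in config.items()}


# Option 1: drop the localhost URLs, as the answer suggests.
removed = remove_registry_urls(connector_config)

# Option 2: keep the keys but use the Docker service hostname instead.
repointed = repoint_registry_urls(connector_config)

print(json.dumps(removed, indent=2))
print(json.dumps(repointed, indent=2))
```

Both helpers return new dictionaries and leave the original untouched, so the two variants can be compared before either one is submitted to Kafka Connect.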
