Kafka 在Confluent Control Center UI中生成Avro消息

mzsu5hc0  于 2023-08-02  发布在  Apache
关注(0)|答案(1)|浏览(140)

为了开发一个数据传输应用程序,我需要首先定义一个键/值avro模式。在定义avro模式之前,生产者应用程序尚未开发。
我克隆了一个主题和它的键/值avro模式,这些模式已经在工作,并且还克隆了jdbcsnink连接器。我只是更改了主题和连接器名称。
然后我复制和现有的消息成功发送接收器使用Confluent主题消息UI生产器。x1c 0d1x的数据
但它正在发送错误:“未知魔法字节!“

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:323)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:164)
        at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
        ... 17 more
[2022-07-25 03:45:42,385] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)

字符串
阅读其他问题看,似乎必须使用模式序列化消息。
Unknown magic byte with kafka-avro-console-consumer
是否可以使用Confluent Topic UI向具有AVRO键/值模式的主题发送消息?
avro模式是否需要依赖于连接器/源的信息?还是命名空间取决于主题名称?
这是我的关键模式。主题名称为knov_03

{
  "connect.name": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming.Key",
  "fields": [
    {
      "name": "id_sap_incoming",
      "type": "long"
    }
  ],
  "name": "Key",
  "namespace": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming",
  "type": "record"
}



连接器:

{
  "name": "knov_05",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "knov_03",
    "connection.url": "jdbc:mysql://eXXXXX:3306/MY_DB_SCHEMA?useSSL=FALSE&nullCatalogMeansCurrent=true",
    "connection.user": "USER",
    "connection.password": "PASSWORD",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id_sap_incoming",
    "auto.create": "true",
    "auto.evolve": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

  • 谢谢-谢谢
7uhlpewt

7uhlpewt1#

截至Confluent Control Center的当前版本,它不支持直接通过其UI发送Avro格式的消息。控制中心主要用于管理和监控Kafka环境。
为了生成Avro消息,您通常会使用Confluent提供的其他工具,例如Confluent REST Proxy或Kafka Avro Console Producer。这两个工具都允许您将Avro消息发送到Kafka,但它们要求您的Avro模式在Confluent Schema Registry中注册。

相关问题