为了开发一个数据传输应用程序,我需要首先定义一个键/值avro模式。在定义avro模式之前,生产者应用程序尚未开发。
我克隆了一个主题和它的键/值avro模式,这些模式已经在工作,并且还克隆了jdbcsnink连接器。我只是更改了主题和连接器名称。
然后我复制和现有的消息成功发送接收器使用Confluent主题消息UI生产器。x1c 0d1x的数据
但它正在发送错误:“未知魔法字节!“
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:323)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:164)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
... 17 more
[2022-07-25 03:45:42,385] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
字符串
阅读其他问题看,似乎必须使用模式序列化消息。
Unknown magic byte with kafka-avro-console-consumer
是否可以使用Confluent Topic UI向具有AVRO键/值模式的主题发送消息?
avro模式是否需要依赖于连接器/源的信息?还是命名空间取决于主题名称?
这是我的关键模式。主题名称为knov_03
{
"connect.name": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming.Key",
"fields": [
{
"name": "id_sap_incoming",
"type": "long"
}
],
"name": "Key",
"namespace": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming",
"type": "record"
}
型
的
连接器:
{
"name": "knov_05",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "knov_03",
"connection.url": "jdbc:mysql://eXXXXX:3306/MY_DB_SCHEMA?useSSL=FALSE&nullCatalogMeansCurrent=true",
"connection.user": "USER",
"connection.password": "PASSWORD",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "id_sap_incoming",
"auto.create": "true",
"auto.evolve": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schema.registry.url": "http://schema-registry:8081"
}
}
型
- 谢谢-谢谢
1条答案
按热度按时间7uhlpewt1#
截至Confluent Control Center的当前版本,它不支持直接通过其UI发送Avro格式的消息。控制中心主要用于管理和监控Kafka环境。
为了生成Avro消息,您通常会使用Confluent提供的其他工具,例如Confluent REST Proxy或Kafka Avro Console Producer。这两个工具都允许您将Avro消息发送到Kafka,但它们要求您的Avro模式在Confluent Schema Registry中注册。