Deserializing Avro messages

hi3rlvi2 · asked 2021-06-05 · Kafka

I deployed Kafka from here. I also added a postgres container to docker-compose.yml as follows:

postgres:
    image: postgres
    hostname: kafka-postgres
    container_name: kafka-postgres
    depends_on:
      - ksql-server
      - broker
      - schema-registry
      - connect
    ports:
      - 5432:5432

I created the pageviews topic.
I also created a DatagenConnector with the following settings and ran it:

{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "iterations": "999999999",
  "quickstart": "pageviews"
}

As far as I understand, the connector defines a schema for the topic:

{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "pageid",
      "type": "string"
    }
  ],
  "connect.name": "ksql.pageviews"
}

My next step was to create a JdbcSinkConnector to move the data from the Kafka topic into a Postgres table. That worked. The connector's settings:

{
  "name": "from-kafka-to-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "pageviews"
  ],
  "connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
  "connection.user": "postgres",
  "connection.password": "********",
  "auto.create": "true",
  "auto.evolve": "true"
}

Then I tried to produce messages to the topic myself. But it failed with this error:

[2020-02-01 21:16:11,750] Error encountered in task to-pg-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='pageviews', partition=0, offset=23834, timestamp=1580591160374, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pageviews to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
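"Unknown magic byte!" means the consumed bytes do not start with the Confluent wire-format header: a magic byte 0x00, then a 4-byte big-endian Schema Registry id, then the Avro-encoded payload. A minimal sketch of that check (the helper name `parse_confluent_header` is made up for illustration):

```python
import struct

MAGIC_BYTE = 0

def parse_confluent_header(message: bytes):
    """Split a Confluent wire-format message into (schema_id, payload).

    Raises ValueError when the magic byte is missing -- the same condition
    that makes AvroConverter fail with "Unknown magic byte!".
    """
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    schema_id = struct.unpack(">I", message[1:5])[0]
    return schema_id, message[5:]

# A plain JSON payload, like the one the question's producer sends, fails:
try:
    parse_confluent_header(b'{"viewtime": 123}')
except ValueError as e:
    print(e)  # Unknown magic byte!

# An Avro message produced via Schema Registry starts with 0x00 + schema id:
schema_id, payload = parse_confluent_header(b"\x00\x00\x00\x00\x01rest")
print(schema_id)  # 1
```

The JSON bytes sent by the plain `Producer` below have no such header, which is exactly why the sink's AvroConverter rejects them.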
So the way I produce the message matters. Here is how I do it (Python, confluent-kafka-python):

import json

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.poll(0)
producer.produce(topic, json.dumps({
   'viewtime': 123,
   'userid': 'user_1',
   'pageid': 'page_1'
}).encode('utf8'), on_delivery=kafka_delivery_report)
producer.flush()

Maybe I should supply a schema with the message (AvroProducer)?


htzpubme1#

The problem occurs because you are trying to read non-Avro data from the topic with the Avro converter.
There are two possible solutions:
1. Switch the Kafka Connect sink connector to use the right converter
For example, if you are consuming JSON data from a Kafka topic into a Kafka Connect sink:

...
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true/false
...
`value.converter.schemas.enable` depends on whether the messages embed a schema.
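With `schemas.enable=true`, the JsonConverter expects each message value to be a JSON envelope with `schema` and `payload` fields. A hedged sketch of what the question's pageviews record could look like in that envelope (the field types follow Kafka Connect's JSON schema names, e.g. `int64` for the Avro `long`):

```python
import json

# Envelope shape the JsonConverter parses when
# value.converter.schemas.enable=true.
envelope = {
    "schema": {
        "type": "struct",
        "name": "ksql.pageviews",
        "optional": False,
        "fields": [
            {"field": "viewtime", "type": "int64", "optional": False},
            {"field": "userid", "type": "string", "optional": False},
            {"field": "pageid", "type": "string", "optional": False},
        ],
    },
    "payload": {"viewtime": 123, "userid": "user_1", "pageid": "page_1"},
}

# These are the bytes you would hand to producer.produce(...).
message = json.dumps(envelope).encode("utf8")
```

With `schemas.enable=false`, the value would just be the bare `payload` object, but then `auto.create` in the JDBC sink cannot infer a table schema.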
2. Switch the upstream format to Avro
For the DatagenConnector producing messages to Kafka, to make the message value format `Avro`, set the `value.converter` and `value.converter.schema.registry.url` parameters:

...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
...
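Merged with the question's DatagenConnector settings, the full config might look like the sketch below (the registry URL assumes Schema Registry is reachable at `schema-registry:8081` from the Connect worker, as in the Confluent compose examples; adjust it to your setup):

```json
{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "iterations": "999999999",
  "quickstart": "pageviews"
}
```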

See the Kafka Connect Datagen docs for details.
Also a great article: Kafka Connect Converters and Serialization.

pu82cl6c2#

The topic expects Avro-encoded messages. AvroProducer from confluent-kafka-python is the key here:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema_str = """
{
   "namespace": "ksql",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "viewtime",
       "type" : "long"
     }, 
     {
       "name" : "userid",
       "type" : "string"
     }, 
     {
       "name" : "pageid",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "ksql",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "pageid",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
# records must match the schemas above
value = {"viewtime": 123, "userid": "user_1", "pageid": "page_1"}
key = {"pageid": "page_1"}

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://schema_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()
