I deployed Kafka from here. I also added a postgres container to docker-compose.yml as follows:
postgres:
  image: postgres
  hostname: kafka-postgres
  container_name: kafka-postgres
  depends_on:
    - ksql-server
    - broker
    - schema-registry
    - connect
  ports:
    - 5432:5432
I created the pageviews topic.
I also created a DatagenConnector with the following settings and ran it:
{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "iterations": "999999999",
  "quickstart": "pageviews"
}
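A config like this is usually submitted to the Kafka Connect REST API. A minimal sketch of that call, assuming Connect's REST interface is at localhost:8083 (the default) and the requests package is available:

# Sketch: register the datagen connector through the Connect REST API.
import requests

connector = {
    "name": "datagen-pageviews",
    "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "kafka.topic": "pageviews",
        "max.interval": "100",
        "iterations": "999999999",
        "quickstart": "pageviews",
    },
}

# POST /connectors expects {"name": ..., "config": {...}}.
resp = requests.post("http://localhost:8083/connectors", json=connector)
print(resp.status_code, resp.json())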
As far as I know, the connector defines a schema for the topic:
{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    { "name": "viewtime", "type": "long" },
    { "name": "userid", "type": "string" },
    { "name": "pageid", "type": "string" }
  ],
  "connect.name": "ksql.pageviews"
}
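One way to confirm what got registered is to query the Schema Registry for the topic's value subject. A minimal sketch, assuming the registry runs at localhost:8081 (the default):

# Sketch: fetch the latest registered value schema for the pageviews topic.
import requests

resp = requests.get("http://localhost:8081/subjects/pageviews-value/versions/latest")
print(resp.json()["schema"])  # the Avro schema as a JSON string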
My next step was to create a JdbcSinkConnector to move data from the Kafka topic into a Postgres table. That worked. The connector settings:
{
  "name": "from-kafka-to-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": ["pageviews"],
  "connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
  "connection.user": "postgres",
  "connection.password": "********",
  "auto.create": "true",
  "auto.evolve": "true"
}
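To verify that rows actually land in Postgres, the auto-created table (named after the topic) can be queried directly. A minimal sketch using psycopg2 (an assumed client library; any Postgres client works), connecting through the 5432 port published above:

# Sketch: count the rows the JDBC sink has written into Postgres.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432,
    dbname="postgres", user="postgres",
    password="********",  # masked, as in the question
)
with conn.cursor() as cur:
    cur.execute('SELECT COUNT(*) FROM "pageviews"')
    print(cur.fetchone()[0])
conn.close()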
Then I tried to produce messages to the topic myself. That failed with this error:
[2020-02-01 21:16:11,750] Error encountered in task to-pg-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='pageviews', partition=0, offset=23834, timestamp=1580591160374, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pageviews to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
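The closing "Unknown magic byte!" refers to the Confluent wire format: a value serialized for the Schema Registry begins with a zero magic byte and a 4-byte big-endian schema id, followed by the Avro payload. A plain json.dumps(...) value begins with b'{', so the AvroConverter rejects it. A minimal sketch of that framing:

# Sketch of the Confluent Schema Registry wire format the AvroConverter expects:
# magic byte 0x00, then a 4-byte big-endian schema id, then the Avro payload.
import struct

def looks_like_confluent_avro(value: bytes) -> bool:
    return len(value) > 5 and value[0] == 0

schema_id = 1  # hypothetical id assigned by the Schema Registry
framed = struct.pack(">bI", 0, schema_id) + b"...avro-encoded record..."
print(looks_like_confluent_avro(framed))                # True
print(looks_like_confluent_avro(b'{"viewtime": 123}'))  # False: plain JSON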
So the way the messages are produced matters. This is how I do it (Python, confluent-kafka-python):
import json

from confluent_kafka import Producer

# The value is plain JSON bytes, with no Avro framing.
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.poll(0)
producer.produce(topic, json.dumps({
    'viewtime': 123,
    'userid': 'user_1',
    'pageid': 'page_1'
}).encode('utf8'), on_delivery=kafka_delivery_report)
producer.flush()
Maybe I should provide a schema along with the message (AvroProducer)?
2 Answers

Answer #1
The problem occurs because you are trying to read data from a non-Avro topic with the Avro converter.
There are two possible solutions:
1. Switch the Kafka Connect sink connector to use the correct converter. For example, if you are consuming JSON data from the Kafka topic into the Kafka Connect sink:

...
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
...

2. Switch the upstream format to Avro and keep the sink on the Avro converter, pointed at the Schema Registry:

...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
...
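Either change can be applied to the running connector through the Connect REST API. A minimal sketch, assuming Connect at localhost:8083 and the sink config from the question, here with the Avro converter (since the JDBC sink needs records with a declared schema, the Avro route fits this pipeline better than schemaless JSON):

# Sketch: override the sink's value converter via PUT /connectors/{name}/config.
import requests

config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "pageviews",
    "connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "********",  # masked, as in the question
    "auto.create": "true",
    "auto.evolve": "true",
    # Per-connector converter override:
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
}

resp = requests.put("http://localhost:8083/connectors/from-kafka-to-pg/config", json=config)
print(resp.status_code)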
Answer #2
The topic expects Avro-encoded messages, so use AvroProducer from confluent-kafka-python. The key is that the producer serializes each value against a registered schema, which gives the message the magic-byte framing that the sink's AvroConverter looks for.
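A minimal sketch of that approach, reusing the pageviews value schema from the question and assuming the Schema Registry at localhost:8081:

# Sketch: produce Avro messages with confluent-kafka-python's AvroProducer,
# which registers the schema and adds the magic byte and schema id itself.
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.loads("""
{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {"name": "viewtime", "type": "long"},
    {"name": "userid", "type": "string"},
    {"name": "pageid", "type": "string"}
  ]
}
""")

producer = AvroProducer(
    {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
    },
    default_value_schema=value_schema,
)

producer.produce(
    topic="pageviews",
    value={"viewtime": 123, "userid": "user_1", "pageid": "page_1"},
)
producer.flush()

With this, the sink's converter can resolve the schema from the id embedded in each message, and the "Unknown magic byte!" error should go away.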