I am trying to set up a Kafka Connect pipeline that sinks an Avro topic to a file,
and then uses Kafka Connect to restore that file into another topic.
The sink works fine: I can see the sink file growing and I can read the data in it. But when I try to restore it into a new topic, the new topic stays empty.
I get no errors. I have reset the offsets, created a fresh Kafka Connect instance and tried the restore again, and even created a completely new Kafka cluster, and the result is always the same: no errors on the source connector, but the topic is empty.
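For context, this is the kind of check that shows the restored topic is empty (the broker address here is an assumption about the setup):

```shell
# Consume the restored topic from the beginning; with the restore broken,
# this prints nothing and then times out. Broker address is assumed.
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic restored-exchange-rate-log \
  --from-beginning --timeout-ms 10000
```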
Here is the output of the source connector config:
{
  "name": "restored-exchange-rate-log",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "value.converter.schemas.enable": "true",
    "name": "restored-exchange-rate-log",
    "topic": "restored-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "restored-exchange-rate-log",
      "task": 0
    }
  ],
  "type": "source"
}
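For reference, these outputs are what the Kafka Connect REST API returns; they can be fetched like this (the worker address is taken from the status output, and `jq` is only used here for pretty-printing):

```shell
# Connector config (the response includes name, config, tasks, type):
curl -s http://kafka-connect:8883/connectors/restored-exchange-rate-log | jq .

# Connector and task states:
curl -s http://kafka-connect:8883/connectors/restored-exchange-rate-log/status | jq .
```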
Here is the output of the source connector status:
{
  "name": "restored-exchange-rate-log",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "kafka-connect:8883"
    }
  ],
  "type": "source"
}
Here is the output of the sink connector config:
{
  "name": "bkp-exchange-rate-log",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "topics": "exchange-rate-log",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "value.converter.schemas.enable": "true",
    "name": "bkp-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "bkp-exchange-rate-log",
      "task": 0
    }
  ],
  "type": "sink"
}
Here is the output of the sink connector status:
{
  "name": "bkp-exchange-rate-log",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "kafka-connect:8883"
    }
  ],
  "type": "sink"
}
The sink file is working and keeps growing, but the topic restored-exchange-rate-log stays completely empty.
Adding more details.
I have now tried the "Zalando" approach, but instead of S3 we use the FileStream connectors.
Here is the sink:
{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "topics": "exchange-rate-log",
  "format": "binary",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "bkp-exchange-rate-log"
}
Here is the source:
{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "format": "binary",
  "topic": "bin-test-exchange-rate-log",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "restore-exchange-rate-log"
}
The sink connector looks fine: it generated the file /tmp/exchange-rate-log.bin and the file keeps growing. But the source (restore) connector fails with an error:
Caused by: org.apache.kafka.connect.errors.DataException: bin-test-exchange-rate-log error: Not a byte array! [B@761db301
at com.spredfast.kafka.connect.s3.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:22)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:269)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
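The message comes from the converter's input guard. As a rough illustration only (a Python sketch of the idea, not the actual Java source of AlreadyBytesConverter): the converter expects records that already carry raw bytes under a bytes schema, and the FileStream source does not hand it records in that shape, so the guard throws.

```python
class DataException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""

def from_connect_data(topic, schema, value):
    # Sketch of the guard: only records that already carry raw bytes
    # under a BYTES schema are accepted; anything else is rejected.
    if schema != "BYTES" or not isinstance(value, (bytes, bytearray)):
        raise DataException(f"{topic} error: Not a byte array! {value!r}")
    return bytes(value)
```

So pairing AlreadyBytesConverter with the FileStream source fails because the source does not produce records in the shape the converter requires.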
2 Answers

Answer 1:
I'm not entirely sure the Connect FileStream connectors are a good use case for this.
Besides, the Avro converter does not dump the file in a re-usable format. It looks like:
Struct{field=value}
If you really do want to dump to a file, just do it with kafka-avro-console-consumer: to include the keys, pass --key-deserializer as the string one, then redirect with > file.txt.
To restore, you can try the Avro console producer, but there is no string-serializer property, so the keys need to be quoted and are, I believe, passed through a JSON parser. You can test it that way (you will need to set the key.separator property as well) against a file.
If the whole Kafka cluster disappeared and only this file were left, you would also need a backup of the Avro schemas (because the registry's _schemas topic would be gone, too).
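Putting those steps together, the dump might look roughly like this (the bootstrap server address and separator choice are assumptions; the registry URL is taken from the question's configs, and the flags are as I understand the console tools):

```shell
# Sketch: dump an Avro topic to a text file with string keys included.
kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic exchange-rate-log \
  --from-beginning \
  --property schema.registry.url=http://kafka-schema:8881 \
  --property print.key=true \
  --property key.separator=: \
  --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
  > file.txt
```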
Answer 2:
I was able to generate a "dump" of the topic using the kafka-avro-console-consumer. We are using SSL + Schema Registry.
Here is the command line that was able to generate the topic dump:
However, I did not find a way to import it back with the kafka-avro-console-producer, because it does not work with non-Avro keys. Given this dump file, I can write a Python producer that reads the file and restores the topic.
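That Python producer might be sketched roughly like this (the tab separator, dump file format, broker address, and the confluent-kafka client library are all assumptions; note that a true Avro restore would also need to re-serialize the values against the registered schema, which this sketch does not do):

```python
def parse_dump_line(line, separator="\t"):
    """Split one dumped line into (key, value). The separator must match the
    key.separator that was used when the dump was produced (tab assumed)."""
    key, _, value = line.rstrip("\n").partition(separator)
    return key, value

def restore_topic(dump_path, topic, bootstrap="kafka:9092"):
    # confluent-kafka is one client choice; any Kafka producer would do.
    from confluent_kafka import Producer
    producer = Producer({"bootstrap.servers": bootstrap})
    with open(dump_path, encoding="utf-8") as f:
        for line in f:
            key, value = parse_dump_line(line)
            # NOTE: this ships the dumped text as-is; restoring real Avro
            # records would require re-serializing value with its schema.
            producer.produce(topic, key=key, value=value)
    producer.flush()
```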