Kafka Connect FileStreamSourceConnector does not produce to the topic

piztneat · posted 2021-06-06 in Kafka

I am trying to set up a Kafka Connect pipeline that sinks an Avro topic to a file and then, with another Kafka Connect connector, restores that file into a new topic.
The sink works fine: I can see the sink file growing and can read the data. But when I try to restore into a new topic, the new topic receives no data.
I get no errors. I have reset the offsets, created a fresh Kafka Connect instance and tried the restore again, and I even built a completely new Kafka cluster, but it is always the same: no errors on the source connector, yet the topic stays empty.
Here is the output of the source connector config:

{
  "name": "restored-exchange-rate-log",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "value.converter.schemas.enable": "true",
    "name": "restored-exchange-rate-log",
    "topic": "restored-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "restored-exchange-rate-log",
      "task": 0
    }
  ],
  "type": "source"
}

Here is the output of the source connector status:

{
  "name": "restored-exchange-rate-log",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "kafka-connect:8883"
    }
  ],
  "type": "source"
}

Here is the output of the sink connector config:

{
  "name": "bkp-exchange-rate-log",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "topics": "exchange-rate-log",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "value.converter.schemas.enable": "true",
    "name": "bkp-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "bkp-exchange-rate-log",
      "task": 0
    }
  ],
  "type": "sink"
}

Here is the output of the sink connector status:

{
  "name": "bkp-exchange-rate-log",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "kafka-connect:8883"
    }
  ],
  "type": "sink"
}

The sink file works and keeps growing, but the restored-exchange-rate-log topic is completely empty.

Adding more detail: I have now also tried the "Zalando" approach, except that instead of S3 we use the FileStream connectors.

Here is the sink:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "topics": "exchange-rate-log",
  "format": "binary",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "bkp-exchange-rate-log"
}

And here is the source:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "format": "binary",
  "topic": "bin-test-exchange-rate-log",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "restore-exchange-rate-log"
}

The sink connector looks fine: it generated the file /tmp/exchange-rate-log.bin, which keeps growing. But the source (restore) connector fails with this error:

Caused by: org.apache.kafka.connect.errors.DataException: bin-test-exchange-rate-log error: Not a byte array! [B@761db301
    at com.spredfast.kafka.connect.s3.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:22)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:269)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more

Answer #1 (ss2ws0br):

I am not entirely sure the Connect file connectors are a good use case for this.
Also, the Avro converter does not dump the file in a re-loadable format; a record ends up looking like Struct{field=value}. If you really do want to dump a topic to a file, just use kafka-avro-console-consumer: include the keys, pass --key-deserializer as the string one, and redirect the output with > file.txt. To restore, you could try the Avro console producer, but it has no string-serializer property, so the keys need to be quoted, I believe, in order to pass through its JSON parser.
You can test it like this:

echo '"1"|{"data":value"}'  > kafka-avro-console-producer...

(you would need to set the key.separator property as well)
Or, from a file:

kafka-avro-console-producer...  < file.txt

And if the whole Kafka cluster is ever gone and this file is all that is left, then you also need a backup of your Avro schemas (since the registry's _schemas topic would be gone with it).
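
As a minimal sketch of that schema backup, assuming the confluent-kafka Python client and the registry URL from the configs above (the output file name is just a placeholder):

# Hypothetical backup script: saves the latest schema of every subject
# in the registry to a local JSON file, so the schemas survive even if
# the _schemas topic is lost with the cluster.
import json

from confluent_kafka.schema_registry import SchemaRegistryClient

sr = SchemaRegistryClient({"url": "http://kafka-schema:8881"})

backup = {}
for subject in sr.get_subjects():
    latest = sr.get_latest_version(subject)      # a RegisteredSchema
    backup[subject] = {
        "schema_id": latest.schema_id,
        "schema": latest.schema.schema_str,      # the Avro schema as JSON text
    }

with open("schemas-backup.json", "w") as f:      # placeholder file name
    json.dump(backup, f, indent=2)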

Answer #2 (qnzebej0):

I was able to generate a "dump" of the topic using kafka-avro-console-consumer. We are using SSL + Schema Registry.
Here is the command line that generates the topic dump:

tpc=exchange-rate-log
SCHEMA_REGISTRY_OPTS="-Djavax.net.ssl.keyStore=. -Djavax.net.ssl.trustStore=. -Djavax.net.ssl.keyStorePassword=. -Djavax.net.ssl.trustStorePassword=." \
kafka-avro-console-consumer \
  --from-beginning --bootstrap-server $CONNECT_BOOTSTRAP_SERVERS \
  --property schema.registry.url=$CONNECT_SCHEMA_REGISTRY_URL \
  --topic $tpc --consumer-property security.protocol=SSL \
  --consumer-property ssl.truststore.location=/etc/ssl/kafkaproducer.truststore.jks \
  --consumer-property ssl.truststore.password=$MYPASS \
  --consumer-property ssl.keystore.location=/etc/ssl/kafkaproducer.keystore.jks \
  --consumer-property ssl.keystore.password=$MYPASS \
  --consumer-property ssl.key.password=$MYPASS \
  --property "key.separator=::-::" \
  --property "schema.id.separator=::_::" \
  --property print.schema.ids=true \
  --timeout-ms 15000 \
  --property "print.key=true" \
  --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" > $tpc.dump

But I have not found a way to import it back with kafka-avro-console-producer, because it does not work with non-Avro keys. With this dump file, though, I can write a Python producer that reads the file and restores the topic.
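
As a minimal sketch of such a restore producer, assuming the confluent-kafka Python client, the separators from the dump command above, and placeholder broker/topic/file names (note that AvroSerializer registers schemas under the default <topic>-value subject, so adjust if you rely on RecordNameStrategy):

# Hypothetical restore script: replays a kafka-avro-console-consumer dump
# back into a topic. Assumes each dump line looks like
#   key::-::value-json::_::schema-id
# which matches the key.separator / schema.id.separator used above.
import json

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

DUMP_FILE = "exchange-rate-log.dump"            # output of the consumer above
TARGET_TOPIC = "restored-exchange-rate-log"     # topic to restore into
KEY_SEP, ID_SEP = "::-::", "::_::"

sr = SchemaRegistryClient({"url": "http://kafka-schema:8881"})
producer = Producer({"bootstrap.servers": "kafka:9092"})  # plus SSL settings

serializers = {}  # cache one AvroSerializer per schema id

with open(DUMP_FILE) as f:
    for line in f:
        line = line.rstrip("\n")
        if not line:
            continue
        key, rest = line.split(KEY_SEP, 1)
        value_json, schema_id = rest.rsplit(ID_SEP, 1)
        schema_id = int(schema_id)
        if schema_id not in serializers:
            # look up the exact schema the record was written with
            schema_str = sr.get_schema(schema_id).schema_str
            serializers[schema_id] = AvroSerializer(sr, schema_str)
        ctx = SerializationContext(TARGET_TOPIC, MessageField.VALUE)
        value_bytes = serializers[schema_id](json.loads(value_json), ctx)
        producer.produce(TARGET_TOPIC, key=key.encode("utf-8"), value=value_bytes)

producer.flush()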
