Using the FilePulse ExplodeFilter to load data from XML into a Kafka topic

Asked by hsvhsicv on 2021-06-04, tagged Kafka

I'm building a Kafka topic from an XML file and writing the records to the topic in Avro format. I'm using FilePulse for this, and in its documentation I found the ExplodeFilter. I tried to configure it according to the documentation, but it doesn't work. The Connect Docker console shows the following error:

io.streamthoughts.kafka.connect.filepulse.data.DataException: leitura is not a valid field name
 at io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.lookupField(TypedStruct.java:464)
 at io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.get(TypedStruct.java:226)
 at io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter.apply(ExplodeFilter.java:66)
 at io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter.apply(AbstractMergeRecordFilter.java:51)
 at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:159)
 at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:131)
 at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:99)
 at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:169)
 at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:131)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Here is the docker-compose YAML:

connect-file-pulse:
  image: streamthoughts/kafka-connect-file-pulse:latest
  container_name: connect
  depends_on:
    - cp-broker
    - cp-schema-registry
  ports:
    - "8083:8083"
    - "8001:8000"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

Here is the input XML:

<?xml version="1.0" encoding="UTF-8"?>
<playlists>
  <pai>test</pai>
  <leitura>
    <title>test</title>
    <artist>test</artist>
    <album>test</album>
    <duration>test</duration>
  </leitura>
  <leitura>
    <title>test2</title>
    <artist>test2</artist>
    <album>test2</album>
    <duration>test2</duration>
  </leitura>
</playlists>

And here is the connector configuration:

{
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "filters": "Explode",
    "filters.Explode.type": "io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter",
    "filters.Explode.source": "leitura",
    "force.array.on.fields": "leitura",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
    "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
    "fs.scan.directory.path": "/tmp/kafka-connect/examples/",
    "fs.scan.interval.ms": "10000",
    "fs.scan.filters": "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
    "file.filter.regex.pattern": ".*\\.xml$",
    "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
    "offset.strategy": "name",
    "topic": "LeituraRaw",
    "internal.kafka.reporter.id": "xml-config-start",
    "internal.kafka.reporter.bootstrap.servers": "broker:29092",
    "internal.kafka.reporter.topic": "LeituraRaw",
    "tasks.max": 1
  },
  "name": "xml-config"
}

Answer 1 (sxpgvts3):

The error is due to the fact that the ExplodeFilter does not support dot notation for selecting fields. This has now been fixed in Connect FilePulse 1.5.2:
https://github.com/streamthoughts/kafka-connect-file-pulse/issues/69
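
In practice, the fix amounts to upgrading FilePulse and then addressing the nested array. The sketch below is illustrative only: it assumes the Docker Hub tag 1.5.2 exists for the image, and that after the upgrade the array can be referenced with dot notation as playlists.leitura (the exact path depends on how the XMLFileInputReader names the root field for this document).

connect-file-pulse:
  image: streamthoughts/kafka-connect-file-pulse:1.5.2   # pin to a release >= 1.5.2 instead of latest

and the filter section of the connector config, with the rest of the settings unchanged:

"filters": "Explode",
"filters.Explode.type": "io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter",
"filters.Explode.source": "playlists.leitura",
"force.array.on.fields": "leitura",

If the reader already exposes leitura at the top level of each record, the original "filters.Explode.source": "leitura" should work unchanged once the plugin is upgraded.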
