我正在创建一个kakfa主题,它来自一个xml,并以avro格式写入该主题。我用文件脉冲来做这个,在文档中我看到了explodefilter。我试图根据文档进行配置,但它不起作用。connect docker控制台出现以下错误:
io.streamthoughts.kafka.connect.filepulse.data.DataException: leitura is not a valid field name
at io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.lookupField(TypedStruct.java:464)
at io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.get(TypedStruct.java:226)
at io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter.apply(ExplodeFilter.java:66)
at io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter.apply(AbstractMergeRecordFilter.java:51)
at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:159)
at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:131)
at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:99)
at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:169)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:131)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
遵循docker compose的yaml:
connect-file-pulse:
image: streamthoughts/kafka-connect-file-pulse:latest
container_name: connect
depends_on:
- cp-broker
- cp-schema-registry
ports:
- "8083:8083"
- "8001:8000"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
遵循输入xml:
<?xml version="1.0" encoding="UTF-8"?>
<playlists>
<pai>test</pai>
<leitura>
<title>test</title>
<artist>test</artist>
<album>test</album>
<duration>test</duration>
</leitura>
<leitura>
<title>test2</title>
<artist>test2</artist>
<album>test2</album>
<duration>test2</duration>
</leitura>
</playlists>
跟随连接器:
{
"config":
{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"filters":"Explode",
"filters.Explode.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter",
"filters.Explode.source":"leitura",
"force.array.on.fields": "leitura",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"offset.strategy":"name",
"topic":"LeituraRaw",
"internal.kafka.reporter.id": "xml-config-start",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"LeituraRaw",
"tasks.max": 1
},
"name": "xml-config"
}
1条答案
按热度按时间sxpgvts31#
错误是由于
ExplodeFilter
不支持点符号来选择字段。现在,这个问题已经在connectfilepulse1.5.2版中修复https://github.com/streamthoughts/kafka-connect-file-pulse/issues/69