kafka connect file pulse将json文件读入kafka

6g8kf2rb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(417)

我正在尝试将json文件加载到kafka,并希望使用 file-pulse connector .
我的json文件如下:

{
    "field1": "string1",
    "field2": "string2",
    "field3": 9.8,
    "lastField": "2020-09-03T18:00:00"
}

问题连接器属性文件的读取器类应该是什么?我已经在下面了


# File types

fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.json$

# task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.WHATSHOULDitBE

我试图从文件pulse conenctor doc中找到答案,或者检查kafka connect filepulse连接器的源代码,但是失败了。

k3bvogb1

k3bvogb11#

使用connect filepulse读取json文件有两个选项: io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader :如果文件每行包含一个json对象。 io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader :如果文件包含单个json对象或数组。
然后,您必须配置内置 JSONFilter 实际解析字节[]或字符串行。
配置示例:

"config" : {
    [...]
    "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
    "file.filter.regex.pattern":".*\\.json$",
    "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
    "filters": "ParseJSON",
    "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
    "filters.ParseJSON.source":"message",
    "filters.ParseJSON.merge":"true",
}

下面是一篇博客文章,展示了如何处理json文件:将数据流到kafkas01-e03中-加载json文件

相关问题