我正在尝试将json文件加载到kafka,并希望使用 file-pulse connector
.
我的json文件如下:
{
"field1": "string1",
"field2": "string2",
"field3": 9.8,
"lastField": "2020-09-03T18:00:00"
}
问题连接器属性文件的读取器类应该是什么?我已经在下面了
# File types
fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.json$
# task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.WHATSHOULDitBE
我试图从文件pulse conenctor doc中找到答案,或者检查kafka connect filepulse连接器的源代码,但是失败了。
1条答案
按热度按时间k3bvogb11#
使用connect filepulse读取json文件有两个选项:
io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader
:如果文件每行包含一个json对象。io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader
:如果文件包含单个json对象或数组。然后,您必须配置内置
JSONFilter
实际解析字节[]或字符串行。配置示例:
下面是一篇博客文章,展示了如何处理json文件:将数据流到kafkas01-e03中-加载json文件