kafka文件流连接和流api

xghobddn  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(407)

在使用文件流连接器时,我在文件中有超过一千万条记录(它不是一个文件,它的分区是按帐户划分的)。我必须将这些文件加载到主题中并更新我的流。已经走过了独立的溪流,我有以下问题,需要帮助才能实现。
查看数据集,我有两个帐户#,每个帐户有5行,我需要将它们分为两行并键入acctnbr。
如何编写源连接器来读取文件并获取分组逻辑?
我的代理在linux机器x,y,z上运行。。在开发源连接器之后,我的jar文件是否应该部署在每个代理中(如果我开始在分布式代理中运行)?
我只有30分钟的窗口提取文件下降到主题?有哪些参数可以用来调整逻辑以降低我的工作窗口?仅供参考,这个主题将有超过50个分区和3个代理设置。
数据集:

{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-01","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-02","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-03","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-04","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-05","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-01","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-02","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-03","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-04","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-05","currentPrice":"10","availQnty":"10"}
xdyibdwo

xdyibdwo1#

如何编写源连接器来读取文件并获取分组逻辑
filestream连接器不能做到这一点,除了编写您自己的连接器的示例之外,它不打算用于这样的目的。换句话说,不要在生产中使用。
也就是说,您可以使用诸如flume、filebeat、fluentd、nifi、streamset等替代解决方案来全局化您的文件路径,然后将所有记录逐行发送到kafka主题中。
在开发源连接器之后,我的jar文件应该部署在每个代理中
您不应该在任何代理上运行connect。连接服务器称为worker。
只有30分钟的时间提取文件到主题吗?
不清楚这个数字是从哪里来的。上面列出的任何方法都会监视所有新文件,而不必定义任何窗口。

相关问题