将数据从kafka导入elasticsearch时,如何获取导入进度和错误日志?

k10s72fa  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(254)

背景:
我正在尝试将数据从kafka导入elasticsearch,有两种客户端。一个是web客户端,另一个是代理客户端。
当用户上传csv文件时,web客户端将处理csv文件,web客户端每10000行读取一次csv文件,并将csv总行数的数据消息发送给producer。生产者将消息发送给kafka,然后消费者提取消息,并将数据导入elasticsearch。同时,使用者使用数据消息长度和csv总计数来更新任务进度,如果有错误,还更新错误日志。最后我们的web客户端将知道错误和导入进度。
代理客户端监视日志文件的变化,一旦有新的日志出现,它会向producer发送消息,与web客户端相同,但它不关心进程。因为日志总是像nginx日志一样增长。
框架:
以下是我使用的框架:

生产者和消费者是我们使用kafkapython的python程序。
问题:
有时消费者崩溃了,它会自动重启并再次导入相同的数据。
有时客户端发送太多的消息,生产者可能会错过一些,因为http请求有限制,我猜。
问题:
有没有更好的框架来做这些事情?比如使用kafka connect elasticsearch,spark streaming?

huus2vyu

huus2vyu1#

是-使用kafka connect elasticsearch连接器。这会让你的生活轻松很多。kafkaconnectapi是专门为您设计的,它可以为您完成所有这些困难的工作(重启、偏移管理等)。作为最终用户,您只需要设置一个配置文件。你可以在这里阅读一个使用Kafka连接的例子。
Kafka连接是ApacheKafka的一部分。elasticsearch连接器是开源的,可以在github上单独使用。或者,只需下载confluent platform,它将kafka的最新版本与连接器(包括elasticsearch、hdfs等)和其他一些有用的工具捆绑在一起。

相关问题