上传文件到Kafka和进一步处理?

lqfhib0f  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(483)

将上传文件的二进制数据发送到kafka,然后通过一些连接到kafka主题的服务分发处理上传,这是一种好方法吗?
我看到了一些优势:
过滤上传数据
复制品
有些服务可以处理上传,而不仅仅是一个
你觉得怎么样?

euoag5mw

euoag5mw1#

将上传文件的二进制数据发送到kafka,然后通过一些连接到kafka主题的服务分发处理上传,这是一种好方法吗?
通常,文件被上传到文件系统,它们的uri存储在kafka消息中。这是为了确保kafka消息的大小相对较小,从而提高其客户端的吞吐量。
在这种情况下,如果我们在kafka消息中放入大型对象,消费者将不得不读取整个文件。所以你的 poll() 会比平常花更长的时间。
另一方面,如果我们只是放置文件的uri而不是文件本身,那么消息消耗将相对更快,您可以通过增加应用程序吞吐量,将文件处理委托给另一个线程(可能来自线程池)。

复制品

正如kafka中有副本一样,文件系统也可以有副本。甚至kafka也在文件系统中存储消息(作为段文件)。因此,复制也可以用文件系统本身来完成。
最好的方法是在kafka消息中放置一个指向该文件的uri,然后为该uri放置一个处理程序,该处理程序将负责向您提供该文件,并可能负责在删除原始文件时向您提供一个副本。
处理程序可能与系统的其余部分松散耦合,专门为管理文件、维护副本等而构建。

过滤上传数据

只有在实际读取文件内容时,才能对上载的数据进行过滤。您甚至可以将文件的uri放在消息中并从中读取。例如,如果您使用的是kafka流,那么可以将过滤逻辑放入 transform() 或者 mapValues() 等。

stream.from(topic)
.mapValues(v -> v.getFileURI())
.filter((k,fileURI) -> validate(read(fileURI)))
.to(..)

命中段.bytes

在邮件中存储文件的另一个缺点是,您可能会 segment.bytes 如果文件较大,则限制。你得不断地换衣服 segment.bytes 每次都要满足新文件的大小要求。
另一点是,如果 segment.bytes 设置为1gb,第一条消息(文件)大小为 750MB ,您的下一条消息是 251 MB ,的 251MB 邮件无法放入第一段,因此您的第一段将只有一条邮件,尽管它尚未达到限制。这意味着每个段将存储相对较少的消息。

相关问题