将上传文件的二进制数据发送到kafka,然后通过一些连接到kafka主题的服务分发处理上传,这是一种好方法吗?我看到了一些优势:过滤上传数据复制品有些服务可以处理上传,而不仅仅是一个你觉得怎么样?
euoag5mw1#
将上传文件的二进制数据发送到kafka,然后通过一些连接到kafka主题的服务分发处理上传,这是一种好方法吗?通常,文件被上传到文件系统,它们的uri存储在kafka消息中。这是为了确保kafka消息的大小相对较小,从而提高其客户端的吞吐量。在这种情况下,如果我们在kafka消息中放入大型对象,消费者将不得不读取整个文件。所以你的 poll() 会比平常花更长的时间。另一方面,如果我们只是放置文件的uri而不是文件本身,那么消息消耗将相对更快,您可以通过增加应用程序吞吐量,将文件处理委托给另一个线程(可能来自线程池)。
poll()
正如kafka中有副本一样,文件系统也可以有副本。甚至kafka也在文件系统中存储消息(作为段文件)。因此,复制也可以用文件系统本身来完成。最好的方法是在kafka消息中放置一个指向该文件的uri,然后为该uri放置一个处理程序,该处理程序将负责向您提供该文件,并可能负责在删除原始文件时向您提供一个副本。处理程序可能与系统的其余部分松散耦合,专门为管理文件、维护副本等而构建。
只有在实际读取文件内容时,才能对上载的数据进行过滤。您甚至可以将文件的uri放在消息中并从中读取。例如,如果您使用的是kafka流,那么可以将过滤逻辑放入 transform() 或者 mapValues() 等。
transform()
mapValues()
stream.from(topic) .mapValues(v -> v.getFileURI()) .filter((k,fileURI) -> validate(read(fileURI))) .to(..)
在邮件中存储文件的另一个缺点是,您可能会 segment.bytes 如果文件较大,则限制。你得不断地换衣服 segment.bytes 每次都要满足新文件的大小要求。另一点是,如果 segment.bytes 设置为1gb,第一条消息(文件)大小为 750MB ,您的下一条消息是 251 MB ,的 251MB 邮件无法放入第一段,因此您的第一段将只有一条邮件,尽管它尚未达到限制。这意味着每个段将存储相对较少的消息。
segment.bytes
750MB
251 MB
251MB
1条答案
按热度按时间euoag5mw1#
将上传文件的二进制数据发送到kafka,然后通过一些连接到kafka主题的服务分发处理上传,这是一种好方法吗?
通常,文件被上传到文件系统,它们的uri存储在kafka消息中。这是为了确保kafka消息的大小相对较小,从而提高其客户端的吞吐量。
在这种情况下,如果我们在kafka消息中放入大型对象,消费者将不得不读取整个文件。所以你的
poll()
会比平常花更长的时间。另一方面,如果我们只是放置文件的uri而不是文件本身,那么消息消耗将相对更快,您可以通过增加应用程序吞吐量,将文件处理委托给另一个线程(可能来自线程池)。
复制品
正如kafka中有副本一样,文件系统也可以有副本。甚至kafka也在文件系统中存储消息(作为段文件)。因此,复制也可以用文件系统本身来完成。
最好的方法是在kafka消息中放置一个指向该文件的uri,然后为该uri放置一个处理程序,该处理程序将负责向您提供该文件,并可能负责在删除原始文件时向您提供一个副本。
处理程序可能与系统的其余部分松散耦合,专门为管理文件、维护副本等而构建。
过滤上传数据
只有在实际读取文件内容时,才能对上载的数据进行过滤。您甚至可以将文件的uri放在消息中并从中读取。例如,如果您使用的是kafka流,那么可以将过滤逻辑放入
transform()
或者mapValues()
等。命中段.bytes
在邮件中存储文件的另一个缺点是,您可能会
segment.bytes
如果文件较大,则限制。你得不断地换衣服segment.bytes
每次都要满足新文件的大小要求。另一点是,如果
segment.bytes
设置为1gb,第一条消息(文件)大小为750MB
,您的下一条消息是251 MB
,的251MB
邮件无法放入第一段,因此您的第一段将只有一条邮件,尽管它尚未达到限制。这意味着每个段将存储相对较少的消息。