用python维护kafka更新消息流中的表的最佳数据结构

ruarlubt  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(330)

假设我有一个固定维度(nxm)的表格数据集。我从kafka收到一个更新流,更新这个表中的条目。最后,我希望有一个带有最新版本表的pandasDataframe,我正在考虑一些选项:
将其作为表/Dataframe保存在内存中。我在这里担心的是,我不知道是否可以避免多线程,因为一个进程将永远处于接收消息的for循环中。
在一个外部结构中维护它,并有一个独立于它的独立进程。外部数据存储的选择:a)sqlite-可能有并发问题,对任意行的更新可能有点混乱。b) redis-易于维护,但很难一次查询/读取整个表(这是我通常访问数据的方式)。
我是一个Kafka初学者,所以这里的任何建议将不胜感激。你将如何处理这个问题?谢谢!
edit:我想我也可以把它保存在内存中,然后把整个东西推到sqlite上?

v1l68za4

v1l68za41#

我最初的方法是问:我能不能先创建一个“足够好”的解决方案,然后在需要时再优化它?
除非您需要担心非常敏感的信息(如医疗保健或财务数据),或者数据肯定会迅速扩展,否则我建议您先尝试一个简单的解决方案,然后再看看是否遇到任何问题。你不可以!
最终,我可能会从sqlite解决方案开始,因为它的设置相对简单,而且非常适合用例(即“事务性”情况)。
我会考虑以下几点:

单一流程的优点/缺点

除非您的数据是高速/大容量的,否则您建议在同一进程中使用和处理数据可能是好的。本地处理数据要比通过网络接收数据快得多(假设您的kafka提要不在本地计算机上),因此从kafka接收数据可能是瓶颈。
但是,让一个python进程无限期地运行可能代价高昂,而且您需要确保将数据存储到一个文件或数据库中,以便在进程关闭时防止数据丢失。

关系数据库(例如sqlite)

使用像sqlite这样的关系数据库可能是最好的选择,这同样取决于您接收数据的速度。但是关系数据库一直用于事务目的(事实上,这是它们的主要目的之一),这意味着写入的量和速度都很高,因此将数据持久保存在sqlite中并在其中进行更新肯定是有意义的。如果有意义的话,您可以考虑将数据拆分为单独的表(例如第三范式),或者如果更合适的话,您可以将所有数据保存在一个表中。

在内存中维护表

您还可以像您建议的那样,将表保存在内存中,只要您在更新后以某种方式(csv、sqlite等)将其持久化到磁盘。例如,您可以:
把你的副本存起来。
当您得到一个更新时,对内存表进行更新。
将表写入磁盘。
如果进程停止或重新启动,请从内存中读取表以开始。
然而,pandas在访问和更新行中的单个值时可能会比较慢,因此实际上,将表作为字典之类的东西保存在内存中,并在不使用pandas的情况下将其写入磁盘可能更有意义。但是,如果你能在Pandas身上做到这一切(关于速度和体积),那也是一个很好的开始。

相关问题