我目前正在研究使用spark流来接收类似日志文件的条目,并出于统计原因对它们进行一些计算。
hdfs上有一些数据集,现在可以从hbase和hive访问,需要这些数据集来查找某些数据并进行转换,例如ip与机器名和机器所有者之间的Map。
spark应用程序预计将在我们的集群上日复一日地运行数周而不重新启动。但是,这些参考表每隔几个小时更新一次。
如果所使用的数据稍微旧一点是可以的,但是数据只有两周是不可以的。因此,我想知道如何在Map和reduce阶段中查找转换和丰富的数据。我有几个主意。
广播变量可以读入数据集并有效地传递数据。但是,一旦设置了broadcast变量,就不能对其进行更改,并且在driver类中再次获取数据、取消持久化并广播新的数据将不起作用,因为worker的指针都指向旧的数据集。我不知道有没有办法解决这个问题。
可以进行hbase get()查询。如果根据查找键将数据定向到缩减器,则每个缩减器可以保存整个数据集子集的缓存,并且可以保存自己的本地缓存。hbase在获取单个记录时应具有最小的延迟。
还有别的吗?
1条答案
按热度按时间mqkwyuun1#
你有两个选择。
首先是使用
foreachRDD
在你的数据流上转换。foreachRDD
在驱动端执行,这意味着您可以在那里创建任何新的rdd。您可以存储时间计数器并每隔10-15分钟从hdfs重新读取文件第二种方法是读取数据库中的一些文件
transform
在数据流上进行转换并将其结果保存在内存中。使用这种方法,必须由每个执行器读取整个查找表,这是不高效的我建议你用第一种方法。更准确地说,您可以将上次更新数据时的标志存储在某个位置,并将其存储在spark应用程序中。在每次迭代中,检查该标志的值(例如,存储在hbase或zookeeper中),并将其与本地存储的标志进行比较—如果不同,则重新读取查找表;如果不不同,则使用旧的查找表执行操作