大数据信号分析:存储和查询信号数据的更好方法

vddsk6oq  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(432)

我要用hadoop/spark做一些信号分析,我需要关于如何构建整个过程的帮助。
信号现在存储在一个数据库中,我们将用sqoop读取它,并在hdfs上的文件中进行转换,模式类似于:

<Measure ID> <Source ID> <Measure timestamp> <Signal values>

其中信号值只是由浮点逗号分隔的数字组成的字符串。

000123  S001  2015/04/22T10:00:00.000Z  0.0,1.0,200.0,30.0 ... 100.0
000124  S001  2015/04/22T10:05:23.245Z  0.0,4.0,250.0,35.0 ... 10.0
...
000126  S003  2015/04/22T16:00:00.034Z  0.0,0.0,200.0,00.0 ... 600.0

我们希望将交互式/批处理查询写入:
对信号值应用聚合函数

SELECT *
FROM SIGNALS
WHERE MAX(VALUES) > 1000.0

选择峰值超过1000.0的信号。
在聚合上应用聚合

SELECT SOURCEID, MAX(VALUES) 
FROM SIGNALS
GROUP BY SOURCEID
HAVING MAX(MAX(VALUES)) > 1500.0

选择至少有一个超过1500.0的信号源。
对示例应用用户定义函数

SELECT *
FROM SIGNALS
WHERE MAX(LOW_BAND_FILTER("5.0 KHz", VALUES)) > 100.0)

选择在5.0 khz下滤波后值至少超过100.0的信号。
我们需要一些帮助以便:
找到正确的文件格式将信号数据写入hdfs。我想用Parquet地板。你将如何组织数据?
了解正确的数据分析方法:最好创建不同的数据集(例如,使用spark处理数据并在hdfs上持久化结果),还是尝试在查询时从原始数据集执行所有操作?
hive是一个很好的工具,可以像我写的那样进行查询吗?我们运行的是cloudera enterprise hadoop,所以我们也可以使用impala。
如果我们从原始数据集生成不同的派生数据集,我们如何跟踪数据的沿袭,即知道数据是如何从原始版本生成的?
非常感谢你!

lb3vh1jj

lb3vh1jj1#

1) 柱状格式的parquet适合olap。实木复合地板的Spark支撑已经足够成熟,可以生产使用。我建议将表示信号值的字符串解析为以下数据结构(简化):

case class Data(id: Long, signals: Array[Double])
 val df = sqlContext.createDataFrame(Seq(Data(1L, Array(1.0, 1.0, 2.0)), Data(2L, Array(3.0, 5.0)), Data(2L, Array(1.5, 7.0, 8.0))))

保留double数组允许定义和使用如下自定义项:

def maxV(arr: mutable.WrappedArray[Double]) = arr.max
sqlContext.udf.register("maxVal", maxV _)
df.registerTempTable("table")

sqlContext.sql("select * from table where maxVal(signals) > 2.1").show()
+---+---------------+
| id|        signals|
+---+---------------+
|  2|     [3.0, 5.0]|
|  2|[1.5, 7.0, 8.0]|
+---+---------------+

sqlContext.sql("select id, max(maxVal(signals)) as maxSignal from table group by id having maxSignal > 1.5").show()
+---+---------+
| id|maxSignal|
+---+---------+
|  1|      2.0|
|  2|      8.0|
+---+---------+

或者,如果您需要某种类型安全性,请使用scala dsl:

import org.apache.spark.sql.functions._
val maxVal = udf(maxV _)
df.select("*").where(maxVal($"signals") > 2.1).show()
df.select($"id", maxVal($"signals") as "maxSignal").groupBy($"id").agg(max($"maxSignal")).where(max($"maxSignal") > 2.1).show()
+---+--------------+
| id|max(maxSignal)|
+---+--------------+
|  2|           8.0|
+---+--------------+

2) 这取决于:如果数据的大小允许以合理的延迟在查询时间内完成所有处理,那就去做吧。您可以从这种方法开始,然后为缓慢/流行的查询构建优化的结构
3) Hive是缓慢的,它过时的 Impala 和Sparksql。选择有时并不容易,我们使用经验法则:impala适用于没有连接的查询如果您的所有数据都存储在hdfs/hive中,spark具有更大的延迟,但连接是可靠的,它支持更多的数据源并具有丰富的非sql处理能力(如mllib和graphx)
4) 保持简单:存储原始数据(主数据集),消除重复并分区(我们使用基于时间的分区)。如果新数据到达分区,并且已经生成了下游数据集,请为此分区重新启动管道。
希望这有帮助

ewm0tg9j

ewm0tg9j2#

首先,我相信维塔利的方法在各个方面都非常好(我完全支持spark)
不过,我想提出另一种方法。原因是:
我们想做交互式查询(+我们有cdh)
数据已经结构化
需要的是“分析”而不是“处理”数据。如果(a)数据结构化,我们可以更快地形成sql查询,(b)我们不想每次运行查询时都编写一个程序,那么spark可能是一种过度杀伤力
以下是我想遵循的步骤:
使用sqoop摄取到hdfs:[可选]使用--as parquetfile
根据需要创建一个外部impala表或一个内部表。如果尚未将文件作为Parquet文件传输,则可以在此步骤中执行此操作。分区方式,最好是源id,因为我们的分组将发生在该列上。
所以,基本上,一旦我们传输了数据,我们所需要做的就是创建一个impala表,最好是Parquet格式的,并按要用于分组的列进行分区。请记住在加载后进行计算统计,以帮助impala更快地运行它。
移动数据:-如果我们需要从结果中生成feed,请创建一个单独的文件-如果另一个系统要更新现有数据,则在创建->加载表时将数据移动到其他位置-如果只是关于查询和分析以及获取报告(即,外部表就足够了),我们不需要不必要地移动数据—我们可以在相同的数据上创建一个外部配置单元表。如果我们需要运行长时间运行的批处理查询,我们可以使用hive。不过,这对交互式查询来说是个禁忌。如果我们从查询中创建任何派生表并希望通过impala使用,请记住在对配置单元生成的表运行impala查询之前先运行“invalidatemetadata”
血统-我还没有深入到它,这里有一个关于 Impala 血统使用cloudera导航器的链接

相关问题