合并两个包含数据随时间变化的表(类似于方波卷积)

m3eecexj  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(178)

我有一个现有的表,其中包含了不同时间点的不同主题的测量。它实际上驻留在solr集合中(每一行都是一个文档),并被加载到spark中。
SIDTM21A4B27A31X34Y37Z型
为了澄清,第一行表示id为2的受试者,在时间1测量为a。在第二行,在时间4再次测量受试者#2,该时间结果为b。此表仅保存特定受试者id的“m”变化,因此,如果我在t=8时测量受试者#2并找到值a,则不会保存它(表示最后一次测量仍然有效)。
然后,我从另一个来源获得各种新的测量值,这是第二个表:
sidtm42c23b28a32k39b型
现在,我需要将这些新信息合并到现有(旧)表中,但并非所有度量都能提供新的/有意义的信息。只有在以下情况下,新信息才有用:
这是我以前从未测量过的。例如,在第一行中,对subject id 4进行了测量,我之前没有任何数据。
它给出了一个更早的变化检测,例如第2行说,受试者#2在t=3时有值b,这比我之前的测量更早(t=4)。我正在尽可能早地发现变化。
相比之下,第3行没有给我任何新的信息,因为我已经知道主题2在t=7时是a(我的现有信息更早)
第4行也很有用,它让我知道主题3在t=9时从z变成了b。
所以当这两个表合并时,结果应该是这样的:
SIDTM21A3B27A31x32K34Y37Z39B42C型
第一个问题是,如何在sql/dataframeapi中做到这一点?我想它可以通过一些窗口函数魔术与连接,但不能把我的头围绕它,因为连接是非常有条件的。我找到了这个解决方案,虽然我不完全确定它是否正确,或者可能有更好的解决方案:

union two tables into a table named "unified"

select sid, t, m from 
        (
            select 
                (lag(`sid`) over (order by `sid`) != `sid`) as sidChanged, 
                (lag(`m`) over (order by `m`) != `m`) as mChanged, 
                `sid`, `t`, `m` from unified
        )
    where sid is null or sidChanged or mChanged
    order by `sid` asc, `t` asc

第二个也是更难的问题:第一个表实际上是巨大的,作为一个集合驻留在solr中,第二个表更小,有大约100万行。如何确定哪些数据部分可能会受到新度量的影响,以便从solr加载该部分数据,因为如果不这样做,则需要查询solr以获取每个可能的更改?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题