我有两个数据集[ReconEntity],其中ReconEntity是:
case class ReconEntity(rowId: String,
groupId: String,
amounts: List[Amount],
processingDate: Long,
attributes: Map[String, String],
entityType: String,
isDuplicate: String)
字符串
第一个数据集看起来像:
+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId| amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
| C1| G1|USD,10.00000000...| 1551021334| rowId,C1| false|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C2| G2|USD,2.000000000...| 1551011017| rowId,C2| false|
| C3| G2|USD,6.000000000...| 1551011459| rowId,C3| false|
| C3| G2|USD,6.000000000...| 1551011017| rowId,C3| true|
+-----+-------+------------------+--------------+----------+-----------+
型
第二个数据集看起来像:
+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId| amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
| C2| G2|USD,2.000000000...| 1551011017| rowId,C2| false|
| C3| G2|USD,6.000000000...| 1551011459| rowId,C3| false|
+-----+-------+------------------+--------------+----------+-----------+
型
我希望结果看起来像这样:
+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId| amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
| C1| G1|USD,10.00000000...| 1551021334| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C2| G2|USD,2.000000000...| 1551011017| rowId,C2| false|
| C3| G2|USD,6.000000000...| 1551011459| rowId,C3| false|
| C3| G2|USD,6.000000000...| 1551011017| rowId,C3| true|
+-----+-------+------------------+--------------+----------+-----------+
型
我使用左连接连接两个数据集,如果第二个数据集中不存在rowId,我将isDuplicate标志的值标记为true,否则将结果数据集中的原始值标记为true。逻辑是:
inputEntries.as("inputDataset").join(otherEntries.as("otherDataset"),
col("inputDataset.rowId") === col("otherDataset.rowId"), "left")
.select(
col("inputDataset.rowId"),
col("inputDataset.groupId"),
col("inputDataset.amounts"),
col("inputDataset.processingDate"),
col("inputDataset.attributes"),
col("inputDataset.entityType"),
when(
col("otherDataset.rowId").isNull, TRUE
).otherwise(col("inputDataset.isDuplicate")).as(IS_DUPLICATE)
).as[ReconEntity]
型
这里的joinKey是rowId。这个逻辑在本地工作得很好,但是当我试图运行spark作业时,结果并不像预期的那样。我不太熟悉连接,想知道我的逻辑是否正确。2个数据集的左连接的输出是什么。
1条答案
按热度按时间wwtsj6pe1#
对于给定的要求,左联接将是正确的选择。
你能分享一下你在spark-job输出中看到的差异吗?当前解决方案的唯一问题是当你在第二个数据集中有重复时(其他情况下)。在这种情况下,第一个数据集中的每一行都会与重复的行匹配,从而创建多个条目。
由于我们只需要检查
rowId
列,因此可以使用deduplicate
操作来仅保留第二个数据集中的唯一行,以避免连接后的重复。下面是一个工作代码,包括重复数据消除和重复数据消除前后的连接输出示例。
字符串