Using PySpark Structured Streaming, I am trying to merge two data streams into one with a left outer join, so that I can retrieve all the data from both streams.
For example, I have this data model:
# Schemas
from datetime import datetime

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, TimestampType
)

test_df1_schema = StructType([
    StructField("item1", StringType(), True),
    StructField("item2", IntegerType(), True),
    StructField("target_id", LongType(), True),
    StructField("df1_timestamp", TimestampType(), True),
])
test_df2_schema = StructType([
    StructField("item1", StringType(), True),
    StructField("item2", IntegerType(), True),
    StructField("target_id", LongType(), True),
    StructField("df2_timestamp", TimestampType(), True),
])

# Initialize tables (DataFrameWriter.save() returns None, so nothing to assign)
spark.createDataFrame([
        ("BlaBla1", 126, 111111, datetime.now()),
    ], test_df1_schema) \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/tables/test_df1")

spark.createDataFrame([
        ("BlaBla1", 126, 999999, datetime.now()),
    ], test_df2_schema) \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/tables/test_df2")
The two tables look like this:
+-------+------+---------+--------------------------+
|item1  |item2 |target_id|df1_timestamp             |
+-------+------+---------+--------------------------+
|BlaBla1|126   |111111   |2020-09-03 05:54:55.103165|
+-------+------+---------+--------------------------+

+-------+------+---------+--------------------------+
|item1  |item2 |target_id|df2_timestamp             |
+-------+------+---------+--------------------------+
|BlaBla1|126   |999999   |2020-09-03 05:55:02.848808|
+-------+------+---------+--------------------------+
Here I create the two streams and combine them with a left outer join, so that I get data from both sides:
# Read and join the streams
from pyspark.sql import functions as F

test_df1_stream = spark.readStream.format("delta").load("/data/tables/test_df1") \
    .selectExpr(
        "target_id AS df1_target_id",
        "df1_timestamp AS df1_timestamp",
        "item1 AS df1_item1",
        "item2 AS df1_item2"
    ) \
    .withWatermark("df1_timestamp", "30 minutes")

test_df2_stream = spark.readStream.format("delta").load("/data/tables/test_df2") \
    .selectExpr(
        "target_id AS df2_target_id",
        "df2_timestamp AS df2_timestamp",
        "item1 AS df2_item1",
        "item2 AS df2_item2"
    ) \
    .withWatermark("df2_timestamp", "30 minutes")

test_df_join_stream = test_df1_stream \
    .join(
        test_df2_stream,
        F.expr("""
            df1_item1 = df2_item1 AND
            df1_item2 = df2_item2 AND
            df2_timestamp >= df1_timestamp AND
            df2_timestamp <= df1_timestamp + interval 1 hour
        """),
        how="leftOuter"
    ) \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "/data/tables/test_df_join_stream/_checkpoints/streaming-agg") \
    .queryName("test_df_join_stream") \
    .start("/data/tables/test_df_join_stream")
The result is:
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
|df1_target_id|df1_timestamp |df1_item1|df1_item2|df2_target_id|df2_timestamp |df2_item1|df2_item2|
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
|111111 |2020-09-03 06:23:33.651641|BlaBla1 |126 |999999 |2020-09-03 06:23:46.3197|BlaBla1 |126 |
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
That is not too bad, but what I actually want looks more like this:
+--------------------------+--------+------+----------+
|timestamp |item1 |item2 |target_id |
+--------------------------+--------+------+----------+
|2020-09-03 06:23:33.651641|BlaBla1 |126 |111111 |
|2020-09-03 06:23:46.3197 |BlaBla1 |126 |999999 |
+--------------------------+--------+------+----------+
Here the two streams would be merged using item1 and item2 as the keys of both streams, with target_id kept from each side. Is there a good way to do this?
Thanks for your help!
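(For clarity: the desired table above is what a column-aligned stacking of the two streams would produce, rather than a join. A minimal pure-Python sketch of that reshaping, using the row values from the example output above; how to express this over the two Delta streams is exactly the open question:)

```python
# Sketch: rename each side's columns to the common
# (timestamp, item1, item2, target_id) schema, then stack the rows.
rows_df1 = [{"df1_timestamp": "2020-09-03 06:23:33.651641",
             "df1_item1": "BlaBla1", "df1_item2": 126,
             "df1_target_id": 111111}]
rows_df2 = [{"df2_timestamp": "2020-09-03 06:23:46.3197",
             "df2_item1": "BlaBla1", "df2_item2": 126,
             "df2_target_id": 999999}]

def strip_prefix(rows, prefix):
    """Drop the per-stream column prefix so both sides share one schema."""
    return [{key[len(prefix):]: value for key, value in row.items()}
            for row in rows]

# Stack both sides and order by timestamp, as in the desired table.
merged = sorted(strip_prefix(rows_df1, "df1_") + strip_prefix(rows_df2, "df2_"),
                key=lambda row: row["timestamp"])

for row in merged:
    print(row["timestamp"], row["item1"], row["item2"], row["target_id"])
```

In DataFrame terms this reshape would correspond to renaming both sides to the shared column names and calling `unionByName`, though whether that fits the streaming/watermark setup here is the part I'm unsure about.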