Join two data streams into one table with merged columns in PySpark

3b6akqbq · posted 2021-05-27 in Spark

Using PySpark Structured Streaming, I am trying to combine two data streams into one with a left outer join, so that I retrieve the data from both streams.
For example, I have this data model:


# Imports
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, LongType, TimestampType)

# Schemas

test_df1_schema = StructType([
    StructField("item1", StringType(), True),
    StructField("item2", IntegerType(), True),
    StructField("target_id", LongType(), True),
    StructField("df1_timestamp", TimestampType(), True),
])

test_df2_schema = StructType([
    StructField("item1", StringType(), True),
    StructField("item2", IntegerType(), True),
    StructField("target_id", LongType(), True),
    StructField("df2_timestamp", TimestampType(), True),
])

# Initialize tables

# .save() returns None, so there is nothing useful to assign here
spark.createDataFrame([
    ("BlaBla1",126,111111,datetime.now()),
    ], test_df1_schema) \
    .write \
    .format("delta") \
    .mode('overwrite') \
    .save("/data/tables/test_df1")

spark.createDataFrame([
    ("BlaBla1",126,999999,datetime.now()),
    ], test_df2_schema) \
    .write \
    .format("delta") \
    .mode('overwrite') \
    .save("/data/tables/test_df2")

The two tables look like this:

+-------+------+---------+--------------------------+
|item1  |item2 |target_id|df1_timestamp             |
+-------+------+---------+--------------------------+
|BlaBla1|126   |111111   |2020-09-03 05:54:55.103165|
+-------+------+---------+--------------------------+

+-------+------+---------+--------------------------+
|item1  |item2 |target_id|df2_timestamp             |
+-------+------+---------+--------------------------+
|BlaBla1|126   |999999   |2020-09-03 05:55:02.848808|
+-------+------+---------+--------------------------+

Here I create the two streams and combine them with a left outer join, to get the data from both sides:


# Read and Join tables

test_df1_stream = spark.readStream.format('delta').load('/data/tables/test_df1') \
    .selectExpr( 
        "target_id AS df1_target_id",
        "df1_timestamp AS df1_timestamp",
        "item1 AS df1_item1", 
        "item2 AS df1_item2"
    ) \
    .withWatermark("df1_timestamp", "30 minutes")

test_df2_stream = spark.readStream.format('delta').load('/data/tables/test_df2') \
    .selectExpr( 
        "target_id AS df2_target_id",
        "df2_timestamp AS df2_timestamp",
        "item1 AS df2_item1", 
        "item2 AS df2_item2"
    ) \
    .withWatermark("df2_timestamp", "30 minutes")

test_df_join_stream = test_df1_stream \
    .join(
        test_df2_stream,
        F.expr("""
            df1_item1 = df2_item1 AND
            df1_item2 = df2_item2 AND
            df2_timestamp >= df1_timestamp AND
            df2_timestamp <= df1_timestamp + interval 1 hour
        """),
        how='leftOuter'
    ) \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "/data/tables/test_df_join_stream/_checkpoints/streaming-agg") \
    .queryName("test_df_join_stream") \
    .start("/data/tables/test_df_join_stream")
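The time-bound part of the join condition only matches a `df2` row whose timestamp falls within one hour after the corresponding `df1` row; combined with the watermarks, this is what lets Spark eventually drop old state. As a plain-Python sketch of just that predicate (the helper name here is illustrative, not Spark API):

```python
from datetime import datetime, timedelta

def within_join_window(df1_ts, df2_ts, window=timedelta(hours=1)):
    """Mirror of the stream-join time bound:
    df2_timestamp >= df1_timestamp AND
    df2_timestamp <= df1_timestamp + interval 1 hour."""
    return df1_ts <= df2_ts <= df1_ts + window

t1 = datetime(2020, 9, 3, 6, 23, 33)
t2 = datetime(2020, 9, 3, 6, 23, 46)  # 13 seconds later -> matches
t3 = datetime(2020, 9, 3, 7, 30, 0)   # more than 1 hour later -> no match

print(within_join_window(t1, t2))  # True
print(within_join_window(t1, t3))  # False
```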

The result is:

+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
|df1_target_id|df1_timestamp             |df1_item1|df1_item2|df2_target_id|df2_timestamp           |df2_item1|df2_item2|
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+
|111111       |2020-09-03 06:23:33.651641|BlaBla1  |126      |999999       |2020-09-03 06:23:46.3197|BlaBla1  |126      |
+-------------+--------------------------+---------+---------+-------------+------------------------+---------+---------+

That is not too bad, but what I actually want looks more like this:

+--------------------------+--------+------+----------+
|timestamp                 |item1   |item2 |target_id |
+--------------------------+--------+------+----------+
|2020-09-03 06:23:33.651641|BlaBla1 |126   |111111    |   
|2020-09-03 06:23:46.3197  |BlaBla1 |126   |999999    |
+--------------------------+--------+------+----------+

The two streams should be merged based on item1 , item2 and target_id as the keys of both streams. Is there a good way to do this?
Thanks for your help!
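Note that the desired output stacks the rows of the two streams rather than placing them side by side, which is a union, not a join: rename each stream's columns to a common schema and union the results (in Spark this would be a `select` to the shared column names followed by `unionByName`). The plain-Python sketch below, with dicts standing in for Spark rows, just shows the shape of that transformation:

```python
# Rows as they come out of the two prefixed streams (dicts stand in for Rows).
df1_rows = [{"df1_timestamp": "2020-09-03 06:23:33.651641",
             "df1_item1": "BlaBla1", "df1_item2": 126, "df1_target_id": 111111}]
df2_rows = [{"df2_timestamp": "2020-09-03 06:23:46.3197",
             "df2_item1": "BlaBla1", "df2_item2": 126, "df2_target_id": 999999}]

def normalize(rows, prefix):
    """Strip the per-stream prefix so both streams share one schema."""
    return [{
        "timestamp": r[f"{prefix}_timestamp"],
        "item1": r[f"{prefix}_item1"],
        "item2": r[f"{prefix}_item2"],
        "target_id": r[f"{prefix}_target_id"],
    } for r in rows]

# Union of the two normalized streams, ordered by timestamp.
merged = sorted(normalize(df1_rows, "df1") + normalize(df2_rows, "df2"),
                key=lambda r: r["timestamp"])
for r in merged:
    print(r["timestamp"], r["item1"], r["item2"], r["target_id"])
```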

No answers yet.
