在pyspark结构化流媒体中,如何在写入kafka之前丢弃已经生成的输出?

dm7nw8vv  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(281)

我正在尝试对kafka源数据执行结构化流(spark2.4.0),在那里我正在读取最新的数据,并在10分钟的窗口中执行聚合。我在写数据时使用“更新”模式。
例如,数据模式如下: tx_id, cust_id, product, timestamp 我的目标是找到在过去10分钟内购买了3种以上产品的客户。假设prod是从kafka读取的Dataframe,那么加窗的df定义为:

windowed_df_1 = prod.groupBy(window("timestamp", "10 minutes"), cust_id).count()
windowed_df = windowed_df_1.filter(col("count")>=3)

然后我将它与配置单元表“customer\u master”中的主Dataframe连接起来,以获取 cust_name : final_df = windowed_df.join(customer_master, "cust_id") 最后,将这个Dataframe写入kafka sink(为了简单起见,可以使用console)

query = final_df.writeStream.outputMode("update").format("console").option("truncate",False).trigger(processingTime='2 minutes').start()
query.awaitTermination()

现在,当这段代码每2分钟运行一次时,在随后的运行中,我想丢弃那些已经是我输出的一部分的客户。我不想让他们出现在我的产品中,即使他们再购买任何产品。
我可以将流输出临时写入某个位置(可能是一个配置单元表)并对每次执行执行执行“反连接”吗?这样,我还可以在配置单元表中维护历史记录。
我还读到,我们可以将输出写入内存接收器,然后使用 df.write 将其保存在hdfs/hive中。但是如果我们终止作业并重新运行呢?在这种情况下,内存表可能会丢失。
请帮助我,因为我是新的结构化流媒体。

**

更新:-

**我还尝试在配置单元表和控制台(或kafka接收器)中编写以下代码:

def write_to_hive(df, epoch_id):
    df.persist()
    df.write.format("hive").mode("append").saveAsTable("hive_tab_name")
    pass

final_df.writeStream.outputMode("update").format("console").option("truncate", False).start()

final_df.writeStream.outputMode("update").foreachBatch(write_to_hive).start()

但这只执行第一个操作,即写入控制台。如果我先写“foreachbatch”,它将保存到配置单元表,但不会打印到控制台。
我想写两个不同的Flume。请帮忙。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题