我有多个函数,我想运行,但我得到一个错误,说我需要使用.start()
。有人能告诉我如何继续吗?这里是我的代码的一部分:
def parse_data(df):
cachedDf = df.cache()
# Do some more logics
return df.collect()
def convert_to_local_time(df):
# Logic for converting to localtime
return df
raw_stream_df = spark.readStream.format("delta").load(raw_table_path)
df = parse_data(raw_stream_df)
df = convert_to_local_time(df)
字符串
2条答案
按热度按时间vd2z7a6w1#
这里有一个如何正确进行结构化流的工作示例。
字符串
输出量:
型
qco9c6ql2#
这并不是使用流式 Dataframe 的预期方式。您使用
.format("delta").load(...)
正确创建了流式 Dataframe ,但此时,您确实希望执行SQL操作,如过滤,投影,聚合等。您可以在这里找到一些示例。具体地说,与其将
raw_stream_df
一个接一个地传递给不同的函数,你可以这样做:字符串
总的来说,编写查询的基本步骤应该是:
1.使用源代码创建DataFrame(您已经走到这里了)
1.使用结构化流API转换 Dataframe
1.使用
.writeStream
告知结构化流有关接收器的信息1.从
.start()
开始查询请注意,
.start()
将在后台启动查询,因此即使在流运行时,该方法也将返回。