Pyspark: AnalysisException: Queries with streaming sources must be executed with writeStream.start();

mpgws1up posted on 2023-11-16 in Spark

I have multiple functions that I want to run, but I am getting an error telling me to use .start(). Can someone tell me how to proceed? Here is part of my code:

def parse_data(df):
    cachedDf = df.cache()
    # Do some more logic
    return df.collect()

def convert_to_local_time(df):
    # Logic for converting to localtime
    return df
    

raw_stream_df = spark.readStream.format("delta").load(raw_table_path)

df = parse_data(raw_stream_df)
df = convert_to_local_time(df)


vd2z7a6w #1

Here is a working example of how to do structured streaming correctly.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

# A local SparkSession; SparkContext/SQLContext are the legacy entry points
spark = SparkSession.builder.master("local").getOrCreate()

data1 = [
    [1, 35, "Male", "2023-10-01", 200],
    [1, 35, "Male", "2023-10-02", 210],
    [2, 28, "Female", "2023-10-01", 150],
    [2, 28, "Female", "2023-10-02", 160],
]

columns = ["member_id", "age", "gender", "date", "cost"]

df1 = spark.createDataFrame(data=data1, schema=columns)

df1.show(n=10, truncate=False)


# Write the sample data out as CSV files so the file source below has a folder to stream from
tempdir = "../data/structstream/"
df1.write.format("csv").mode("overwrite").save(tempdir)

# File-based streaming sources require an explicit schema
dataframeSchema = StructType([
    StructField("member_id", IntegerType(), False),
    StructField("age", IntegerType(), False),
    StructField("gender", StringType(), False),
    StructField("date", StringType(), False),
    StructField("cost", IntegerType(), False),
])

streaming_df = spark.readStream \
    .schema(dataframeSchema) \
    .csv(tempdir)

transformed_stream = streaming_df.withColumn("date_casted", F.to_date(F.col("date"))) \
                            .withColumn("something_here", F.lit("hello_world"))

# .start() is what actually launches the query; without it the plan is never executed
query = transformed_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

Output:

+---------+---+------+----------+----+
|member_id|age|gender|date      |cost|
+---------+---+------+----------+----+
|1        |35 |Male  |2023-10-01|200 |
|1        |35 |Male  |2023-10-02|210 |
|2        |28 |Female|2023-10-01|150 |
|2        |28 |Female|2023-10-02|160 |
+---------+---+------+----------+----+

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+---+------+----------+----+-----------+--------------+
|member_id|age|gender|      date|cost|date_casted|something_here|
+---------+---+------+----------+----+-----------+--------------+
|        1| 35|  Male|2023-10-01| 200| 2023-10-01|   hello_world|
|        1| 35|  Male|2023-10-02| 210| 2023-10-02|   hello_world|
|        2| 28|Female|2023-10-01| 150| 2023-10-01|   hello_world|
|        2| 28|Female|2023-10-02| 160| 2023-10-02|   hello_world|
+---------+---+------+----------+----+-----------+--------------+
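
If you genuinely need batch-only operations like cache() or collect() (as in parse_data from the question), foreachBatch hands each micro-batch to a plain Python function as a static DataFrame. Here is a minimal sketch along those lines, reusing transformed_stream from above (process_batch is just an illustrative name):

def process_batch(batch_df, batch_id):
    # batch_df is a static DataFrame here, so cache()/collect() are allowed
    cached = batch_df.cache()
    rows = cached.collect()  # e.g. the question's parse_data logic
    print(f"Batch {batch_id}: {len(rows)} rows")
    cached.unpersist()

query = transformed_stream.writeStream \
    .foreachBatch(process_batch) \
    .start()

query.awaitTermination()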

qco9c6ql #2

That is not really the intended way to work with a streaming DataFrame. You created the streaming DataFrame correctly with .format("delta").load(...), but from there you want to apply SQL-style operations such as filters, projections, and aggregations; the Structured Streaming programming guide has examples of these.
Specifically, instead of passing raw_stream_df to your different functions one after another, you can do something like this:

# Transform your raw DataFrame to get another DataFrame
df = raw_stream_df.filter("foo > 5").groupBy("bar").count()

# Create a DataStreamWriter by specifying the sink format
data_stream_writer = df.writeStream.format("delta")

# Start your query
data_stream_writer.start()

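Note that for a Delta sink, .start() typically also needs a checkpoint location and a target path before the query will run. A minimal sketch, with placeholder paths you would point at real storage:

query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/my_query") \
    .start("/tmp/delta/my_table")
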
Overall, the basic steps for writing a query are:
1. Create a DataFrame from your source (you have gotten this far already)
2. Transform the DataFrame with the Structured Streaming APIs
3. Tell Structured Streaming about your sink with .writeStream
4. Start the query with .start()
Note that .start() launches the query in the background, so the call returns even while the stream is still running. For example:
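You can keep the handle that .start() returns to inspect, block on, or stop the query. A short sketch (the 60-second timeout is arbitrary):

query = data_stream_writer.start()  # returns immediately; the query runs in the background

print(query.status)                 # shows whether the query is active and what it is doing
query.awaitTermination(timeout=60)  # block the driver for up to 60 seconds
query.stop()                        # explicitly terminate the background query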
