Joining a streaming and a static DataFrame with complete mode in PySpark

zzlelutf · Posted 2021-05-27 in Spark

I have two DataFrames: one streamed with Spark Structured Streaming, and a static one that I created myself. I want to join them.
With every approach I have tried, I get the same error:
"Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets"

# Read the raw stream from Kafka; each topic holds one day of data
raw_s = spark.readStream\
          .format("kafka")\
          .option("kafka.bootstrap.servers", "...")\
          .option("subscribe", "06-02-2020, 07-02-2020, 08-02-2020, 09-02-2020, 10-02-2020, 11-02-2020, 12-02-2020, 13-02-2020, 14-02-2020, 15-02-2020, 16-02-2020, 17-02-2020, 18-02-2020, 19-02-2020, 20-02-2020, 21-02-2020, 22-02-2020, 23-02-2020, 24-02-2020, 25-02-2020, 26-02-2020, 27-02-2020, 28-02-2020, 29-02-2020, 01-03-2020, 02-03-2020, 03-03-2020, 04-03-2020, 05-03-2020, 06-03-2020, 07-03-2020, 08-03-2020, 09-03-2020, 10-03-2020, 11-03-2020, 12-03-2020, 13-03-2020, 14-03-2020, 15-03-2020, 16-03-2020, 17-03-2020, 18-03-2020, 19-03-2020, 20-03-2020, 21-03-2020, 22-03-2020, 23-03-2020, 24-03-2020, 25-03-2020, 26-03-2020, 27-03-2020, 28-03-2020, 29-03-2020, 30-03-2020, 31-03-2020, 01-04-2020, 02-04-2020, 03-04-2020, 04-04-2020, 05-04-2020, 06-04-2020, 07-04-2020, 08-04-2020, 09-04-2020, 10-04-2020, 11-04-2020, 12-04-2020, 13-04-2020, 14-04-2020, 15-04-2020, 16-04-2020, 17-04-2020, 18-04-2020, 19-04-2020, 20-04-2020, 21-04-2020, 22-04-2020, 23-04-2020, 24-04-2020, 25-04-2020, 26-04-2020, 27-04-2020, 28-04-2020, 29-04-2020, 30-04-2020, 01-05-2020, 02-05-2020, 03-05-2020, 04-05-2020, 05-05-2020, 06-05-2020, 07-05-2020, 08-05-2020, 09-05-2020, 10-05-2020, 11-05-2020, 12-05-2020, 13-05-2020, 14-05-2020, 15-05-2020, 16-05-2020, 17-05-2020, 18-05-2020, 19-05-2020, 20-05-2020, 21-05-2020, 22-05-2020, 23-05-2020, 24-05-2020, 25-05-2020, 26-05-2020, 27-05-2020, 28-05-2020, 29-05-2020, 30-05-2020, 31-05-2020, 01-06-2020, 02-06-2020, 03-06-2020, 04-06-2020, 05-06-2020, 06-06-2020, 07-06-2020, 08-06-2020, 09-06-2020")\
          .option("startingOffsets", "earliest")\
          .load()

# Kafka delivers the payload as bytes; cast it to a string for JSON parsing
string_val = raw_s.selectExpr("CAST(value AS STRING)")

The static DataFrame is:
extra:pyspark.sql.dataframe.DataFrame
state:string
date_only:string
new_cases:double
deaths:double

The streamed one:
tweet_id:long
user_id:long
date:string
keywords:array
  element:string
location:map
  key:string
  value:string
date_tmp:timestamp
date_only:string
country:string
state:string
city:string
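
For reference, here is a minimal sketch of how a static DataFrame matching the extra schema above might be built; the CSV path, format, and header option are assumptions, since the post does not say how extra was created:

from pyspark.sql.types import StructType, StringType, DoubleType

extra_schema = StructType()\
          .add("state", StringType(), True)\
          .add("date_only", StringType(), True)\
          .add("new_cases", DoubleType(), True)\
          .add("deaths", DoubleType(), True)

# Hypothetical file path -- the original post does not say where this data lives
extra = spark.read\
          .schema(extra_schema)\
          .option("header", True)\
          .csv("/data/us_states_covid.csv")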

Here is my code:

import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime

# Define the schema of the data:

schema = StructType()\
          .add("tweet_id", LongType(), False)\
          .add("user_id", LongType(), False)\
          .add("date", StringType(), True)\
          .add("keywords", ArrayType(StringType(), True), True)\
          .add("location", MapType(StringType(), StringType(), True), True)

# Read each 'value' String as JSON:

json_df = string_val.select(F.from_json(F.col("value"), schema=schema).alias('json'))

# Flatten the nested object:

streaming_df = json_df.select("json.*")
streaming_df = streaming_df.withColumn("date_tmp",F.to_timestamp(F.col('date'), "EEE MMM dd HH:mm:ss ZZZZ yyyy"))
streaming_df = streaming_df.withColumn("date_only", F.from_unixtime(F.unix_timestamp(streaming_df.date_tmp), "MM-dd-yyyy"))
streaming_df = streaming_df.withColumn("country", streaming_df.location.country)
streaming_df = streaming_df.withColumn("state", streaming_df.location.state)
streaming_df = streaming_df.withColumn("city", streaming_df.location.city)
streaming_df = streaming_df.where(streaming_df.country == 'United States')
streaming_df = streaming_df.where(streaming_df.state.isNotNull())
join_df = streaming_df.join(extra, ['date_only','state'],'inner')
display(join_df)

Output:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
csga3l58 · Answer #1

From the manual, https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html:

"Complete Mode - The entire Result Table will be outputted to the sink after every trigger. This is supported only for aggregation queries."

So try append mode, or add an aggregation. Since you have a join, you need append mode anyway; for joins the same guide says:

"Update and Complete mode not supported yet for joins." (Spark 2.4.5)

Note also that complete mode never drops old aggregation state, which can lead to OOM errors. You need to try something like this:

join_df.writeStream \
    .format("console")  \
    .outputMode("append") \
    .start() \
    .awaitTermination()

That is instead of display which, if memory serves correctly, uses complete as the output mode.
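
If you genuinely need complete mode, the other route the docs allow is to add a streaming aggregation first, since complete mode is only supported for aggregation queries. A minimal sketch; the grouping columns here are illustrative, not from the original post:

# With a streaming aggregation in the plan, complete mode becomes valid
agg_df = join_df.groupBy("state", "date_only").count()

agg_df.writeStream \
    .format("console") \
    .outputMode("complete") \
    .start() \
    .awaitTermination()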
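
And if the point of display was just to inspect the joined stream interactively, one option is the memory sink with append mode. This is a debugging-only sketch (the memory sink keeps the whole table in driver memory, so use it on small data; the query name is arbitrary):

query = join_df.writeStream \
    .format("memory") \
    .queryName("joined_tweets") \
    .outputMode("append") \
    .start()

# Query the in-memory table at any point while the stream is running
spark.sql("SELECT * FROM joined_tweets").show()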
