Multiple writeStreams in Spark Structured Streaming (PySpark)

Asked by 6ojccjat on 2022-12-19 in Apache

I have successfully implemented a single writeStream in PySpark, but as soon as I add a second writeStream, only the first one prints to the console.

import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Fair scheduling lets jobs from multiple streaming queries share executors
conf = SparkConf()
conf.set("spark.scheduler.allocation.file", "file:///opt/spark/conf/fairscheduler.xml")

spark = SparkSession \
    .builder \
    .appName("SparkStreaming") \
    .config(conf=conf) \
    .getOrCreate()

# Run this session's jobs in the "pool1" fair-scheduler pool
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")

schema = StructType([
    StructField("text", StringType(), True),
    StructField("created_at", TimestampType(), True)
])

# Read newline-delimited JSON from a local socket and unpack it into columns
tweets_df1 = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("tmp")).select("tmp.*")

# First query: write the parsed tweets to the console
q1 = tweets_df1 \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk1") \
    .trigger(processingTime='5 seconds') \
    .start()

# Second query: same source, with an extra literal column added
q2 = tweets_df1 \
    .withColumn("foo", F.lit("foo")) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk2") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()

Here is my output:

-------------------------------------------
Batch: 0
-------------------------------------------
-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+
|text|created_at|
+----+----------+
+----+----------+

+----+----------+---+
|text|created_at|foo|
+----+----------+---+
+----+----------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-------------------+
|                text|         created_at|
+--------------------+-------------------+
|Qatar posting for...|2022-12-16 20:23:06|
+--------------------+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-------------------+
|                text|         created_at|
+--------------------+-------------------+
|Who will win this...|2022-12-16 20:23:13|
+--------------------+-------------------+

The DataFrame with the foo column stops after batch 0, which means the second writeStream is not running. I can confirm this from each writeStream's checkpoint folder. Most solutions to this problem are in Scala, and I have tried converting them to PySpark.
Is this simply not possible in PySpark?

Answer 1 (kx1ctssn)

This most likely happens because a socket can be consumed only once, so one of the streams "wins". If you want multiple consumers, consider putting your messages into something durable, for example Kafka; that way each stream can consume the messages independently of the others.
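A minimal sketch of that Kafka-based approach, assuming the same JSON payloads are produced to a topic named tweets on a local broker (the topic name, broker address, and checkpoint paths are illustrative, and the spark-sql-kafka connector matching your Spark version must be on the classpath). Because each started query builds its own Kafka consumer, the two queries no longer compete for the same messages:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("SparkStreamingKafka").getOrCreate()

schema = StructType([
    StructField("text", StringType(), True),
    StructField("created_at", TimestampType(), True)
])

def read_tweets():
    # Each call builds an independent Kafka reader, so the two queries
    # below do not steal messages from each other the way socket readers do.
    return spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
        .option("subscribe", "tweets") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(F.from_json(F.col("value").cast("string"), schema).alias("tmp")) \
        .select("tmp.*")

q1 = read_tweets() \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/kafka-chk1") \
    .trigger(processingTime='5 seconds') \
    .start()

q2 = read_tweets() \
    .withColumn("foo", F.lit("foo")) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/kafka-chk2") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()

With this setup each query tracks its own Kafka offsets in its own checkpoint directory, so both queries should keep advancing past batch 0 independently.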
