I have successfully implemented a single writeStream in PySpark, but as soon as I add a second writeStream, only the first one is printed to the console.
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.scheduler.allocation.file", "file:///opt/spark/conf/fairscheduler.xml")

spark = SparkSession \
    .builder \
    .appName("SparkStreaming") \
    .config(conf=conf) \
    .getOrCreate()

# Run the streaming queries in the same fair-scheduler pool
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")

schema = StructType([
    StructField("text", StringType(), True),
    StructField("created_at", TimestampType(), True)
])

# Read newline-delimited JSON tweets from a local socket and flatten them
tweets_df1 = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("tmp")) \
    .select("tmp.*")

q1 = tweets_df1 \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk1") \
    .trigger(processingTime='5 seconds') \
    .start()

q2 = tweets_df1 \
    .withColumn("foo", F.lit("foo")) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk2") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()
Here is my output:
-------------------------------------------
Batch: 0
-------------------------------------------
-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+
|text|created_at|
+----+----------+
+----+----------+
+----+----------+---+
|text|created_at|foo|
+----+----------+---+
+----+----------+---+
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-------------------+
| text| created_at|
+--------------------+-------------------+
|Qatar posting for...|2022-12-16 20:23:06|
+--------------------+-------------------+
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-------------------+
| text| created_at|
+--------------------+-------------------+
|Who will win this...|2022-12-16 20:23:13|
+--------------------+-------------------+
The DataFrame with the foo column stops after batch 0, which means the second writeStream is not running. I can confirm this by looking at each writeStream's checkpoint folder. Most solutions to this problem are written in Scala, and I have tried converting them to PySpark.
Is this simply something that cannot be done in PySpark?
1 Answer
Most likely this happens because a socket can only be consumed once, so one of the streams "wins". If you want multiple consumers, consider putting your messages into something persistent, for example Kafka. That way each stream can consume the messages independently of the other.
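A minimal sketch of that suggestion, adapted from the asker's code: it assumes a Kafka broker at localhost:9092 and a topic named tweets (both are placeholder names), and that the job is submitted with the Kafka connector on the classpath (e.g. --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<your Spark version>). The checkpoint directories are fresh ones, since a checkpoint written for the socket source cannot be reused with a different source. Because Kafka retains messages, each started query gets its own consumer and reads the topic independently, so both writeStream sinks keep producing batches.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession \
    .builder \
    .appName("SparkStreamingKafka") \
    .getOrCreate()

schema = StructType([
    StructField("text", StringType(), True),
    StructField("created_at", TimestampType(), True)
])

# Each query started from this definition creates its own Kafka consumer,
# so q1 and q2 below each receive every message on the topic.
tweets_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "tweets") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("tmp")) \
    .select("tmp.*")

q1 = tweets_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk1-kafka") \
    .trigger(processingTime="5 seconds") \
    .start()

q2 = tweets_df \
    .withColumn("foo", F.lit("foo")) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk2-kafka") \
    .trigger(processingTime="5 seconds") \
    .start()

spark.streams.awaitAnyTermination()

Apart from swapping the readStream source from socket to kafka (and pointing the queries at new checkpoint locations), the rest of the program can stay as it was.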