Getting a DataFrame from Kafka with Spark Streaming

sycxhyv7  posted 2021-06-07 in Kafka

Hi, I'm new to Spark. My use case is to read a stream from a Kafka topic in a windowed fashion and then run analytics on top of it. I tried the following code, but it raises an error:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TestKakfa").getOrCreate()

# Subscribe to the topic and read from the beginning of its log
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "dummy_events") \
  .option("startingOffsets", "earliest") \
  .load()

# Echo the raw stream to the console and block until the query stops
query = df.writeStream.format('console').start()
query.awaitTermination()

I get the following error:

pyspark.sql.utils.StreamingQueryException: 'null\n=== Streaming Query ===\nIdentifier: [id = b842e3ba-8584-4764-8068-cce6b8d7de5f, runId = edc960c5-e5ef-4493-b9a2-c9543e1d2a8d]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: INITIALIZING\nThread State: RUNNABLE'
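One thing worth ruling out when a query dies while still INITIALIZING is the Kafka connector itself: the spark-sql-kafka package is not bundled with PySpark, and it has to be on the classpath with a version that matches the installed Spark build. Below is a minimal sketch of pinning it from Python via the spark.jars.packages config; the artifact coordinates (Spark 3.1.2 built against Scala 2.12) are an assumption and must be adjusted to the local Spark and Scala versions. A windowed-aggregation sketch for the use case itself follows after the error discussion.

from pyspark.sql import SparkSession

# spark.jars.packages pulls the connector at startup. The coordinates
# below (Spark 3.1.2 / Scala 2.12) are an assumption; they must match
# the installed Spark and Scala versions exactly.
spark = (SparkSession.builder
         .appName("TestKakfa")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")
         .getOrCreate())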

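Separately, the posted code only echoes raw records to the console; the windowed analysis the question describes would sit on top of it. Here is a minimal sketch, assuming the goal is a simple event count per tumbling window. The window and watermark durations are placeholder values, and the count() stands in for whatever analytics the use case actually needs.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("TestKafkaWindowed").getOrCreate()

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "dummy_events")
      .option("startingOffsets", "earliest")
      .load())

# The Kafka source yields binary key/value columns plus metadata,
# including a `timestamp` column that can drive the windowing.
events = df.selectExpr("CAST(value AS STRING) AS value", "timestamp")

# Count events per 1-minute tumbling window; the watermark caps how long
# state is kept around for late records. Both durations are placeholders.
windowed = (events
            .withWatermark("timestamp", "2 minutes")
            .groupBy(window(col("timestamp"), "1 minute"))
            .count())

# Update mode prints only the windows whose counts changed in each batch
query = (windowed.writeStream
         .outputMode("update")
         .format("console")
         .option("truncate", "false")
         .start())
query.awaitTermination()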