即使在多次尝试之后,我也无法在控制台上看到任何消息,也无法写入文件。
在我的代码下面:
df = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers","ipaddress:9092")\
.option("subscribe","mysql-server-1.inventory.customers")\
.option("partition.assignment.strategy", "range") \
.option("startingOffsets", "latest")\
.load()#.
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(df.printSchema())
print("Streaming : {}".format(df.isStreaming))
ds = df.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "ipaddress:9092")\
.option("topic", "mysql-server-1.inventory.customers")\
.option("checkpointLocation", "hdfs://ipaddress:9000/user/xxxx/check")\
.start()
ds.awaitTermination()
注:
writestream中的主题已尝试使用其他名称
writestream上的格式已尝试使用带有append、update和complete的控制台。也已尝试用Parquet地板写入文件
打印时ds.lastprogress()get nonetype
暂无答案!
目前还没有任何答案,快来回答吧!