我想用kafka源代码在databricks中创建一个结构化流。我按照这里的说明做了。我的脚本似乎开始了,但是我无法在databricks笔记本中打印/输出一些东西。当我使用时,流itsellf工作良好,产生结果并工作(在databricks中) confluent_kafka
因此,我似乎遗漏了另一个问题:
脚本似乎“卡在”运行命令“/”流初始化“。
非常感谢您的任何意见!
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Define a data schema
schema = StructType() \
.add('PARAMETERS_TEXTVALUES_070_VALUES', StringType())\
.add('ID', StringType())\
.add('PARAMETERS_TEXTVALUES_001_VALUES', StringType())\
.add('TIMESTAMP', TimestampType())
df = spark \
.readStream \
.format("kafka") \
.option("host", "stream.xxx.com") \
.option("port", 12345)\
.option('kafka.bootstrap.servers', 'stream.xxx.com:12345') \
.option('subscribe', 'stream_test.json') \
.option("startingOffset", "earliest") \
.load()
df_word = df.select(F.col('key').cast('string'),
F.from_json(F.col('value').cast('string'), schema).alias("parsed_value"))
# Group by id and count
df_group = df_word.select('parsed_value.*')\
.groupBy('ID').count()
query = df_group \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
我的流输出数据如下所示:
"PARAMETERS_TEXTVALUES_070_VALUES":'something'
"ID":"47575963333908"
"PARAMETERS_TEXTVALUES_001_VALUES":12345
"TIMESTAMP": "2020-10-22T15:06:42.507+02:00"
澄清一下:我正在尝试打印 query
到笔记本上测试连接。此单元格后面或前面没有单元格。
谢谢,注意安全。
暂无答案!
目前还没有任何答案,快来回答吧!