我想在kafka主题的顶部创建dataframe,然后我想将该dataframe注册为temp表,以便对数据执行减号操作。我写了下面的代码。但是在查询已注册的表时,出现错误“org.apache.spark.sql.analysisexception:必须使用writestream.start();”执行流源查询
org.apache.spark.sql.types.DataType
org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types._
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "SERVER******").option("subscribe", "TOPIC_NAME").option("startingOffsets", "earliest").load()
df.printSchema()
val personStringDF = df.selectExpr("CAST(value AS STRING)")
val user_schema =StructType(Array(StructField("OEM",StringType,true),StructField("IMEI",StringType,true),StructField("CUSTOMER_ID",StringType,true),StructField("REQUEST_SOURCE",StringType,true),StructField("REQUESTER",StringType,true),StructField("REQUEST_TIMESTAMP",StringType,true),StructField("REASON_CODE",StringType,true)))
val personDF = personStringDF.select(from_json(col("value"),user_schema).as("data")).select("data.*")
personDF.registerTempTable("final_df1")
spark.sql("select * from final_df1").show
error:
3条答案
按热度按时间deyfvvtc1#
流式Dataframe不支持
show()
方法。当你打电话的时候start()
方法,它将启动一个后台线程将输入数据流式传输到接收器,并且由于您使用的是consolesink,它将数据输出到控制台。你不需要打电话show()
.拆下下面的线,
然后加上下面的行或相等的行,
niknxzdl2#
使用
memory
接收器而不是寄存器可清空。检查以下代码。wlzqhblo3#
“org.apache.spark.sql.analysisexception:必须使用writestream.start();”执行具有流源的查询
我还使用了start()方法,得到以下错误。
2011年8月20日00:59:30错误streaming.microbatchexecution:查询final_df1[id=1a3e2ea4-2ec1-42f8-a5eb-8a12ce0fb3f5,runid=7059f3d2-21ec-43c4-b55a-8c735272bf0f]终止,错误为java.lang.abstractmethoderror
注意:编写这个脚本的主要目的是对这个数据编写减号查询,并将它与集群上的一个寄存器表进行比较。所以,总结一下,如果我要从oracle数据库发送kafka主题中的1000条记录,我将在oracle表的顶部创建dataframe,将其注册为temp表,并对kafka主题执行相同的操作。然后我想在源(oracle)和目标(kafka主题)之间运行减号查询。在源和目标之间执行100%数据验证(是否可以将kafka主题注册为临时表?)