我在同一台机器上运行Kafka,斯帕克和Cassandra。
每隔1分钟,我将一些数据流式传输到kafka主题(key是unix epoch,单位是秒,value是cpu温度),并将最新的偏移量读取到pyspark流式Dataframe中。在一些基本的列转换之后,我尝试将这些数据附加到cassandra表的末尾。pyspark脚本似乎没有错误地执行,但是没有数据被写入cassandra。我可能做错了什么?
我一直在做以下工作:(i)(如何将流数据集写入cassandra?)(ii)(kafka主题中pyspark结构化流的cassandra接收器)和(iii)将spark流pysparkDataframe写入cassandra会覆盖表,而不是附加
cql ddl创建表
CREATE TABLE cputemperature (
key bigint,
value double,
topic text,
partition int,
offset bigint,
timestamp timestamp,
timestampType int,
PRIMARY KEY (value, offset)
)
WITH CLUSTERING ORDER BY (offset ASC);
Pypark脚本
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import sys
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
spark.sparkContext.setLogLevel('OFF')
def helper_spark_source_kafka():
'''
This function reads latest offset of Kafka stream into a PySpark streaming dataframe, performs
column data type conversions and returns streaming dataframe.
'''
try:
df1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()
df2 = df1.withColumn("key", df1["key"].cast("string")).withColumn("value", df1["value"].cast("string"))
df3 = df2.withColumn("key", df2["key"].cast("long")).withColumn("value", df2["value"].cast("double"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
except:
print("Unexpected error:", sys.exc_info()[0])
print("Unexpected error:", sys.exc_info()[1])
print("Unexpected error:", sys.exc_info()[2])
sys.exit(1)
return df4
def helper_spark_sink_cassandra(write_df, epoch_id):
'''
This function defines the Cassandra sink for PySpark streaming dataframe.
'''
write_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(table="cputemperature", keyspace="kafkaspark")\
.mode("append")
if __name__ == '__main__':
streaming_df = helper_spark_source_kafka()
# query_console = streaming_df.writeStream.format("console")
# query_console.start().awaitTermination()
query_cassandra = streaming_df.writeStream \
.trigger(processingTime="60 seconds") \
.outputMode("update") \
.foreachBatch(helper_spark_sink_cassandra)
query_cassandra.start().awaitTermination()
spark提交命令
$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1,spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions $HOME/cputemplogs/pysparkscript.py
视频截图 SELECT *
在Cassandra桌上
暂无答案!
目前还没有任何答案,快来回答吧!