Apache Spark: appending a streaming PySpark DataFrame to Cassandra writes nothing

Asked by 93ze6v8z on 2021-05-19 in Spark

I am running Kafka, Spark and Cassandra on the same machine.

Every minute I stream some data to a Kafka topic (the key is the Unix epoch in seconds, the value is a CPU temperature) and read the latest offsets into a PySpark streaming DataFrame. After some basic column conversions I try to append this data to the end of a Cassandra table. The PySpark script appears to execute without errors, but no data gets written to Cassandra. What might I be doing wrong?

I have been following (i) "How to write streaming Dataset to Cassandra?", (ii) "Cassandra Sink for PySpark Structured Streaming from Kafka topic" and (iii) "Writing Spark Streaming PySpark dataframe to Cassandra overwrites table instead of appending".
CQL DDL to create the table

CREATE TABLE cputemperature (
    key bigint,
    value double,
    topic text,
    partition int,
    offset bigint,
    timestamp timestamp,
    timestampType int,
    PRIMARY KEY (value, offset)
)
WITH CLUSTERING ORDER BY (offset ASC);
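
One thing worth noting about the DDL above: Cassandra folds unquoted identifiers to lower case, so the column declared as timestampType is actually stored as timestamptype (presumably why the PySpark script below renames that column). The stored names can be confirmed in cqlsh:

DESCRIBE TABLE kafkaspark.cputemperature;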

PySpark script

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import sys

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
spark.sparkContext.setLogLevel('OFF')

def helper_spark_source_kafka():
    '''
    This function reads latest offset of Kafka stream into a PySpark streaming dataframe, performs
    column data type conversions and returns streaming dataframe.
    '''
    try:
        # read the latest offsets of topic1 into a streaming dataframe
        df1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()
        # Kafka delivers key and value as binary, so cast to string first,
        # then to the numeric types the Cassandra table expects
        df2 = df1.withColumn("key", df1["key"].cast("string")).withColumn("value", df1["value"].cast("string"))
        df3 = df2.withColumn("key", df2["key"].cast("long")).withColumn("value", df2["value"].cast("double"))
        # Cassandra stores unquoted identifiers in lower case
        df4 = df3.withColumnRenamed("timestampType","timestamptype")
    except:
        print("Unexpected error:", sys.exc_info()[0])
        print("Unexpected error:", sys.exc_info()[1])
        print("Unexpected error:", sys.exc_info()[2])
        sys.exit(1)
    return df4

def helper_spark_sink_cassandra(write_df, epoch_id):
    '''
    This function defines the Cassandra sink for PySpark streaming dataframe.
    '''
    write_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="cputemperature", keyspace="kafkaspark")\
    .mode("append")

if __name__ == '__main__':
    streaming_df = helper_spark_source_kafka()

    # query_console = streaming_df.writeStream.format("console")
    # query_console.start().awaitTermination()

    query_cassandra = streaming_df.writeStream \
    .trigger(processingTime="60 seconds") \
    .outputMode("update") \
    .foreachBatch(helper_spark_sink_cassandra)
    query_cassandra.start().awaitTermination()
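
For robustness, a structured streaming query usually also sets a checkpoint location so that the consumed Kafka offsets survive a restart. A sketch of the same query with one added (the checkpoint path here is hypothetical; any durable, writable directory works):

query_cassandra = (
    streaming_df.writeStream
    .trigger(processingTime="60 seconds")
    .outputMode("update")
    # hypothetical checkpoint directory
    .option("checkpointLocation", "/tmp/checkpoints/cputemperature")
    .foreachBatch(helper_spark_sink_cassandra)
)
query_cassandra.start().awaitTermination()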

spark-submit command

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1,spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions $HOME/cputemplogs/pysparkscript.py
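
Note that spark-submit expects one key=value pair per --conf flag; comma-joining two settings as above assigns the whole remainder, "127.0.0.1,spark.sql.extensions=...", as the value of spark.cassandra.connection.host, and the Cassandra extensions are never applied. A corrected sketch of the same command:

$SPARK_HOME/bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 \
  --conf spark.cassandra.connection.host=127.0.0.1 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
  $HOME/cputemplogs/pysparkscript.py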

[Screenshot: SELECT * on the Cassandra table]

No answers yet.
