将spark streaming pyspark dataframe写入cassandra将覆盖表而不是追加

db2dz4w8  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(455)

我在运行一个由Kafka,斯帕克和Cassandra组成的单节点集群。都在同一台机器上。
从一个简单的python脚本中,我每5秒将一些伪数据流到一个kafka主题中。然后使用spark结构化流,我将这个数据流(一次一行)读入一个pysparkDataframe startingOffset = latest . 最后,我尝试将这一行附加到已经存在的cassandra表中。
我一直在关注(如何将流数据集写入cassandra?)和(kafka主题中的用于pyspark结构化流的cassandrasink)。
一行数据被成功地写入到cassandra表中,但我的问题是它每次都被覆盖,而不是追加到表的末尾。我可能做错了什么?
这是我的密码:
用于创建的cql ddl kafkaspark 键空间后跟 randintstream Cassandra表格:

DESCRIBE keyspaces;

CREATE KEYSPACE kafkaspark
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy', 
   'replication_factor' : 1 
  };

USE kafkaspark; 

CREATE TABLE randIntStream (
    key int,
    value int,
    topic text,
    partition int,
    offset bigint,
    timestamp timestamp,
    timestampType int,
    PRIMARY KEY (partition, topic)
);

发射pyspark炮弹

./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1,spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

将来自kafka主题的最新消息读入流式Dataframe:

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()

一些转换和检查模式:

df2 = df.withColumn("key", df["key"].cast("string")).withColumn("value", df["value"].cast("string"))
df3 = df2.withColumn("key", df2["key"].cast("integer")).withColumn("value", df2["value"].cast("integer"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
df4.printSchema()

写入cassandra的函数:

def writeToCassandra(writeDF, epochId):
    writeDF.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="randintstream", keyspace="kafkaspark") \
    .mode("append") \
    .save()

最后,查询如何从spark向cassandra写信:

query = df4.writeStream \
.trigger(processingTime="5 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()
``` `SELECT *` 在Cassandra的table上:
![](https://i.stack.imgur.com/XvdHn.png)
cyvaqqii

cyvaqqii1#

如果行总是用cassandra重写,那么表中的主键可能不正确-您需要确保每一行都有一个唯一的主键。如果您是从spark创建cassandra表,那么默认情况下,它只接受第一列作为分区键,并且它本身可能不是唯一的。
提供架构后更新:
是的,我指的就是这种情况-你有一个 (partition, topic) ,但从该主题中读取的特定分区中的每一行的主键值都相同,因此它将覆盖以前的版本。您需要使主键唯一—例如,添加 offset 或者 timestamp 列到主键(尽管 timestamp 如果在同一毫秒内生成数据,则可能不是唯一的。
p、 另外,在连接器3.0.0中,您不需要 foreachBatch :

df4.writeStream \
  .trigger(processingTime="5 seconds") \
  .format("org.apache.spark.sql.cassandra") \
  .options(table="randintstream", keyspace="kafkaspark") \
  .mode("update") \
  .start()

p、 另外,如果你只是想把数据从Kafka转移到Cassandra,你可以考虑使用datastax的Kafka连接器,与spark相比,它可能要轻得多。

相关问题