spark到cassandra:将没有空值的稀疏行写入cassandra

nzkunb0c  于 2021-06-15  发布在  Cassandra
关注(0)|答案(1)|浏览(514)

问:如何只将带有sparkDataframe中的值的列写入cassanrda并有效地执行此操作(高效地使用最少的scala代码行,而不是在cassandra中创建一堆墓碑,让它快速运行,等等)
我有一个cassandra表,有两个键列和300个潜在的描述符值。

create table sample {
    key1   text,
    key2   text,
    0      text,
    ............
    299    text,
    PRIMARY KEY (key1, key2)
}

我有一个与基础表匹配的spark dataframe,但是dataframe中的每一行都非常稀疏-除了两个键值之外,一个特定的行可能只有4到5个带有值的“描述符”(列0->299)。
我目前正在将sparkDataframe转换为rdd,并使用saverdd来写入数据。
这是可行的,但是当没有值时,“null”存储在列中。
例如:

val saveRdd = sample.rdd

  saveRdd.map(line => (
    line(0), line(1), line(2),
    line(3), line(4), line(5),
    line(6), line(7), line(8),
    line(9), line(10), line(11),
    line(12), line(13), line(14),
    line(15), line(16), line(17),
    line(18), line(19), line(20))).saveToCassandra..........

在cassandra中创建:
该地区的xz;10 |;10;10 |美国;10 |美国| 10;4984849空|空|空|空|空|空|空|空|空|空|空|零点|空|空|空|空|空|空|空|空|空|空|零点| |零|空|空|空|空|空|空|空|空|空|零点|空|空|空|空|空|空|空|空|空|空|零| 124;空| 0 |空|空|空|空|空|空|空|空||零点| |空|空|空|位置|空|空|空|空|空|空|空|空|空|空
在sparksession上设置spark.cassandra.output.ignorenulls不起作用:

spark.conf.set("spark.cassandra.output.ignoreNulls", "true")
spark.conf.get("spark.cassandra.output.ignoreNulls")

这也不起作用:

spark-shell  --conf spark.cassandra.output.ignoreNulls=true

(尝试了不同的设置方法,但我的设置方式似乎不起作用) withColumn 而过滤器似乎不是合适的解决方案。一个未设置的概念可能是正确的,但不确定如何在这种情况下使用它。
Cassandra3.11.2
SparkCassandra-connector:2.3.0-s_2.11
Spark2.2.0.2.6.3.0-235
谢谢您!

dw1jzc5e

dw1jzc5e1#

你确定吗 ignoreNulls 不适合你?Cassandra输出 null 当给定单元格中没有值时。您可以使用 sstabledump 工具-您肯定会看到附加了删除信息的单元格(这就是空值的存储方式)。
下面是运行spark的例子 ignoreNulls (默认),并带有 ignoreNulls 设置为 true . 测试是在dse 5.1.11上完成的,它有旧版本的连接器,但与cassandra 3.11相匹配。
创建如下测试表:

create table test.t3 (id int primary key, t1 text, t2 text, t3 text);

没有 ignoreNulls -我们需要以下代码进行测试:

case class T3(id: Int, t1: Option[String], t2: Option[String], t3: Option[String])
val rdd = sc.parallelize(Seq(new T3(1, None, Some("t2"), None)))
rdd.saveToCassandra("test", "t3")

如果我们使用 cqlsh 我们将看到以下内容:

cqlsh:test> SELECT * from test.t3;

 id | t1   | t2 | t3
----+------+----+------
  1 | null | t2 | null

(1 rows)

做完之后 nodetool flush 我们可以看看table。我们将在这里看到:

>sstabledump mc-1-big-Data.db
[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 30,
        "liveness_info" : { "tstamp" : "2018-11-06T07:53:38.418171Z" },
        "cells" : [
          { "name" : "t1", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
          },
          { "name" : "t2", "value" : "t2" },
          { "name" : "t3", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
          }
        ]
      }
    ]
  }
]

你可以在列中看到 t1 & t3 有一个字段 deletion_info .
现在,让我们用 TRUNCATE test.t3 ,重新启动spark shell ignoreNulls 设置为真:

dse spark --conf spark.cassandra.output.ignoreNulls=true

在执行相同的spark代码之后,我们将在 cqlsh :

cqlsh:test> SELECT * from test.t3;

 id | t1   | t2 | t3
----+------+----+------
  1 | null | t2 | null

但在执行冲洗后 sstabledump 显示完全不同的图片:

>sstabledump mc-3-big-Data.db
[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 27,
        "liveness_info" : { "tstamp" : "2018-11-06T07:56:27.035600Z" },
        "cells" : [
          { "name" : "t2", "value" : "t2" }
        ]
      }
    ]
  }
]

如您所见,我们只有列的数据 t2 ,没有提到列 t3 & t1 都是空的。

相关问题