将Spark dStream与变量合并以保存到Cassandra()中

oymdgrw7  于 2022-11-05  发布在  Cassandra
关注(0)|答案(3)|浏览(130)

我有一个DStream[String, Int],它有几对单词计数,例如("hello" -> 10)。我想把这些计数用一个步进索引写入cassandra。这个索引初始化为var step = 1,并且随着每个微批处理的进行而递增。
将cassandra表创建为:

CREATE TABLE wordcounts (
    step int,
    word text,
    count int,
primary key (step, word)
);

尝试将流写入表时...

stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count"))

......我得到java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step
如何将step索引预先挂接到流中,以便将这三列一起写入?
我使用的是Spark2.0.0,斯卡拉2.11.8,Cassandra3.4.0和SparkCassandra连接器2.0.0-M3。

qnzebej0

qnzebej01#

如前所述,虽然Cassandra表需要(Int, String, Int)形式的内容,但wordCount DStream的类型是DStream[(String, Int)],因此要使对saveToCassandra(...)的调用正常工作,我们需要一个DStream[(Int, String, Int)]类型的DStream
这个问题中比较棘手的部分是如何将本地计数器(根据定义,只有在驱动程序中才知道)提升到DStream级别。
要做到这一点,我们需要做两件事:将计数器“提升”到分布式级别(在Spark中,我们指的是“RDD”或“DataFrame”),并将该值与现有的DStream数据连接起来。
从经典的流字计数示例开始:

// Split each line into words
val words = lines.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

我们添加一个本地变量来保存微批处理的计数:

@transient var batchCount = 0

它被声明为transient,这样当我们声明使用它的转换时,Spark就不会试图关闭它的值。
现在来点棘手的:在DStream transform操作的上下文中,我们从该单个var表中生成RDD,并使用笛卡尔积将其与DStream的底层RDD连接:

val batchWordCounts = wordCounts.transform{ rdd => 
  batchCount = batchCount + 1

  val localCount = sparkContext.parallelize(Seq(batchCount))
  rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}

(Note一个简单的map函数将不起作用,因为只有var iable的初始值将被捕获和序列化。因此,在查看DStream数据时,看起来计数器从未增加。
最后,现在数据的形状已经正确,请将其保存到Cassandra:

batchWordCounts.saveToCassandra("keyspace", "wordcounts")
ee7vknir

ee7vknir2#

updateStateByKey函数由spark提供,用于全局状态处理。

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount: Int = runningCount.getOrElse(0) + 1
    Some(newCount)
}
val step = stream.updateStateByKey(updateFunction _)

stream.join(step).map{case (key,(count, step)) => (step,key,count)})
   .saveToCassandra("keyspace", "wordcounts")
eeq64g8w

eeq64g8w3#

因为您试图将RDD保存到现有的Cassandra表中,所以您需要在RDD中包含所有主键列值。
您可以使用以下方法将RDD保存到新表中。

saveAsCassandraTable or saveAsCassandraTableEx

有关更多信息,请查看this

相关问题