如何为单元测试生成rdd[结果]

dgiusagp  于 2021-06-10  发布在  Hbase
关注(0)|答案(1)|浏览(339)

出于单元测试的目的,我正在构建自己的hbase结果对象,如下所示

val row = Bytes.toBytes( "row01" )
val cf = Bytes.toBytes( "cf" )
val cell1 = new KeyValue( row, cf, "v1".getBytes(), Bytes.toBytes( "file1" ) )
val cell2 = new KeyValue( row2, cf, "v2".getBytes(), Bytes.toBytes( "file2" ) )

val cells = List( cell1, cell2 )

val result = Result.create( cells )

现在我想把它添加到一个sparkcontext对象中,比如

val sparkContext = new org.apache.spark.SparkContext( conf )
val rdd = sparkContext.parallelize( List( result ) )

但是,一旦我尝试通过foreach访问rdd,比如

rdd.foreach{x=>x}

我得到了著名的Spark任务不能序列化。
有没有人知道一个更好的方法来解决这个问题?

ygya80vv

ygya80vv1#

Result 是不可序列化的,所以如果您想要 RDD[Result] 你必须生产 Result 从其他输入(当然,还有 collect , first 哪一个会 Result 在节点之间等将不起作用)。例如。

val rdd0 = sparkContext.parallelize( List( ("row", "cf") ) )

val rdd = rdd.map { case (str1, str2) =>
  val row = Bytes.toBytes( str1 )
  val cf = Bytes.toBytes( str2 )
  val cell1 = new KeyValue( row, cf, "v1".getBytes(), Bytes.toBytes( "file1" ) )
  val cell2 = new KeyValue( row2, cf, "v2".getBytes(), Bytes.toBytes( "file2" ) )

  val cells = List( cell1, cell2 )

  Result.create( cells )
}

相关问题