scala—在spark作业中写入hbase:存在类型的难题

ymzxtsji  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(355)

我正在尝试编写一个spark作业,将其输出放入hbase。据我所知,正确的方法是使用 saveAsHadoopDatasetorg.apache.spark.rdd.PairRDDFunctions -这就要求我的 RDD 是成对的。
方法 saveAsHadoopDataset 需要 JobConf ,这就是我试图构建的。根据这个链接,有一件事我必须在我的 JobConf 是输出格式(事实上没有它就不能工作),比如

jobConfig.setOutputFormat(classOf[TableOutputFormat])

问题是,显然这并不能编译,因为 TableOutputFormat 是泛型的,即使它忽略其类型参数。所以我尝试了各种组合,比如

jobConfig.setOutputFormat(classOf[TableOutputFormat[Unit]])
jobConfig.setOutputFormat(classOf[TableOutputFormat[_]])

但无论如何我都会出错

required: Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]]

现在,据我所知, Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]] 转换为 Class[T] forSome { type T <: org.apache.hadoop.mapred.OutputFormat[_, _] } . 我觉得我有个问题,因为: Class 是不变的 TableOutputFormat[T] <: OutputFormat[T, Mutation] ,但是
我不确定存在类型如何与需求中的子类型交互 T <: OutputFormat[_, _] 有没有办法获得 OutputFormat[_, _]TableOutputFormat ? 这个问题似乎源于java和scala中泛型之间的差异——对此我能做些什么?
编辑:
原来这更微妙。我试图在repl中定义一个方法

def foo(x: Class[_ <: OutputFormat[_, _]]) = x

我可以用

foo(classOf[TableOutputFormat[Unit]])

甚至

foo(classOf[TableOutputFormat[_]])

因为那很重要。但我不能打电话

jobConf.setOutputFormat(classOf[TableOutputFormat[_]])

签名原件 setOutputFormat 在java中是 void setOutputFormat(Class<? extends OutputFormat> theClass) . 从斯卡拉怎么称呼?

lpwwtiir

lpwwtiir1#

import org.apache.hadoop.hbase.mapred.TableOutputFormat 如果已弃用,则可以使用以下代码作为草稿:

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
...
val hConf = HBaseConfiguration.create()

val job = Job.getInstance(hConf)
val jobConf = job.getConfiguration
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
...
rdd.saveAsNewAPIHadoopDataset(jobConf)
abithluo

abithluo2#

这很奇怪,您100%确定您的导入是正确的(编辑:是的,这是问题,请参阅注解),并且您的构建文件中有正确版本的人工制品吗?如果我提供我工作项目中的代码片段,或许可以帮助您:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat

val conf = HBaseConfiguration.create()

val jobConfig: JobConf = new JobConf(conf, this.getClass)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)

还有一些我有:

"org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0",
"org.apache.hbase" % "hbase-client" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-common" % "0.96.1.1-cdh5.0.0", 

"org.apache.hbase" % "hbase-hadoop-compat" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-it" % "0.96.1.1-cdh5.0.0", /
"org.apache.hbase" % "hbase-hadoop2-compat" % "0.96.1.1-cdh5.0.0",

"org.apache.hbase" % "hbase-prefix-tree" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-protocol" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-server" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-shell" % "0.96.1.1-cdh5.0.0", 

"org.apache.hbase" % "hbase-testing-util" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-thrift" % "0.96.1.1-cdh5.0.0",

相关问题