我试图用spark1.0在hbase(0.96.0-hadoop2)中编写一些简单的数据,但是我不断遇到序列化问题。以下是相关代码:
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.client.Put
object PutRawDataIntoHbase{
def main(args: Array[String]): Unit = {
var propFileName = "hbaseConfig.properties"
if(args.size > 0){
propFileName = args(0)
}
/**Load properties here**/
val theData = sc.textFile(prop.getProperty("hbase.input.filename"))
.map(l => l.split("\t"))
.map(a => Array("%010d".format(a(9).toInt)+ "-" + a(0) , a(1)))
val tableName = prop.getProperty("hbase.table.name")
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.rootdir", prop.getProperty("hbase.rootdir"))
hbaseConf.addResource(prop.getProperty("hbase.site.xml"))
val myTable = new HTable(hbaseConf, tableName)
theData.foreach(a=>{
var p = new Put(Bytes.toBytes(a(0)))
p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
myTable.put(p)
})
}
}
运行代码会导致:
Failed to run foreach at putDataIntoHBase.scala:79
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable
用map替换foreach不会崩溃,但我也不会写。任何帮助都将不胜感激。
1条答案
按热度按时间dy1byipe1#
班级
HBaseConfiguration
表示到hbase服务器的连接池。显然,它不能被序列化并发送到工作节点。自HTable
使用此池与hbase服务器进行通信,它也不能序列化。基本上,有三种方法来处理这个问题:
在每个工作节点上打开连接。
注意使用
foreachPartition
方法:请注意,每个工作节点都必须能够访问hbase服务器,并且必须预先安装或通过提供所需的jar
ADD_JARS
.还要注意的是,由于连接池是为每个分区打开的,所以最好将分区的数量大致减少到工作节点的数量(使用
coalesce
函数)。也可以共享一个HTable
在每个工作节点上都有一个示例,但并不是那么简单。将所有数据序列化到单个框并将其写入hbase
可以用一台计算机从rdd中写入所有数据,即使这些数据不适合内存。详细说明如下:spark:从rdd到本地机器检索大数据的最佳实践
当然,它比分布式编写要慢,但它很简单,不会带来痛苦的序列化问题,如果数据大小合理,它可能是最好的方法。
使用hadoopoutputformat
可以为hbase创建自定义hadoopoutputformat或使用现有的hadoopoutputformat。我不确定是否有什么东西适合你的需要,但谷歌应该在这里提供帮助。
p、 顺便说一下
map
调用不会崩溃,因为它不会被计算:RDD不会被计算,直到你调用一个有副作用的函数。例如,如果你打电话theData.map(....).persist
,它也会崩溃。