我正在尝试在解析一些文本后执行hbase的插入,代码运行良好,但我认为可以对其进行组织以获得更好的性能。在下面的im打开一个循环中的连接,我想知道如何才能打开一个连接,并使用它的所有插入。我想我需要连接到一个函数才能实现这一点。
def extractInfo(fp:String) = {
val p:Parser = new AutoDetectParser()
val fs = FileSystem.get(new java.net.URI("XXXXXXXXXX"), new Configuration())
val inputPath:Path = new Path(fp)
val is:InputStream = fs.open(inputPath)
val handler:BodyContentHandler = new BodyContentHandler(-1)
val metadata:Metadata = new Metadata()
try{
p.parse(is, handler, metadata, new ParseContext())
is.close()
val hand = handler.toString()
val gson = new Gson
val jsonTree = gson.toJsonTree(metadata)
val metaNode = jsonTree.getAsJsonObject().getAsJsonObject("metadata")
val jsonString = gson.toJson(metaNode)
if (hand.trim().isEmpty()){
println("no Text extracted", inputPath)
} else {
println("Success")
}
val fname = "ABC"
val configuration: Configuration = HBaseConfiguration.create()
configuration.set("hbase.zookeeper.quorum", "XXXX")
configuration.set("hbase.zookeeper.property.clientPort", "XXXX")
configuration.set("zookeeper.znode.parent", "/hbase-XXX")
configuration.set("hbase.client.keyvalue.maxsize", "0")
val principal = System.getProperty("kerberosPrincipal", "XXXXX")
val keytabLocation = System.getProperty("kerberosKeytab", "XXXXXXXXX")
UserGroupInformation.setConfiguration(configuration)
UserGroupInformation.loginUserFromKeytab(principal, keytabLocation)
val connection = ConnectionFactory.createConnection(HBaseConfiguration.create(configuration))
val admin = connection.getAdmin
val hTable:HTable = new HTable(configuration, "XXXXXXXXX")
val g = new Put(Bytes.toBytes(fname))
g.add(Bytes.toBytes("txt"),Bytes.toBytes("text"),Bytes.toBytes(hand))
hTable.put(g)
val m = new Put(Bytes.toBytes(fname))
m.add(Bytes.toBytes("data"),Bytes.toBytes("info"),Bytes.toBytes(jsonString))
hTable.put(m)
hTable.close()
fs.close()
}
catch {
case e : Throwable => {
println(e.printStackTrace)
}
}
}
object App {
def main(args : Array[String]) {
val fnames = "/X/X/XXXXX.XXX"
fnames.foreach{x => extractInfo(x) }
}
}
1条答案
按热度按时间e0uiprwp1#
在spark中,如果要从executors而不是driver更新hbase,则为每个executor创建连接,这样连接将在同一个executor中重用。通过这种方式,您可以减少相同的连接创建时间开销,但可以为每个线程创建表对象,因为hbase表对象不是线程安全的(请查看hbase客户端官方文档)。
当然,一旦完成,就关闭表和连接。
=============如上面的代码示例所示=========