Spark 向 HBase 插入数据时打开了过多的连接

omvjsjqw  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(273)

我正在尝试在解析一些文本之后向 HBase 执行插入。代码可以正常运行，但我认为可以重新组织以获得更好的性能。在下面的代码中，我在循环的每次迭代里都会打开一个新连接；我想知道怎样才能只打开一个连接，并用它完成所有插入。我想我需要把连接作为参数传给函数才能实现这一点。

/**
 * Extracts the text and metadata of one file and stores both in HBase.
 *
 * The file at `fp` is parsed with an `AutoDetectParser`; the extracted body
 * text goes to column `txt:text` and the metadata, serialised as JSON, goes to
 * column `data:info` of the same row.
 *
 * Non-fatal failures during parsing or writing are printed and swallowed, so a
 * bad file does not abort the caller's loop. Failures while opening the file
 * still propagate to the caller.
 *
 * @param fp path of the file to parse, resolved against the configured file system
 */
def extractInfo(fp: String): Unit = {
  import scala.util.control.NonFatal
  import org.apache.hadoop.hbase.TableName

  val parser: Parser = new AutoDetectParser()
  // FileSystem.get returns a cached instance shared across the JVM; closing it
  // here would break every other user of that file system, so it is left open.
  val fs = FileSystem.get(new java.net.URI("XXXXXXXXXX"), new Configuration())
  val inputPath: Path = new Path(fp)
  val is: InputStream = fs.open(inputPath)
  // -1 lifts the handler's default limit on the number of extracted characters.
  val handler: BodyContentHandler = new BodyContentHandler(-1)
  val metadata: Metadata = new Metadata()
  try {
    // Close the stream even when parsing throws.
    try parser.parse(is, handler, metadata, new ParseContext())
    finally is.close()

    val text = handler.toString()
    val gson = new Gson
    val metaNode = gson.toJsonTree(metadata).getAsJsonObject().getAsJsonObject("metadata")
    val jsonString = gson.toJson(metaNode)
    if (text.trim().isEmpty()) {
      println(s"no Text extracted: $inputPath")
    } else {
      println("Success")
    }

    // NOTE(review): the row key is a fixed placeholder, so every file overwrites
    // the same row - confirm what the real key should be.
    val fname = "ABC"
    val configuration: Configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum", "XXXX")
    configuration.set("hbase.zookeeper.property.clientPort", "XXXX")
    configuration.set("zookeeper.znode.parent", "/hbase-XXX")
    configuration.set("hbase.client.keyvalue.maxsize", "0")
    val principal = System.getProperty("kerberosPrincipal", "XXXXX")
    val keytabLocation = System.getProperty("kerberosKeytab", "XXXXXXXXX")
    UserGroupInformation.setConfiguration(configuration)
    UserGroupInformation.loginUserFromKeytab(principal, keytabLocation)

    // Exactly one connection per call, always closed. The table is obtained
    // from that connection instead of `new HTable(...)`, which would open a
    // second connection of its own.
    val connection = ConnectionFactory.createConnection(configuration)
    try {
      val table = connection.getTable(TableName.valueOf("XXXXXXXXX"))
      try {
        // Both cells belong to the same row, so a single Put (one RPC) suffices.
        val put = new Put(Bytes.toBytes(fname))
        put.addColumn(Bytes.toBytes("txt"), Bytes.toBytes("text"), Bytes.toBytes(text))
        put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("info"), Bytes.toBytes(jsonString))
        table.put(put)
      } finally {
        table.close()
      }
    } finally {
      connection.close()
    }
  } catch {
    // Only non-fatal errors are swallowed; OutOfMemoryError and the like propagate.
    case NonFatal(e) => e.printStackTrace()
  }
}

/** Entry point: runs `extractInfo` once for every configured input file. */
object App {
/**
 * @param args command-line arguments (currently unused)
 */
def main(args: Array[String]): Unit = {
// A collection of paths: calling foreach directly on a String would iterate
// over its characters, not over file names.
val fnames: Seq[String] = Seq("/X/X/XXXXX.XXX")
fnames.foreach(path => extractInfo(path))
}
}
e0uiprwp

e0uiprwp1#

在 Spark 中，如果要从 executor（而不是 driver）更新 HBase，应当为每个 executor 创建一个连接，这样该连接就可以在同一个 executor 内被重用，从而省去重复创建连接的时间开销。不过表（Table）对象需要为每个线程单独创建，因为 HBase 的表对象不是线程安全的（请参阅 HBase 客户端官方文档）。
当然，使用完毕后要关闭表和连接。

// Holder for a lazily created HBase connection.
// Being a Scala `object`, it is initialised once per JVM; when `put` is called
// from inside an RDD closure the connection is therefore created on the
// executor rather than on the driver, and is reused by later calls in that JVM.
// NOTE(review): this is a sketch, not compilable code - `configuration` is not
// defined here, and `<do some stuff>` / `val table = ...` are placeholders.
object HbaseHandler {
  // None until the first call to `put`; afterwards the cached connection.
  var connection: Option[Connection] = None

  // Writes a single Put, creating the connection on first use.
  // The table handle is obtained per call and always closed in `finally`;
  // the connection itself is kept open for later calls.
  def put(put: Put): Unit = {
    // NOTE(review): this check-then-create is not synchronised, so two threads
    // calling `put` at the same time could each create a connection.
    if(connection.isEmpty) {
      val conn = ConnectionFactory.createConnection(HBaseConfiguration.create(configuration))

      connection = Some(conn)
    }

    connection.get.<do some stuff>
    val table = ...
    try {
      table.put(put)
    } finally {  
      table.close()
    }
  }
} 

...

// Usage sketch: build one Put per row and hand it to HbaseHandler.put.
// `<generate put object>` is a placeholder for the row-to-Put conversion.
rdd.foreach (
  row => {
    val put: Put = <generate put object>
    HbaseHandler.put(put)
  }
)

=============将上述做法应用到你的代码示例，如下所示=========

/**
 * Parses files and writes the extracted text and metadata to HBase through a
 * single connection that is shared by all calls in this JVM.
 *
 * The configuration, the Kerberos login and the connection are created lazily
 * on first use. `lazy val` initialisation is synchronised, so concurrent first
 * calls still produce exactly one connection.
 */
object Hbase {
  import scala.util.control.NonFatal
  import org.apache.hadoop.hbase.TableName

  /** HBase client settings; building them also performs the Kerberos login once. */
  private lazy val configuration: Configuration = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "XXXX")
    conf.set("hbase.zookeeper.property.clientPort", "XXXX")
    conf.set("zookeeper.znode.parent", "/hbase-XXX")
    conf.set("hbase.client.keyvalue.maxsize", "0")
    val principal = System.getProperty("kerberosPrincipal", "XXXXX")
    val keytabLocation = System.getProperty("kerberosKeytab", "XXXXXXXXX")
    UserGroupInformation.setConfiguration(conf)
    UserGroupInformation.loginUserFromKeytab(principal, keytabLocation)
    conf
  }

  /** The shared connection, closed by a shutdown hook when the JVM exits. */
  private lazy val connection: Connection = {
    val conn = ConnectionFactory.createConnection(configuration)
    sys.addShutdownHook(conn.close())
    conn
  }

  /**
   * Extracts the text and metadata of one file and stores both in HBase.
   *
   * The body text goes to column `txt:text` and the metadata, serialised as
   * JSON, goes to column `data:info` of the same row. Non-fatal failures during
   * parsing or writing are printed and swallowed; failures while opening the
   * file still propagate to the caller.
   *
   * @param fp path of the file to parse, resolved against the configured file system
   */
  def extractInfo(fp: String): Unit = {
    val parser: Parser = new AutoDetectParser()
    // FileSystem.get returns a cached instance shared across the JVM; closing it
    // here would break every other user of that file system, so it is left open.
    val fs = FileSystem.get(new java.net.URI("XXXXXXXXXX"), new Configuration())
    val inputPath: Path = new Path(fp)
    val is: InputStream = fs.open(inputPath)
    // -1 lifts the handler's default limit on the number of extracted characters.
    val handler: BodyContentHandler = new BodyContentHandler(-1)
    val metadata: Metadata = new Metadata()
    try {
      // Close the stream even when parsing throws.
      try parser.parse(is, handler, metadata, new ParseContext())
      finally is.close()

      val text = handler.toString()
      val gson = new Gson
      val metaNode = gson.toJsonTree(metadata).getAsJsonObject().getAsJsonObject("metadata")
      val jsonString = gson.toJson(metaNode)
      if (text.trim().isEmpty()) {
        println(s"no Text extracted: $inputPath")
      } else {
        println("Success")
      }

      // NOTE(review): the row key is a fixed placeholder, so every file
      // overwrites the same row - confirm what the real key should be.
      val fname = "ABC"

      // Table handles are not thread-safe, so one is obtained per call from the
      // shared connection and closed afterwards. The connection stays open.
      val table = connection.getTable(TableName.valueOf("XXXXXXXXX"))
      try {
        // Both cells belong to the same row, so a single Put (one RPC) suffices.
        val put = new Put(Bytes.toBytes(fname))
        put.addColumn(Bytes.toBytes("txt"), Bytes.toBytes("text"), Bytes.toBytes(text))
        put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("info"), Bytes.toBytes(jsonString))
        table.put(put)
      } finally {
        table.close()
      }
    } catch {
      // Only non-fatal errors are swallowed; OutOfMemoryError and the like propagate.
      case NonFatal(e) => e.printStackTrace()
    }
  }
}

/** Entry point: runs `Hbase.extractInfo` once for every configured input file. */
object App {
  /**
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    // A collection of paths: calling foreach directly on a String would iterate
    // over its characters, not over file names.
    val fnames: Seq[String] = Seq("/X/X/XXXXX.XXX")
    fnames.foreach(path => Hbase.extractInfo(path))
  }
}

相关问题