使用spark将hbase表转储到csv会导致数据丢失

wgeznvg7  于 2021-06-09  发布在  Hbase
关注(0)|答案(0)|浏览(312)

使用phoenix和spark将数据从hbase导出到csv时,会导致数据丢失。我的hbase表中有2200万行,当我将它导出到csv时,只有1900万行。少了300万行。
在写入csv之前,我尝试过缓存Dataframe,但结果仍然是1900万行。我已经使用合并,因为我需要在一个csv文件的一切。
我也试过用!Phoenix城的记录,但这里的问题是,数据是巨大的,它需要永远加载。
!输出格式csv
!记录数据.csv
从表中选择*;
!记录
!退出
如果有任何方法,我可以导出我的hbase表而不丢失任何数据?或者有人可以帮我编辑代码,或者任何建议都会很有帮助。
我在scala中的spark代码:

import org.apache.log4j.lf5.LogLevel
import org.apache.spark.sql.SparkSession

object ExportCSV {
  def main(args: Array[String]): Unit = {

    val table_name = "xyz"
    val phoenix_zk = "jdbc:phoenix:zkurl"

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val spark = SparkSession.builder().master("local[*]")
      .config("spark.debug.maxToStringFields", "100")
      //.config("spark.driver.maxResultSize", "2g")
      .config("spark.yarn.executor.memoryOverhead", "4g")
      .appName("SparkPhoenix")
      .getOrCreate()

    val df = spark.read.format("jdbc").options(
    Map("driver" -> "org.apache.phoenix.jdbc.PhoenixDriver",
    "url" -> phoenix_zk,
    "dbtable" -> xyz)).load()

    print(df.count())  //22 million rows in dataframe
    df.cache()
    print(df.count())  //19 million rows after cache

    df.explain(extended = true)

    df.coalesce(1).write.mode("append").option("header", "true").csv("/tchiring/export_test")

  }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题