使用phoenix和spark将数据从hbase导出到csv时,会导致数据丢失。我的hbase表中有2200万行,当我将它导出到csv时,只有1900万行。少了300万行。
在写入csv之前,我尝试过缓存Dataframe,但结果仍然是1900万行。我已经使用合并,因为我需要在一个csv文件的一切。
我也试过用!Phoenix城的记录,但这里的问题是,数据是巨大的,它需要永远加载。
!输出格式csv
!记录数据.csv
从表中选择*;
!记录
!退出
如果有任何方法,我可以导出我的hbase表而不丢失任何数据?或者有人可以帮我编辑代码,或者任何建议都会很有帮助。
我在scala中的spark代码:
import org.apache.log4j.lf5.LogLevel
import org.apache.spark.sql.SparkSession
object ExportCSV {
def main(args: Array[String]): Unit = {
val table_name = "xyz"
val phoenix_zk = "jdbc:phoenix:zkurl"
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val spark = SparkSession.builder().master("local[*]")
.config("spark.debug.maxToStringFields", "100")
//.config("spark.driver.maxResultSize", "2g")
.config("spark.yarn.executor.memoryOverhead", "4g")
.appName("SparkPhoenix")
.getOrCreate()
val df = spark.read.format("jdbc").options(
Map("driver" -> "org.apache.phoenix.jdbc.PhoenixDriver",
"url" -> phoenix_zk,
"dbtable" -> xyz)).load()
print(df.count()) //22 million rows in dataframe
df.cache()
print(df.count()) //19 million rows after cache
df.explain(extended = true)
df.coalesce(1).write.mode("append").option("header", "true").csv("/tchiring/export_test")
}
}
暂无答案!
目前还没有任何答案,快来回答吧!