I have a Spark job running in Dataproc, and I want to load the results into BigQuery. I know I have to add the spark-bigquery connector to save the data to BigQuery. Here is my build.sbt:
name := "spl_prj"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"
conflictManager := ConflictManager.latestRevision
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.17.3"
)
When I build the jar and submit the job, I get the following error:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:639)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at com.renault.datalake.spl_prj.Main$.main(Main.scala:58)
at com.renault.datalake.spl_prj.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
I don't have permission to add jars when submitting the job, as is usually done in this case. I think that when sbt builds the jar, the connector is not included in the build. Here is the snippet of Scala Spark code I want to run:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.config(conf).getOrCreate()
val bucket = "doc_spk"
spark.conf.set("temporaryGcsBucket", bucket)
val sc = spark.sparkContext
val rddRowString = sc.binaryRecords("gs://bucket/GAR", 120)
  .map(x => (x.slice(0, 17), x.slice(17, 20), x.slice(20, 120)))
val df = spark.createDataFrame(rddRowString).toDF("v", "data", "val_data")
df.write.format("bigquery")
  .option("table", "db.table")
  .save()
1 Answer
The problem is that sbt builds a thin jar: the connector is on the compile classpath but is not packaged into the jar you submit, so the `bigquery` data source cannot be found at runtime. Build a "fat" jar that bundles the spark-bigquery connector instead. Add the sbt-assembly plugin in a `project/plugins.sbt` file, keep the connector dependency in `build.sbt` (without the `Provided` scope), and run the assembly command to create the fat jar.

Note: you can adjust the versions according to your project's requirements.
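The concrete file contents were not preserved in this copy of the answer. Below is a sketch of what the fat-jar setup likely looks like, assuming the sbt-assembly plugin; the plugin version, the merge strategy, and the output jar name are assumptions, not taken from the original answer:

```scala
// build.sbt — same dependencies as in the question.
// Spark itself stays Provided because the Dataproc cluster supplies it;
// the connector has no Provided scope, so assembly bundles it into the jar.
name := "spl_prj"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
  "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.17.3"
)

// A merge strategy is usually needed so assembly can resolve duplicate
// files (e.g. under META-INF) coming from the connector's dependencies.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

// project/plugins.sbt (a separate file, assumed plugin version):
// addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
```

Running `sbt assembly` should then produce a fat jar such as `target/scala-2.11/spl_prj-assembly-0.1.jar`, which can be submitted as the job jar, e.g. with `gcloud dataproc jobs submit spark --cluster=<cluster> --region=<region> --jar=<path-to-fat-jar>` (cluster and region values are placeholders).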