I can read data from a BigQuery table locally using the spark-bigquery-connector, but when I deploy the job to Google Cloud and run it through Dataproc I get the exception below. As the log shows, the connector resolves the table schema, then waits 8-10 minutes before throwing the exception. Can anyone help?
20/10/30 13:15:40 INFO org.spark_project.jetty.util.log: Logging initialized @2859ms
20/10/30 13:15:40 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
20/10/30 13:15:40 INFO org.spark_project.jetty.server.Server: Started @2959ms
20/10/30 13:15:40 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector
20/10/30 13:15:40 WARN org.apache.spark.scheduler.FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.
20/10/30 13:15:41 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at <REMOVED_RM_INFO_FOR_SECURITY_PURPOSE>
20/10/30 13:15:41 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at <REMOVED_HISTORY_SERVER_INFO_FOR_SECURITY_PURPOSE>
20/10/30 13:15:44 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1603913904708_0011
20/10/30 13:15:50 INFO com.google.cloud.spark.bigquery.BigQueryUtilScala: BigQuery client project id is [<REMOVED_PROJECT_ID_FOR_SECURITY_PURPOSE>], derived from the parentProject option
20/10/30 13:15:52 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Querying table <REMOVED_TABLE_NAME_FOR_SECURITY_PURPOSE>, parameters sent from Spark: requiredColumns=[country,ssn,fname,postal_code,lname,city,tenant_id,mob,PARTY_ID,src_id], filters=[]
20/10/30 13:15:52 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Going to read from <REMOVED_TABLE_NAME_FOR_SECURITY_PURPOSE> columns=[country, ssn, fname, postal_code, lname, city, tenant_id, mob, PARTY_ID, src_id], filter=''
Exception in thread "main" com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at shaded.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1074)
at shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1213)
at shaded.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
at shaded.com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.createReadSession(BigQueryReadClient.java:230)
at com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation.buildScan(DirectBigQueryRelation.scala:134)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:338)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:337)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProjectRaw(DataSourceStrategy.scala:415)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProject(DataSourceStrategy.scala:333)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:289)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3260)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2495)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2709)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at org.apache.spark.sql.Dataset.show(Dataset.scala:731)
at com.gcp.poc.SparkBigQueryConnector.main(SparkBigQueryConnector.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at io.grpc.Status.asRuntimeException(Status.java:533)
... 16 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: null: bigquerystorage.googleapis.com
at io.netty.channel.unix.Errors.throwConnectException(Errors.java:102)
at io.netty.channel.unix.Socket.connect(Socket.java:255)
at io.netty.channel.epoll.AbstractEpollChannel.doConnect0(AbstractEpollChannel.java:758)
at io.netty.channel.epoll.AbstractEpollChannel.doConnect(AbstractEpollChannel.java:743)
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.connect(AbstractEpollChannel.java:585)
at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1291)
at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:530)
at io.netty.channel.ChannelDuplexHandler.connect(ChannelDuplexHandler.java:50)
at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:530)
at io.netty.channel.ChannelDuplexHandler.connect(ChannelDuplexHandler.java:50)
at io.grpc.netty.WriteBufferingAndExceptionHandler.connect(WriteBufferingAndExceptionHandler.java:150)
at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
at io.netty.channel.AbstractChannelHandlerContext.access$1000(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$11.run(AbstractChannelHandlerContext.java:535)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
... 1 more
Caused by: java.net.NoRouteToHostException
... 22 more
20/10/30 13:25:35 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark
Job output is complete
The pom.xml file:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>2.3.4</spark.version>
    <scala.version>2.11.8</scala.version>
</properties>

<distributionManagement>
    <snapshotRepository>
        <id>sonatype-nexus-snapshots</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    </snapshotRepository>
    <repository>
        <id>sonatype-nexus-staging</id>
        <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
    </repository>
</distributionManagement>

<dependencies>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-shared-dependencies</artifactId>
        <version>0.13.0</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-bigquery</artifactId>
        <version>1.116.10</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava-jdk5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>failureaccess</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>listenablefuture</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.google.cloud.spark</groupId>
        <artifactId>spark-bigquery_2.11</artifactId>
        <version>0.17.3</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava-jdk5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>failureaccess</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>listenablefuture</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.0-jre</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>${scala.version}</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>
        <!-- Maven Shade Plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <finalName>${project.artifactId}-${project.version}</finalName>
                        <relocations>
                            <relocation>
                                <pattern>com.google.common</pattern>
                                <shadedPattern>shaded.com.google.common</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.protobuf</pattern>
                                <shadedPattern>shaded.com.google.protobuf</shadedPattern>
                            </relocation>
                        </relocations>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.jacoco</groupId>
            <artifactId>jacoco-maven-plugin</artifactId>
            <version>0.8.5</version>
            <executions>
                <execution>
                    <id>prepare-agent</id>
                    <goals>
                        <goal>prepare-agent</goal>
                    </goals>
                    <configuration>
                        <destFile>target/ut-coverage.exec</destFile>
                    </configuration>
                </execution>
                <execution>
                    <id>report</id>
                    <phase>verify</phase>
                    <goals>
                        <goal>report</goal>
                    </goals>
                    <configuration>
                        <dataFile>target/ut-coverage.exec</dataFile>
                        <outputDirectory>target/jacoco-ut</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
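A side note on the POM above (an observation about the build file, not necessarily the cause of the failure): Maven only honors <scope>import</scope> when the dependency sits inside a <dependencyManagement> block, so the google-cloud-shared-dependencies BOM declared directly under <dependencies> does not actually pin any versions. A minimal sketch of the intended placement:

<dependencyManagement>
    <dependencies>
        <!-- BOM imports must live here, not in the top-level <dependencies> -->
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-shared-dependencies</artifactId>
            <version>0.13.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>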
Sample code:
package com.gcp.poc;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkBigQueryConnector {
    // projectId was not shown in the original snippet; declared here as a placeholder
    private static final String projectId = "<PROJECT_ID>";

    public static SparkSession getSparkSession() {
        return SparkSession.builder()
                //.master("local[*]") // enabled only for local runs
                .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-latest.jar")
                .getOrCreate();
    }

    public static void main(String[] args) {
        SparkSession session = getSparkSession();
        Dataset<Row> readDS = session.read().format("bigquery")
                .option("table", "<PROJECT_ID.DATASET.TABLENAME>")
                .option("project", projectId)
                .option("parentProject", projectId)
                .load();
        readDS.show(1, false); // this action triggers the read-session call that fails above
    }
}
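For context, the job was presumably submitted to Dataproc along the lines below; the cluster, region, and application-jar names are placeholders, not the original values. The class name matches the one in the stack trace, and the connector jar is the same gs://spark-lib artifact referenced in getSparkSession() (passing it via --jars at submit time makes the spark.jars builder setting redundant):

# placeholders: my-cluster, us-central1, and gs://my-bucket/spark-bigquery-poc.jar
gcloud dataproc jobs submit spark \
    --cluster=my-cluster \
    --region=us-central1 \
    --class=com.gcp.poc.SparkBigQueryConnector \
    --jars=gs://my-bucket/spark-bigquery-poc.jar,gs://spark-lib/bigquery/spark-bigquery-latest.jar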
1 Answer
For anyone else who runs into this: below are the BigQuery dependencies I used, and it works fine now.
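(The dependency snippet itself did not survive in this copy of the answer, so the exact artifacts the answerer switched to are unknown. Purely as an illustration of one plausible working setup, and not the answerer's actual list: the connector project publishes a shaded artifact that bundles its gRPC, Netty, and Guava dependencies, which sidesteps the manual exclusions and relocations in the POM above.)

<dependency>
    <!-- hypothetical illustration; the answer's actual dependency list is missing -->
    <groupId>com.google.cloud.spark</groupId>
    <artifactId>spark-bigquery-with-dependencies_2.11</artifactId>
    <version>0.17.3</version>
</dependency>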