Problem using the Spark BigQuery connector in Java

Asked by r7xajy2e on 2021-05-18, in Spark

I can read data from a BigQuery table via the Spark BigQuery connector when running locally, but when I deploy the job to Google Cloud and run it through Dataproc I get the exception below. As the log shows, the connector resolves the table schema, then waits 8-10 minutes and throws the exception. Can anyone help?

20/10/30 13:15:40 INFO org.spark_project.jetty.util.log: Logging initialized @2859ms
20/10/30 13:15:40 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
20/10/30 13:15:40 INFO org.spark_project.jetty.server.Server: Started @2959ms
20/10/30 13:15:40 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector
20/10/30 13:15:40 WARN org.apache.spark.scheduler.FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.
20/10/30 13:15:41 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at <REMOVED_RM_INFO_FOR_SECURITY_PURPOSE>
20/10/30 13:15:41 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at <REMOVED_HISTORY_SERVER_INFO_FOR_SECURITY_PURPOSE>
20/10/30 13:15:44 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1603913904708_0011
20/10/30 13:15:50 INFO com.google.cloud.spark.bigquery.BigQueryUtilScala: BigQuery client project id is [<REMOVED_PROJECT_ID_FOR_SECURITY_PURPOSE>], derived from the parentProject option
20/10/30 13:15:52 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Querying table <REMOVED_TABLE_NAME_FOR_SECURITY_PURPOSE>, parameters sent from Spark: requiredColumns=[country,ssn,fname,postal_code,lname,city,tenant_id,mob,PARTY_ID,src_id], filters=[]
20/10/30 13:15:52 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Going to read from <REMOVED_TABLE_NAME_FOR_SECURITY_PURPOSE> columns=[country, ssn, fname, postal_code, lname, city, tenant_id, mob, PARTY_ID, src_id], filter=''
Exception in thread "main" com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
    at shaded.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1074)
    at shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1213)
    at shaded.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
    at shaded.com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
    at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
        at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
        at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
        at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.createReadSession(BigQueryReadClient.java:230)
        at com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation.buildScan(DirectBigQueryRelation.scala:134)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$10.apply(DataSourceStrategy.scala:293)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:338)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:337)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProjectRaw(DataSourceStrategy.scala:415)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProject(DataSourceStrategy.scala:333)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:289)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3260)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2495)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2709)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:731)
        at com.gcp.poc.SparkBigQueryConnector.main(SparkBigQueryConnector.java:33)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at io.grpc.Status.asRuntimeException(Status.java:533)
    ... 16 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: null: bigquerystorage.googleapis.com
    at io.netty.channel.unix.Errors.throwConnectException(Errors.java:102)
    at io.netty.channel.unix.Socket.connect(Socket.java:255)
    at io.netty.channel.epoll.AbstractEpollChannel.doConnect0(AbstractEpollChannel.java:758)
    at io.netty.channel.epoll.AbstractEpollChannel.doConnect(AbstractEpollChannel.java:743)
    at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.connect(AbstractEpollChannel.java:585)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1291)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:530)
    at io.netty.channel.ChannelDuplexHandler.connect(ChannelDuplexHandler.java:50)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:530)
    at io.netty.channel.ChannelDuplexHandler.connect(ChannelDuplexHandler.java:50)
    at io.grpc.netty.WriteBufferingAndExceptionHandler.connect(WriteBufferingAndExceptionHandler.java:150)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
    at io.netty.channel.AbstractChannelHandlerContext.access$1000(AbstractChannelHandlerContext.java:38)
    at io.netty.channel.AbstractChannelHandlerContext$11.run(AbstractChannelHandlerContext.java:535)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    ... 1 more
Caused by: java.net.NoRouteToHostException
    ... 22 more
20/10/30 13:25:35 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark
Job output is complete

pom.xml file:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.3.4</spark.version>
        <scala.version>2.11.8</scala.version>
    </properties>

    <distributionManagement>
        <snapshotRepository>
            <id>sonatype-nexus-snapshots</id>
            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
        </snapshotRepository>
        <repository>
            <id>sonatype-nexus-staging</id>
            <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
        </repository>
    </distributionManagement>
    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-shared-dependencies</artifactId>
            <version>0.13.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquery</artifactId>
            <version>1.116.10</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-jdk5</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>failureaccess</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>listenablefuture</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.spark</groupId>
            <artifactId>spark-bigquery_2.11</artifactId>
            <version>0.17.3</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-jdk5</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>failureaccess</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>listenablefuture</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.0-jre</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- Maven Shade Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <finalName>${project.artifactId}-${project.version}</finalName>
                            <relocations>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>shaded.com.google.common</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.protobuf</pattern>
                                    <shadedPattern>shaded.com.google.protobuf</shadedPattern>
                                </relocation>
                            </relocations>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>0.8.5</version>
                <executions>
                    <execution>
                        <id>prepare-agent</id>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                        <configuration>
                            <destFile>target/ut-coverage.exec</destFile>
                        </configuration>
                    </execution>
                    <execution>
                        <id>report</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                        <configuration>
                            <dataFile>target/ut-coverage.exec</dataFile>
                            <outputDirectory>target/jacoco-ut</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
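One thing worth flagging in the POM above: Maven only honors <scope>import</scope> inside a <dependencyManagement> block; declared directly under <dependencies> as here, the google-cloud-shared-dependencies BOM import has no effect. A minimal sketch of the conventional arrangement, reusing the same coordinates as above:

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>google-cloud-shared-dependencies</artifactId>
                <version>0.13.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>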

Sample code:

package com.gcp.poc;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkBigQueryConnector {

    // Placeholder project id; substitute the real GCP project.
    private static final String projectId = "<PROJECT_ID>";

    public static SparkSession getSparkSession() {
        return SparkSession.builder()
                //.master("local[*]")
                .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-latest.jar")
                .getOrCreate();
    }

    public static void main(String[] args) {
        SparkSession session = getSparkSession();
        Dataset<Row> readDS = session.read().format("bigquery")
                .option("table", "<PROJECT_ID.DATASET.TABLENAME>")
                .option("project", projectId)
                .option("parentProject", projectId)
                .load();
        readDS.show(1, false);
    }
}
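
For reference, a job like this is typically submitted to Dataproc along the following lines (a sketch, not taken from the original post; the cluster name, region, bucket and jar name are all placeholders):

gcloud dataproc jobs submit spark \
    --cluster=<CLUSTER_NAME> \
    --region=<REGION> \
    --class=com.gcp.poc.SparkBigQueryConnector \
    --jars=gs://<BUCKET>/<APPLICATION_JAR>.jar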

izj3ouym (Answer 1)

For anyone else who hits this:
below is the BigQuery dependency I used; it works correctly now.

<dependency>
    <groupId>com.google.cloud.spark</groupId>
    <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
    <version>0.17.3</version>
    <exclusions>
          <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava-jdk5</artifactId>
          </exclusion>
          <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
          </exclusion>
          <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>failureaccess</artifactId>
          </exclusion>
          <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>listenablefuture</artifactId>
          </exclusion>
    </exclusions>
</dependency>
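
Note that the _2.12 suffix implies a Scala 2.12 (Spark 2.4+) runtime, whereas the original POM targets Scala 2.11 / Spark 2.3.4, so the Spark and Scala artifacts presumably need to move to their 2.12 variants as well. With the connector shaded into the application jar by this artifact, the spark.jars option pointing at gs://spark-lib/bigquery/spark-bigquery-latest.jar should also no longer be needed; a minimal sketch of the session setup under that assumption:

import org.apache.spark.sql.SparkSession;

public class SessionFactory {
    // The connector now ships inside the fat jar via
    // spark-bigquery-with-dependencies_2.12, so no external
    // spark.jars entry is required (assumption, not stated in the answer).
    public static SparkSession getSparkSession() {
        return SparkSession.builder()
                .appName("spark-bigquery-poc") // hypothetical app name
                .getOrCreate();
    }
}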
