spark cassandra连接器+连接超时

gfttwv5a  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(662)

我需要连接两个sparkDataframe,并将结果返回到hive。以下是Dataframe:
dataframe1:cassandra表-分区和集群键:(id,part\u nbr)

val df1 = spark.read.format("org.apache.spark.sql.cassandra")
    .option("keyspace", "mykeyspace")
    .option("table", "mytable")
    .load

dataframe2:从另一个源获取的键(在上表中是id列的分区键)的dataframe-此表中的不同键的数量约为15万

val df2 = spark.read
    .format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url","****")
    .option("dbtable","table")
    .option("user", "username")
    .option("password", "password123")
    .load()

  val joinExpr = df1.col("ID") === df2.col("ID")

  val res = df1.join(df2,joinExpr)

  res.write.mode(SaveMode.Append).format("orc")
    .saveAsTable("targetTable")

现在这个代码总是导致“com.datastax.oss.driver.api.core.servererrors.readfailureexception:cassandra failure during read query at consistency local\ u one(需要1个响应,但只有0个副本响应,1个失败)”。
即使失败,也将本地\u one更改为quorum。
我甚至尝试过将keys dataframe拆分成20个键(一个dataframe中有20个id值)的批,然后与cassandra表连接——即使失败了。
我甚至尝试过in子句,尽管它很有效,但dba限制了我们运行它,因为它会加载cassandra并导致cpu峰值。
在与cassandra dba进行检查时,他们要求执行有针对性的查询,因为上面的查询会导致较大的令牌范围扫描,从而导致失败。但单独的定点查询将导致15万次往返Cassandra(这需要几个小时才能完成),而且成本太高。
为什么这会导致如此巨大的令牌范围扫描?我们怎样才能解决这个问题?我的选择是什么?
pom.xml依赖关系

<scala.version>2.11.12</scala.version>
 <spark.version>2.2.0</spark.version>

        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

尝试了以下内容,但它不做直接连接。我还缺什么?

spark-submit --class ExampleCassandra --deploy-mode client --num-executors 15 --executor-memory 4g  --driver-memory=1g  --conf spark.sql.shuffle.partitions=25 --conf spark.executor.heartbeatInterval=100s --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --jars spark-sql_2.11-2.4.0.jar,spark-core_2.11-2.4.0.jar,spark-hive_2.11-2.4.0.jar,mysql-connector-java-8.0.18.jar,spark-cassandra-connector_2.11-2.5.1.jar ExampleCassandra-bundled-1.0-SNAPSHOT.jar

代码中打印的spark版本=>spark.sparkcontext.version=2.4.0
最终的计划

== Physical Plan ==

* (8) SortMergeJoin [item_nbr#31], [item_nbr#24], Inner

:- *(2) Sort [item_nbr#31 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(item_nbr#31, 25)
:     +- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [item_nbr#31,planNum#32,strN#33,currTail#34,currTailTy#35,hor#36,prNbr#37,revSce#38,stckHnad#39] PushedFilters: [], ReadSchema: struct<item_nbr:int,planNum:int,strN:int,currTail:decimal(38,18),currTailTy:s...
+- *(7) Sort [item_nbr#24 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(item_nbr#24, 25)
      +- *(6) HashAggregate(keys=[item_nbr#21], functions=[])
         +- Exchange hashpartitioning(item_nbr#21, 25)
            +- *(5) HashAggregate(keys=[item_nbr#21], functions=[])
               +- *(5) Filter (NOT (trim(lower(item_nbr#21), None) = null) && isnotnull(cast(trim(item_nbr#21, None) as int)))
                  +- Generate explode(split(items#4, ,)), false, [item_nbr#21]
                     +- *(4) Project [items#4]
                        +- *(4) BroadcastHashJoin [planNum#0], [planNum#2], Inner, BuildRight
                           :- *(4) Scan JDBCRelation(( select planNum from QAMdPlans.Plan where plan_type = 'MBM' order by planNum desc ) t) [numPartitions=1] [planNum#0] PushedFilters: [*IsNotNull(planNum)], ReadSchema: struct<planNum:int>
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                              +- *(3) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [planNum#2,items#4] PushedFilters: [], ReadSchema: struct<planNum:int,items:string>
rkkpypqq

rkkpypqq1#

问题是版本2.0.5没有优化Dataframe的连接-如果你愿意的话 res.explain 您将看到spark将执行从cassandra读取所有数据,然后在spark级别执行join。优化后的连接仅在RDDAPI中可用 leftJoinWithCassandraTable 或者 joinWithCassandraTable .
随着spark cassandra connector 2.5的发布,这种情况已经发生了变化,它现在包含了针对dataframe api的优化连接(但您需要启用spark sql扩展才能使其工作)。因此,您需要将连接器升级到2.5.5最新版本(目前为2.5.1),或者使用RDDAPI中的连接功能。
p、 我最近写了一篇详细的博客文章,介绍了如何使用dataframe和rddapi有效地连接来自spark的cassandra表中的数据。

相关问题