我使用的是spark-sql-2.4.1、spark-cassandra-connector 2.11-2.4.1以及java8和apache cassandra 3.0版本。
我有我的Spark提交或Spark集群环境如下加载20亿记录。
--executor-cores 3
--executor-memory 9g
--num-executors 5
--driver-cores 2
--driver-memory 4g
我使用的cassandra 6节点群集具有以下设置:
cassandra.output.consistency.level=ANY
cassandra.concurrent.writes=1500
cassandra.output.batch.size.bytes=2056
cassandra.output.batch.grouping.key=partition
cassandra.output.batch.grouping.buffer.size=3000
cassandra.output.throughput_mb_per_sec=128
cassandra.connection.keep_alive_ms=30000
cassandra.read.timeout_ms=600000
我正在使用sparkDataframe加载到cassandra表中。在读入spark数据集后,我按以下特定列进行分组。
Dataset<Row> dataDf = //read data from source i.e. hdfs file which are already partitioned based "load_date", "fiscal_year" , "fiscal_quarter" , "id", "type","type_code"
Dataset<Row> groupedDf = dataDf.groupBy("id","type","value" ,"load_date","fiscal_year","fiscal_quarter" , "create_user_txt", "create_date")
groupedDf.write().format("org.apache.spark.sql.cassandra")
.option("table","product")
.option("keyspace", "dataload")
.mode(SaveMode.Append)
.save();
Cassandra table(
PRIMARY KEY (( id, type, value, item_code ), load_date)
) WITH CLUSTERING ORDER BY ( load_date DESC )
基本上我是按“id”、“type”、“value”、“load\u date”列分组的。由于其他列(“财政年度”、“财政季度”、“创建用户文本”、“创建日期”)应可用于存储到cassandra表中,因此我必须将它们也包括在groupby子句中。
1) 坦率地说,我不知道如何将groupby之后的那些列放入结果Dataframe,即groupeddf来存储。有什么建议吗?
2) 在上面的过程/步骤中,我的spark加载工作非常慢,因为有很多无序处理,比如读无序处理和写无序处理。
我应该怎么做来提高速度?
在从源代码(到datadf)读取数据时,我需要在这里做些什么来提高性能吗?这已经分区了。
我还需要做分区吗?如果是这样的话,根据上面的Cassandra表,最好的方法是什么?
hdfs文件列
“id”,“类型”,“值”,“类型代码”,“加载日期”,“项目代码”,“会计年度”,“会计季度”,“创建日期”,“上次更新日期”,“创建用户文本”,“更新用户文本”
旋转
我使用groupby是因为如下所示的旋转
Dataset<Row> pivot_model_vals_unpersist_df = model_vals_df.groupBy("id","type","value","type_code","load_date","item_code","fiscal_year","fiscal_quarter","create_date")
.pivot("type_code" )
.agg( first(//business logic)
)
)
请给我建议。非常感谢您的建议/反馈。
1条答案
按热度按时间mqkwyuun1#
所以,正如我从评论中得到的,你的下一个任务是:
从hdfs中取出2b行。
通过一些转换将这些行保存到cassandra中。
cassandra表的模式与hdfs数据集的模式不同。
一开始,你肯定不需要分组。groupby不对列进行分组,它对行进行分组,调用sum、avg、max等聚合函数。语义类似于sql“groupby”,所以这不是你的情况。您真正需要的是-使您的“to save”数据集适合所需的cassandra模式。
在java中,这比在scala中要复杂一些。首先,我建议定义一个bean来表示cassandra行。
您的数据集是dataset,您需要将其转换为javardd。所以,你需要一个转换器。
结果我们会有这样的结果:
更多信息,你可以看看https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md