我有一个扫描表,其中包含1亿生产记录的要求。搜索将在第一个聚类键上进行。其要求是在第一个簇密钥与一个条件匹配的情况下找到唯一的分区密钥。该表如下所示-
employeeid,companyname,lastdateloggedin,FloorListed,swipetimestamp
分区密钥-employeeid群集密钥-公司名称,lastdateloggedin
我想获得select distinct(employeeid),company,swipitemestamp,其中companyname='xyz'。这是我想从表中获取的内容的sql表示。
SparkConf conf = new SparkConf().set("spark.cassandra.connection.enabled", "true")
.set("spark.cassandra.auth.username", "XXXXXXXXXX")
.set("spark.cassandra.auth.password", "XXXXXXXXX")
.set("spark.cassandra.connection.host", "hostname")
.set("spark.cassandra.connection.port", "29042")
.set("spark.cassandra.connection.factory", ConnectionFactory.class)
.set("spark.cassandra.connection.cluster_name", "ZZZZ")
.set("spark.cassandra.connection.application_name", "ABC")
.set("spark.cassandra.connection.local_dc", "DC1")
.set("spark.cassandra.connection.cachedClusterFile", "/tmp/xyz/test.json")
.set("spark.cassandra.connection.ssl.enabled", "true")
.set("spark.cassandra.input.fetch.size_in_rows","10000") //
.set("spark.driver.allowMultipleContexts","true")
.set("spark.cassandra.connection.ssl.trustStore.path", "sampleabc-spark-util/src/main/resources/x.jks")
.set("spark.cassandra.connection.ssl.trustStore.password", "cassandrasam");
CassandraJavaRDD<CassandraRow> ctable = javaFunctions(jsc).cassandraTable("keyspacename", "employeedetails").
select("employeeid", "companyname","swipetimestamp").where("companyname= ?","XYZ");
List<CassandraRow> cassandraRows = ctable.distinct().collect();
这段代码在非生产环境下运行,有近500万个数据。由于这是生产,我想谨慎处理这个问题。问题-
我的sparkconf中应该有什么配置?
spark的工作会不会因为table太大而降低db?
运行该作业可能会使cassandra在那一刻缺少线程?
1条答案
按热度按时间uurity8g1#
我建议使用DataFrameAPI而不是RDD—理论上,scc可能会对该api进行更多优化。如果您在第一个集群列上有条件,那么这个条件应该由scc下推到cassandra并在那里进行过滤。你可以用
.expalin
并检查是否有标记为的规则*
在PushedFilters
部分。关于配置-使用
spark.cassandra.input.fetch.size_in_rows
-如果你有太高的价值,那么你可以有更高的机会获得超时。您仍然可以使用默认值关闭节点,因为scc正在使用读取LOCAL_ONE
,从而使单个节点过载。有时候,和我一起读书LOCAL_QUORUM
速度更快,因为它不会使单个节点过载过多,也不会重新启动正在读取数据的任务。我建议你一定要使用最新的spark cassandra连接器2.5.0它有很多新的优化和新功能。。。