我收到Kafka以
{"email":"test@example","firstname":"Example","lastname":"User"}
我想访问电子邮件id和名字,并将其与来自cassandra的数据进行比较,格式如下:
CassandraRow{email: abc@xyz.com}
i7uaboj41#
您需要使用 joinWithCassandraTable 函数。。。为了更有效,您可能需要重新分区您从kafka获得的rdd,以匹配cassandra表中的分区。代码可以如下所示:
joinWithCassandraTable
val resultRdd = kafkaRDD.repartitionByCassandraReplica("ks","emails") .joinWithCassandraTable("ks","emails")
之后,你可以分析,如果名称匹配等,加入后,你应该得到只有记录,其中有电子邮件在Cassandra。。。
1条答案
按热度按时间i7uaboj41#
您需要使用
joinWithCassandraTable
函数。。。为了更有效,您可能需要重新分区您从kafka获得的rdd,以匹配cassandra表中的分区。代码可以如下所示:
之后,你可以分析,如果名称匹配等,加入后,你应该得到只有记录,其中有电子邮件在Cassandra。。。