我有一个databricksKafka制作人,需要为Kafka主题写6200万张唱片。如果我同时写6200万条记录会有问题吗?或者我需要迭代20次,每次迭代写3百万条记录。
这是密码。
Cmd1 val srcDf = spark.read.format("delta").load("/mnt/data-lake/data/silver/geocodes").filter($"LastUpdateDt"===lastUpdateDt)
Cmd2 val strDf = srcDf
.withColumn("key",...
.withColumn("topLevelRecord",...
Cmd3 strDf
.select(
to_avro($"key", lit("topic-AVRO-key"), schemaRegistryAddr).as("key"),
to_avro($"topLevelRecord", lit("topic-AVRO-value"), schemaRegistryAddr, avroSchema).as("value"))
.write
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.keystore.location", kafkaKeystoreLocation)
.option("kafka.ssl.keystore.password", keystorePassword)
.option("kafka.ssl.truststore.location", kafkaTruststoreLocation)
.option("topic",topic)
.save()
我的问题是-如果strdf.count是62m,我可以直接写给kafka还是需要迭代cmd#3。
1条答案
按热度按时间ltskdhd11#
使用spark structured streaming for kafka将数据存储到kafka中没有限制。您将在下面看到,您的流式查询将创建一个(池)
KafkaProducer
用于遍历Dataframe
. Kafka可以处理如此多的信息,而且没有限制。值得注意的是,kafka将在将这批消息实际写入代理之前将一些消息缓冲到一个批中。这是通过kafkaproducer配置的配置来引导的
linger.ms
,batch.size
以及max.request.size
,因此根据您的总体设置调整这些设置可能很有用。以下是spark kafka sql库的代码:
在内部,spark将在internalkafkaproducerpool.scala中创建一个kafkaproducer池:
然后将查询转换为rdd,并对每个分区遍历kafkawriter.scala中的元素:
数据的实际生成将发生在kafkawritetask中: