在单个product命令中可以生成kafka主题的记录数量有限制吗

rvpgvaaj  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(509)

我有一个databricksKafka制作人,需要为Kafka主题写6200万张唱片。如果我同时写6200万条记录会有问题吗?或者我需要迭代20次,每次迭代写3百万条记录。
这是密码。

Cmd1 val srcDf = spark.read.format("delta").load("/mnt/data-lake/data/silver/geocodes").filter($"LastUpdateDt"===lastUpdateDt)

Cmd2 val strDf = srcDf
        .withColumn("key",...
        .withColumn("topLevelRecord",...

Cmd3 strDf
 .select(
 to_avro($"key", lit("topic-AVRO-key"), schemaRegistryAddr).as("key"),
 to_avro($"topLevelRecord", lit("topic-AVRO-value"), schemaRegistryAddr, avroSchema).as("value"))
 .write
 .format("kafka")
 .option("kafka.bootstrap.servers", bootstrapServers)
 .option("kafka.security.protocol", "SSL")
 .option("kafka.ssl.keystore.location", kafkaKeystoreLocation)
 .option("kafka.ssl.keystore.password", keystorePassword)
 .option("kafka.ssl.truststore.location", kafkaTruststoreLocation)
 .option("topic",topic)
 .save()

我的问题是-如果strdf.count是62m,我可以直接写给kafka还是需要迭代cmd#3。

ltskdhd1

ltskdhd11#

使用spark structured streaming for kafka将数据存储到kafka中没有限制。您将在下面看到,您的流式查询将创建一个(池) KafkaProducer 用于遍历 Dataframe . Kafka可以处理如此多的信息,而且没有限制。
值得注意的是,kafka将在将这批消息实际写入代理之前将一些消息缓冲到一个批中。这是通过kafkaproducer配置的配置来引导的 linger.ms , batch.size 以及 max.request.size ,因此根据您的总体设置调整这些设置可能很有用。

以下是spark kafka sql库的代码:

在内部,spark将在internalkafkaproducerpool.scala中创建一个kafkaproducer池:

private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = {
    val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
    if (log.isDebugEnabled()) {
      val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
      logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")
    }
    kafkaProducer
  }

然后将查询转换为rdd,并对每个分区遍历kafkawriter.scala中的元素:

queryExecution.toRdd.foreachPartition { iter =>
      val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
      Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
        finallyBlock = writeTask.close())
    }
  }

数据的实际生成将发生在kafkawritetask中:

def execute(iterator: Iterator[InternalRow]): Unit = {
    producer = Some(InternalKafkaProducerPool.acquire(producerConfiguration))
    val internalProducer = producer.get.producer
    while (iterator.hasNext && failedWrite == null) {
      val currentRow = iterator.next()
      sendRow(currentRow, internalProducer)
    }
  }

相关问题