spark结构化流多线程/多用户

brccelvz  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(307)

我正在开发一个spark流应用程序,使用spark结构化流、合流开源kafka集群,并在aws emr中运行spark作业。我们至少有20多个Kafka主题,以avro格式将数据生成单个Kafka主题,每个主题在3到4个分区之间进行了分区。我正在使用spark阅读所有20多个主题(逗号分隔的主题值) ReadStream . 然后从产生的Dataframe中过滤每个消息行,使用正确的avro模式应用每个消息并写入结果 Dataset[T] 给s3和Cassandra。
一些我找不到答案的问题
我可以用一个吗 ReadStream 所有的主题?它是否会被认为是所有主题和分区的一个spark使用者,因为我在emr中只执行一个“spark submit job”?
spark应用程序如何跨分区分配处理?spark是使用不同的执行器并行读取这些主题/分区,还是需要为每个分区实现多线程?
是否有可能在一个消费群体中扩展到多个消费群体以实现并行化?
抱歉问了这么多问题,我想它们都是相关的。感谢您的任何反馈或指点,我可以找到文件。
myconfig公司

val kafkaParams=  Map("kafka.bootstrap.servers" -> "topic1,topic2,topic3,topic4,topic5,
    "failOnDataLoss" -> param.fail_on_data_loss.toString,
    "subscribe" -> param.topics.toString,
    "startingOffsets" -> param.starting_offsets.toString,
    "kafka.security.protocol" -> param.kafka_security_protocol.toString,
    "kafka.ssl.truststore.location" -> param.kafka_ssl_truststore_location.toString,
    "kafka.ssl.truststore.password" -> param.kafka_ssl_truststore_password.toString
  )

读取流代码

val df = sparkSession.readStream
  .format("kafka")
  .options(kafkaParams)
  .load()

然后使用“topic”列将输入dataframe拆分为多个Dataframe,并为每个结果dataframe应用avro模式。
正在写入每个 Dataset[T] 像s3,cassandra等不同的Flume。。。

dy1byipe

dy1byipe1#

我可以为所有主题使用一个readstream吗?
假设所有主题都可以使用同一组Kafka配置,那么就确定了。不过,它可能不具备容错能力。例如, failOnDataLoss 将导致整个作业在单个主题失败时失败。
它会被认为是一个Spark消费者的所有主题和分区。。。spark是否使用不同的执行器并行读取这些主题/分区?
对。您可以将spark执行器的数量扩展到所有主题的分区总数。
我需要为每个分区实现多线程吗?
spark应该帮你处理。
是否有可能在一个消费群体中扩展到多个消费群体以实现并行化?
你应该试着设置一个 group.id 属性,但有多个执行者已经在创建使用者组。
与问题无关-你正在尝试做的已经是Kafka连接的目的。将Kafka数据读入各种数据源。s3和cassandra已经是两个现有的插件实现了。

相关问题