我正在开发一个spark流应用程序,使用spark结构化流、合流开源kafka集群,并在aws emr中运行spark作业。我们至少有20多个Kafka主题,以avro格式将数据生成单个Kafka主题,每个主题在3到4个分区之间进行了分区。我正在使用spark阅读所有20多个主题(逗号分隔的主题值) ReadStream
. 然后从产生的Dataframe中过滤每个消息行,使用正确的avro模式应用每个消息并写入结果 Dataset[T]
给s3和Cassandra。
一些我找不到答案的问题
我可以用一个吗 ReadStream
所有的主题?它是否会被认为是所有主题和分区的一个spark使用者,因为我在emr中只执行一个“spark submit job”?
spark应用程序如何跨分区分配处理?spark是使用不同的执行器并行读取这些主题/分区,还是需要为每个分区实现多线程?
是否有可能在一个消费群体中扩展到多个消费群体以实现并行化?
抱歉问了这么多问题,我想它们都是相关的。感谢您的任何反馈或指点,我可以找到文件。
myconfig公司
val kafkaParams= Map("kafka.bootstrap.servers" -> "topic1,topic2,topic3,topic4,topic5,
"failOnDataLoss" -> param.fail_on_data_loss.toString,
"subscribe" -> param.topics.toString,
"startingOffsets" -> param.starting_offsets.toString,
"kafka.security.protocol" -> param.kafka_security_protocol.toString,
"kafka.ssl.truststore.location" -> param.kafka_ssl_truststore_location.toString,
"kafka.ssl.truststore.password" -> param.kafka_ssl_truststore_password.toString
)
读取流代码
val df = sparkSession.readStream
.format("kafka")
.options(kafkaParams)
.load()
然后使用“topic”列将输入dataframe拆分为多个Dataframe,并为每个结果dataframe应用avro模式。
正在写入每个 Dataset[T]
像s3,cassandra等不同的Flume。。。
1条答案
按热度按时间dy1byipe1#
我可以为所有主题使用一个readstream吗?
假设所有主题都可以使用同一组Kafka配置,那么就确定了。不过,它可能不具备容错能力。例如,
failOnDataLoss
将导致整个作业在单个主题失败时失败。它会被认为是一个Spark消费者的所有主题和分区。。。spark是否使用不同的执行器并行读取这些主题/分区?
对。您可以将spark执行器的数量扩展到所有主题的分区总数。
我需要为每个分区实现多线程吗?
spark应该帮你处理。
是否有可能在一个消费群体中扩展到多个消费群体以实现并行化?
你应该试着设置一个
group.id
属性,但有多个执行者已经在创建使用者组。与问题无关-你正在尝试做的已经是Kafka连接的目的。将Kafka数据读入各种数据源。s3和cassandra已经是两个现有的插件实现了。