spark流媒体+kafka集成:支持新的主题订阅,而不需要重新启动流媒体上下文

nfs0ujit  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(325)

我正在使用spark流应用程序(spark 2.1)使用kafka(0.10.1)主题中的数据。我想订阅新主题而不重新启动流上下文。有没有办法做到这一点?
我可以在apachespark项目中看到一张jira的票(https://issues.apache.org/jira/browse/spark-10320),即使它在2.0版本中是关闭的,我也找不到任何文档或示例来实现这一点。如果你们中有人对此很熟悉,请给我提供相同的文档链接或示例。提前谢谢。

yzckvree

yzckvree1#

我发现这个解决方案更适合我的目的。我们可以用不同的数据流共享一个streamingcontext示例。为了更好地管理,我们可以使用相同的流上下文为每个主题创建单独的“dstream”示例,您可以将此“dstream”示例与其主题名称一起存储在Map中,以便以后您可以停止或取消订阅该特定主题。为了清楚起见,请参阅下面的代码。

<script src="https://gist.github.com/shemeemsp7/01d21588347b94204c71a14005be8afa.js"></script>
o3imoua4

o3imoua42#

spark 2.0.x和kafka 0.10.x之间的集成支持订阅模式。根据文件:
subscribepattern允许您使用regex指定感兴趣的主题。请注意,与0.8集成不同,使用subscribe或subscribepattern应该响应在运行流期间添加分区。
您可以使用regex模式来注册所有您想要的主题。

class SubscribePattern[K, V](
    pattern: java.util.regex.Pattern,
    kafkaParams: java.util.Map[String, Object],
    offsets: java.util.Map[TopicPartition, java.util.Long]
) extends ConsumerStrategy[K, V]
ha5z0ras

ha5z0ras3#

您可以订阅多个主题,如topic1、topic2等

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()

更多信息,Kafka指南

相关问题