读Kafka的《星火中的尾巴》

bttbmeg0  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(489)

我需要订阅Kafka主题 latest 胶印,读一些最新的记录,打印出来,然后完成。在spark我怎么做?我想我可以这样做

sqlContext
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.1.1:9092,...")
    .option("subscribe", "myTopic")
    .option("startingOffsets", "latest")
    .filter($"someField" === "someValue")
    .take(10)
    .show
myss37ts

myss37ts1#

您需要提前知道您要从kafka使用哪个分区的哪个偏移量。如果你有这些信息,你可以这样做:

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.1:9092,...")
  .option("subscribe", "myTopic")
  .option("startingOffsets", """{"myTopic":{"0":20,"1":20}}""")
  .option("endingOffsets", """{"myTopic":{"0":25,"1":25}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .filter(...)

更多关于 startingOffsets 以及 endingOffsets 在kafka+spark集成指南中给出

相关问题