如何为Kafka打印scala中的全局ktable?

62o28rlo  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(696)

我试图了解globalktable在kafka中是如何工作的,为此我尝试编写示例代码。我已经创建了globalktable,但我想看到我也尝试了peek函数,但它不可用,现在我尝试按视图,但它给出了编译时错误。
在scala中查看globalktable的写入方式是什么?
我试过的是

'''
 val genderGlobalTable: GlobalKTable[String, abc] = builder
        .globalTable(kafkaStreamConfig.getString("abc-topic"),
          Materialized.as("abcStore")
            .withKeySerde(stringSerde)
            .withValueSerde(abcSerde))

      implicit val streams: KafkaStreams = new KafkaStreams(builder.build(), properties)

      val view: ReadOnlyKeyValueStore[String, abc] = streams
        .store("abcStore", new QueryableStoreType[abc])

'''
7y4bm7vi

7y4bm7vi1#

无法创建的示例 QueryableStoreType 这是一个接口。相反,您需要使用工厂类 QueryableStoreTypes (注意,名称是复数,而接口名称是单数)以获取所需的类型。
为了一个 GlobalKTable 你可以用 QueryableStoreTypes.keyValueStore() (或根据您的Kafka流版本, QueryableStoreTypes.timestampedKeyValueStore() ).

相关问题