kafka流-按时间戳/序列持久化消息?

hvvq6cgz  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(258)

我收到Kafka流的信息。它们是由用户id输入的,当它们进入时会被赋予一个序列号和时间戳。邮件在15分钟后“过期”。用户可以根据给定的时间(最多15分钟)或顺序请求新消息。
我最初的想法是这样的:
`streamsbuilder streamsbuilder=新建streamsbuilder();

KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
  messageSupplier = Stores.persistentKeyValueStore("user.messages");

  KTable<String, MessageCache> messageTable = inboundStream
      .filter(this::userExists)
      .peek(this::recordInboundMessage)
      .map(this::markMessage)       // add sequence/timestamp
      .groupByKey()
      .aggregate(this::createMessageCache,
              this::addMessageToMessageCache,
              Materialized.as(messageSupplier));

  // ---> Some other setup stuff, then start the streams

这个MessageCache` 保存消息列表(将消息添加到缓存时删除过期的消息)。当我收到消息请求时,我会浏览列表并筛选出合适的消息。
我想我可以使用其中一种窗口策略,但是找不到一个实际保存消息列表的示例。
这是最好的方法吗?或者我遗漏了什么可以让这更容易/更好的东西?

3zwtqj6y

3zwtqj6y1#

这是最好的方法吗?或者我遗漏了什么可以让这更容易/更好的东西?
我认为您有一个使用本机java类的简单解决方案,可以有效地将streams应用程序与您的代码连接起来。。。为了简单,有很多话要说!我能看到的唯一缺点是,如果事件率太高,用户缓存可能会超出内存大小。此外,如果您需要容错,流应用程序将在出现故障时在另一个应用程序示例上重建状态存储的内容。但如果这不是问题,那就去做吧!
但是,就如何在streams应用程序上下文中实现这一点而言,您可以做一些调整:
定义要支持的用户查询的粒度。几分钟?几秒钟?为了辩论,我们说几分钟吧。根据这个粒度设置流的窗口。
定义一个累加器,类似于您现有的累加器,它将接受一个用户记录并将其添加到列表中。有点像 UserRecordGroup 有一个 ListUserRecord ,以及一种方法 add(UserEvent evt) 将附加一个 UserRecordList .
然后,您可以构建您的streams应用程序,如:

KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
 Materialized<String, UserRecordGroup, WindowStore<Bytes, byte[]>> userStore =
 Materialized.<String, UserRecordGroup, WindowStore<Bytes,byte[]>>as("user.messages")
  .withValueSerde(/* your serializers here */);

KTable<String, MessageCache> messageTable = inboundStream
  .filter(this::userExists)
  .peek(this::recordInboundMessage)
  .map(this::markMessage)       // add sequence/timestamp
  .groupByKey()
  .windowedBy(TimeWindows.of(ONE_MINUTE_IN_MS))
  .aggregate(UserRecordGroup::new,
            (key, value, agg) -> { agg.add(value); },
             userStore);

最后,当您想提供来自商店的查询时,您可以

ReadOnlyWindowStore<Integer, UserRecordGroup> store =
   streams.store("user.messages", QueryableStoreTypes.windowStore());
WindowStoreIterator<UserRecordGroup> windowIterator = 
     store.fetch(pathHash, startTimestamp, endTimeStamp);

迭代器将包含不同窗口的所有记录列表;将这些列表合并在一起,就可以得到starttimestamp和endtimestamp之间的用户活动描述。

相关问题