kafka流窗口与自定义时间戳提取器

sy5wg1nm  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(411)

我正在尝试创建一个kafka流应用程序,在该应用程序中,我正在尝试计算一个时间窗口内每个平台的唯一设备。
事件类

public class Event {
    private String eventId;
    private String deviceId;
    private String platform;
    private ZonedDateTime createdAt;
}

我需要的时间窗口尊重事件的createdat,所以我写了一个 TimestampExtractor 具体实施如下:

public class EventTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Event event = (Event) record.value();
        final ZonedDateTime eventCreationTime = event.getCreatedAt();
        final long timestamp = eventCreationTime.toEpochSecond();

        log.trace("Event ({}) yielded timestamp: {}", event.getEventId(), timestamp);

        return timestamp;
    }
}

最后,这是我的流媒体应用程序代码:

final KStream<String, Event> eventStream = builder.stream("events_ingestion");

eventStream
    .selectKey((key, event) -> {
        final String platform = event.getPlatform();
        final String deviceId = event.getDeviceId());

        return String.join("::", platform, deviceId);
    })
    .groupByKey()
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
    .count(Materialized.as(COUNT_STORE));

当我把一件事推到 event_ingestion 主题中,我可以看到时间戳被记录到应用程序日志中,数据被写入计数存储。
当我迭代计数存储时,我看到以下内容:

Key: [ANDROID::1@1539000000/1539900000], Value: 2

虽然我的时间窗口是15分钟,但关键是10天。如果我从stream config中删除timestampextractor实现(因此返回到处理时间),那么密钥的时间跨度将达到预期的15分钟:

Key: [ANDROID::1@1539256500000/1539257400000], Value: 1

我做错什么了?有什么想法吗?

mznpcxlj

mznpcxlj1#

timestampextractor使用epoch毫秒值进行窗口设置。你正在计算“秒”,这将把信息进入错误的时间窗口。

相关问题