我从Kafka主题中获取数据,并以deltalake(Parquet地板)格式存储它们。我想知道某一天收到的邮件数量。我的思考过程:我想用spark读取以parquet格式存储数据的目录,并对某一天带有“.parquet”的文件应用count。这将返回一个计数,但我不确定这是否正确。这样对吗?有没有其他方法可以计算某一天(或某一时间段)从Kafka主题中获取的消息数?
vulvrdjw1#
我们从主题中使用的消息不仅有键值,而且还有其他信息,如时间戳可以用来追踪消费流。时间戳由代理或生产者根据主题配置更新时间戳。如果topic配置的时间戳类型为create\u time,则代理将使用producer记录中的时间戳,而如果topic配置为log\u append\u time,则在附加记录时,代理将用代理本地时间覆盖时间戳。所以,如果您将时间戳存储在任何地方,您可以很好地跟踪每天或每小时的消息速率。另外,您可以使用一些kafka Jmeter 板,如confluent control center(许可证价格)或grafana(免费)或任何其他工具来跟踪消息流。在我们的例子中,在使用消息和存储或处理消息的同时,我们还将消息的元细节路由到ElasticSearch,并通过kibana将其可视化。
1条答案
按热度按时间vulvrdjw1#
我们从主题中使用的消息不仅有键值,而且还有其他信息,如时间戳
可以用来追踪消费流。
时间戳由代理或生产者根据主题配置更新时间戳。如果topic配置的时间戳类型为create\u time,则代理将使用producer记录中的时间戳,而如果topic配置为log\u append\u time,则在附加记录时,代理将用代理本地时间覆盖时间戳。
所以,如果您将时间戳存储在任何地方,您可以很好地跟踪每天或每小时的消息速率。
另外,您可以使用一些kafka Jmeter 板,如confluent control center(许可证价格)或grafana(免费)或任何其他工具来跟踪消息流。
在我们的例子中,在使用消息和存储或处理消息的同时,我们还将消息的元细节路由到ElasticSearch,并通过kibana将其可视化。