我的Flume配置是
agent.sinks.sink.batchSize=10
哪种Flume工艺 10 reacords at each batch ,所有记录的时间相同。因此,这是一种可能的方法,以获得不同的时间为每一个记录在批处理。
10 reacords at each batch
pieyvz9o1#
您是否使用时间戳拦截器来设置时间戳?我们来看看它的源代码:
/** * Modifies events in-place. */ @Override public Event intercept(Event event) { Map<String, String> headers = event.getHeaders(); if (preserveExisting && headers.containsKey(TIMESTAMP)) { // we must preserve the existing timestamp } else { long now = System.currentTimeMillis(); headers.put(TIMESTAMP, Long.toString(now)); } return event; } /** * Delegates to {@link #intercept(Event)} in a loop. * @param events * @return */ @Override public List<Event> intercept(List<Event> events) { for (Event event : events) { intercept(event); } return events; }
我想执行死刑的时间 intercept(List<Event> events) 方法非常小,批处理中的所有事件都在一毫秒内处理。因此,不可能使用此拦截器强制批处理事件的不同时间戳。如果您需要批处理中每个事件的唯一标识符,您可以基于timestampinterceptor代码编写自己的拦截器,它将批处理中的事件数附加到timestamp中。然而,这并不能保证增强剂的全局唯一性,因为有可能在一毫秒内处理两批。如需更具体的建议,请澄清您的要求。
intercept(List<Event> events)
t9aqgxwy2#
另一个选项是获得事件的正确时间戳—在生成时设置时间戳头,或者解析字符串并以这种方式设置时间戳。
2条答案
按热度按时间pieyvz9o1#
您是否使用时间戳拦截器来设置时间戳?
我们来看看它的源代码:
我想执行死刑的时间
intercept(List<Event> events)
方法非常小,批处理中的所有事件都在一毫秒内处理。因此,不可能使用此拦截器强制批处理事件的不同时间戳。如果您需要批处理中每个事件的唯一标识符,您可以基于timestampinterceptor代码编写自己的拦截器,它将批处理中的事件数附加到timestamp中。然而,这并不能保证增强剂的全局唯一性,因为有可能在一毫秒内处理两批。
如需更具体的建议,请澄清您的要求。
t9aqgxwy2#
另一个选项是获得事件的正确时间戳—在生成时设置时间戳头,或者解析字符串并以这种方式设置时间戳。