我有一个Kafka流应用程序,等待记录被发表的主题 user_activity
. 它将接收json数据,并根据键的值将该流推送到不同的主题中。
这是我的streams应用程序代码:
KStream<String, String> source_user_activity = builder.stream("user_activity");
source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
System.out.println("value: " + value);
ArrayList<String> keywords = new ArrayList<String>();
try {
JSONObject send = new JSONObject();
JSONObject received = new JSONObject(value);
send.put("current_date", getCurrentDate().toString());
send.put("activity_time", received.get("CreationTime"));
send.put("user_id", received.get("UserId"));
send.put("operation_type", received.get("Operation"));
send.put("app_name", received.get("Workload"));
keywords.add(send.toString());
// apply regex to value and for each match add it to keywords
} catch (Exception e) {
// TODO: handle exception
System.err.println("Unable to convert to json");
e.printStackTrace();
}
return keywords;
}
}).to("user_activity_by_date");
在这段代码中,我想检查操作类型,然后根据操作类型将流推入相关主题。
我怎样才能做到这一点?
编辑:
我已将代码更新为:
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
KStream<String, String>[] branches = source_o365_user_activity.branch(
(key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),
(key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")),
(key, value) -> true
);
branches[0].to("o365_sharing_set_by_date");
branches[1].to("o365_added_to_secure_link_by_date");
branches[2].to("o365_user_activity_by_date");
3条答案
按热度按时间68de4m5k1#
原版
KStream.branch
方法是不方便的,因为混合数组和泛型,因为它迫使人们使用“幻数”从结果中提取正确的分支(例如kafka-5488问题)。从SpringKafka2.2.4开始,kafkastreambrancher类可用。使用它,可以实现更方便的分支:也有kip-418,所以Kafka本身的分支也有可能得到改进。
4xy9mtcn2#
你可以用
branch
方法来分割流。此方法使用 predicate 将源流拆分为多个流。以下代码取自Kafka流示例:
j7dteeu83#
另一种可能是使用topicnameextractor动态路由事件:
https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing
你需要提前创建主题,
defineoutputtopic可以返回给定值(或键或记录上下文)的一组已定义主题中的一个。pd:对不起,scala代码,链接中有一个java示例。