Flink管道如下:
阅读Kafka主题中的消息(字符串)。
通过grok转换为json格式进行模式匹配。
在json中提取的字段上的时间窗口聚合。
下面是使用grok进行模式匹配的代码。
SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance()
.map(new MapFunction<String, JSONObject>() {
private static final long serialVersionUID = 6;
@Override
public JSONObject map(String value) throws Exception {
JSONObject logJson = new JSONObject();
grok.compile(pattern); //pattern is some pattern defined in the class
Match gm = grok.match(value);
gm.captures();
logJson.putAll(gm.toMap());
return logJson;
}})
在上面的代码编写中 grok.compile(pattern)
在map函数内部工作正常。不这样做会产生以下错误
mapfunction的实现不可序列化
原因:java.io.notserializableexception:com.google.code.regexp.pattern
有没有什么方法可以把grok.compile从Map上删除。根据我的理解,不需要编译每个消息的模式,如果消息的数量变得相当大,可能会造成瓶颈。
附言:我已经进口了 Package oi.thekraken.grok.api.grok
编辑:
我查看了grok实现,grok类实现了serializable。https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/grok.java
1条答案
按热度按时间kqhtkvqz1#
您的代码没有显示局部变量grok的来源,但是:
flink要求所有操作符都是可序列化的,因为它们可能在集群中移动。这也适用于运算符的所有成员。你能给出一个完整的不起作用的例子吗?这样可以更容易地看到序列化可能失败的地方。
有关flink序列化的更多信息,请参阅https://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-和https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html
基本上,如果需要不能直接序列化的操作符成员,可以为自定义类型注册kryo序列化程序,或者自己实现(反)序列化。
顺便说一句:我认为你试图减少模式的编译次数是对的