flume事件被截断

wfauudbj  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(629)

这里我面临一个问题,我从kafka源代码接收消息,并编写一个拦截器从kafka消息(json格式)中提取两个字段(datasource和businesstype)。这里我使用gson.fromjson()。但问题是我犯了以下错误。
这里我想知道当Flume事件超过某个极限时,Flume是否会截断它?如果是,如何将其设置为更大的值。因为我的Kafka信息总是很长,大约6万字节。
期待回复。提前谢谢!
2015-12-09 11:48:05665(pollablesourcerunner-kafkasource应用)[错误-org.apache.flume.source.kafka.kafkasource.process(kafkasource。java:153)]kafkasource异常,{}com.google.gson.jsonsyntaxexception:com.google.gson.stream.malformedjsonexception:com.google.gson.gson.fromjson(gson)的第1行第4096列的字符串未终止。java:809)在com.google.gson.gson.fromjson(gson。java:761)在com.google.gson.gson.fromjson(gson。java:710)在com..flume.interceptor.jsonlogtypeinterceptor.intercept(jsonlogtypeinterceptor。java:43)在com..flume.interceptor.jsonlogtypeinterceptor.intercept(jsonlogtypeinterceptor。java:61)在org.apache.flume.interceptor.interceptorchain.intercept(interceptorchain。java:62)在org.apache.flume.channel.channelprocessor.processeventbatch(channelprocessor。java:146)在org.apache.flume.source.kafka.kafkasource.process(kafkasource。java:130)

xqkwcwgp

xqkwcwgp1#

最后,我通过调试源代码找到了根本原因。这是因为我试图使用gson将event.getbody()转换为一个Map,这是不正确的,因为event.getbody()是一个字节[],而不是一个字符串,无法转换。正确的代码如下:

String body = new String(event.getBody(), "UTF-8");   
Map<String, Object> map = gson.fromJson(body, new TypeToken<Map<String, Object>>() {}.getType());

相关问题