Flume行反序列化程序将unicode符号添加到kafka通道中的日志行

h5qlskok  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(331)

我使用flume和下面的配置来解析nginx日志并将它们放入kafka中。


# define sources, channels and sink

a1.sources = r1
a1.channels = c2

# Describe/configure the source

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /spool/upload_flume
a1.sources.r1.fileSuffix = .DONE
a1.sources.r1.basenameHeader = false
a1.sources.r1.fileHeader = false
a1.sources.r1.batchSize = 1000
a1.sources.r1.deserializer.maxLineLength = 11000
a1.sources.r1.decodeErrorPolicy = IGNORE
a1.sources.r1.deserializer.outputCharset = UTF-8

# define channels

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList=kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic = test001_logs
a1.channels.c2.zookeeperConnect = kafka10:2181,kafka11:2181,kafka12:2181
a1.channels.c2.parseAsFlumeEvent = true

# Bind the source and sink to the channel

a1.sources.r1.channels = c2

出于某种原因,在Kafka主题中产生的条目中,有一些unicode符号附加到了日志行中。例如:

\00\F4176.124.146.227   1469439200.715 ...
\00\DE185.18.5.6    1469439200.715  3146510 ... 
\00\B0176.15.87.26  1469439200.717  80674 ...

为什么会发生这种情况,如何避免这种问题?
提前谢谢!
更新。如果我使用kafka作为具有相同“spooldir”设置的内存通道的接收器-在kafka主题的结果条目中没有任何unicode添加。但这种方法看起来不是正确的解决方案,因为我必须为内存通道使用额外的资源。

g2ieeal7

g2ieeal71#

尝试 a1.channels.c2.parseAsFlumeEvent = false

相关问题