I'm using Flume with the configuration below to parse nginx logs and push them into Kafka.
# define sources, channels and sink
a1.sources = r1
a1.channels = c2
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /spool/upload_flume
a1.sources.r1.fileSuffix = .DONE
a1.sources.r1.basenameHeader = false
a1.sources.r1.fileHeader = false
a1.sources.r1.batchSize = 1000
a1.sources.r1.deserializer.maxLineLength = 11000
a1.sources.r1.decodeErrorPolicy = IGNORE
a1.sources.r1.deserializer.outputCharset = UTF-8
# define channels
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList = kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic = test001_logs
a1.channels.c2.zookeeperConnect = kafka10:2181,kafka11:2181,kafka12:2181
a1.channels.c2.parseAsFlumeEvent = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c2
For some reason, the entries that end up in the Kafka topic have stray non-printable bytes prepended to each log line. For example:
\00\F4176.124.146.227 1469439200.715 ...
\00\DE185.18.5.6 1469439200.715 3146510 ...
\00\B0176.15.87.26 1469439200.717 80674 ...
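Those leading bytes match the Avro wrapper that KafkaChannel puts around each event when parseAsFlumeEvent is enabled: a FlumeEvent is encoded as a headers map followed by the body as Avro bytes, so an event with no headers starts with a single \x00 (empty map) followed by a zigzag-varint length prefix for the body. A minimal sketch of that framing, in pure Python with a made-up log line:

```python
def avro_zigzag_varint(n: int) -> bytes:
    """Encode an int as an Avro zigzag varint (used as the body-length prefix)."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps signed ints to unsigned
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            break
    return bytes(out)

# A FlumeEvent with no headers encodes its (empty) headers map as a single
# 0x00 byte, then the body as length-prefixed bytes -- the same shape as the
# prefixes seen in front of each log line in the topic.
body = b"176.124.146.227 1469439200.715 ..."  # hypothetical sample line
framed = b"\x00" + avro_zigzag_varint(len(body)) + body
print(framed[:4])
```

The second byte varies from line to line because it is the (varint-encoded) length of that particular log line, which is why each entry shows a different prefix.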
Why does this happen, and how can I avoid it?
Thanks in advance!
Update: if I instead use a Kafka sink with a memory channel and the same spooldir source settings, the resulting entries in the Kafka topic have no extra bytes. But that approach doesn't look like the right solution, since it ties up additional resources for the memory channel.
1 Answer
Try

a1.channels.c2.parseAsFlumeEvent = false

With parseAsFlumeEvent = true (the default), KafkaChannel stores each event in the topic as an Avro-serialized FlumeEvent (headers plus body); those serialization bytes are the prefix you are seeing. Flume sources and sinks understand that wrapper, but a consumer reading the topic directly does not. Setting it to false makes the channel write only the raw event body.
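With that change, the channel section from the question would read as follows (same brokers and topic, only the last line differs):

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList = kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic = test001_logs
a1.channels.c2.zookeeperConnect = kafka10:2181,kafka11:2181,kafka12:2181
a1.channels.c2.parseAsFlumeEvent = false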