我正在尝试使用flume将一些数据流式传输到hdfs中,并将单个代理配置为具有netcat源、内存通道和hdfs接收器。
配置如下:
a1.sources = src1
a1.channels = ch1
a1.sinks = snk1
# SOURCES CONFIGURATION
a1.sources.src1.type = netcat
a1.sources.src1.bind = 0.0.0.0
a1.sources.src1.port = 99999
a1.sources.src1.ack-every-event = false
# SOURCE -> CHANNEL
a1.sources.src1.channels = ch1
# SINKS' CONFIGURATION
a1.sinks.snk1.type = hdfs
a1.sinks.snk1.hdfs.path = /somepath
a1.sinks.snk1.hdfs.writeFormat = Text
a1.sinks.snk1.hdfs.fileType = DataStream
a1.sinks.snk1.hdfs.inUseSuffix = .tmp
a1.sinks.snk1.hdfs.filePrefix = prefix_file
a1.sinks.snk1.hdfs.batchSize = 75000
a1.sinks.snk1.hdfs.rollInterval = 120
a1.sinks.snk1.hdfs.rollCount = 0
a1.sinks.snk1.hdfs.idleTimeout = 0
# 128MB for each file maximum = 128 * 1024 (MB) * 1024 (KB) = ...
a1.sinks.snk1.hdfs.rollSize = 134217728
a1.sinks.snk1.hdfs.threadsPoolSize = 25
# SINK <- CHANNEL
a1.sinks.snk1.channel = ch1
# CHANNELS' CONFIGURATION
a1.channels.ch1.type = memory
a1.channels.ch1.capacity = 5000000
a1.channels.ch1.transactionCapacity = 100000
# 412MB of byte capacity = 412 * 1024 * 1024 byte
# a1.channels.ch1.byteCapacity = 432013312
但是,如果发送的消息超过某个带宽,则会出现以下异常:
2014-11-21 05:48:07,035 (netcat-handler-0) [WARN - org.apache.flume.source.NetcatSource$NetcatSocketHandler.processEvents(NetcatSource.java:407)] Error processing event. Exception follows.
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: ch1}
at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
at org.apache.flume.source.NetcatSource$NetcatSocketHandler.processEvents(NetcatSource.java:394)
at org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:321)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Heap space limit of 3456106reached. Please increase heap space allocated to the channel as the sinks may not be keeping up with the sources
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:123)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
... 7 more
我无法更改堆空间的值,在conf/flume-env.sh中,我有:
JAVA_OPTS="-Xms256m -Xmx512m -Dcom.sun.management.jmxremote"
异常中堆空间的大小应该用byte表示,这意味着我有一个3,3mb的堆空间,这个值非常低,但是我不知道这个值是从哪里来的。。。!我怎样才能解决这个问题?事先非常感谢!
1条答案
按热度按时间pkmbmrz71#
您有几个nob可供使用以使其正常工作:
增加
byteCapacity
:a1.channels.ch1.byteCapacity = 6912212
.如上面的评论所建议的那样增加记忆(
JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"
)可能是最好的选择。原因是byteCapacity
是80%的进程最大内存,这已经消耗了大量的进程内存。收缩
byteCapacityBufferPercentage
这就减少了页眉的空间。