Flume configuration not working, throws an exception

nue99wik  posted on 2021-06-04  in  Flume

I am trying to use Flume to transfer a 9 GB file from a spool directory to HDFS. This is my Flume configuration:


# initialize agent's source, channel and sink

wagent.sources = wavetronix
wagent.channels = memoryChannel2
wagent.sinks = flumeHDFS

# Setting the source to spool directory where the file exists

wagent.sources.wavetronix.type = spooldir
wagent.sources.wavetronix.spoolDir = /johir/WAVETRONIX/output/Yesterday
wagent.sources.wavetronix.fileHeader = false
wagent.sources.wavetronix.basenameHeader = true

# agent.sources.wavetronix.fileSuffix = .COMPLETED

# Setting the channel to memory

wagent.channels.memoryChannel2.type = memory

# Max number of events stored in the memory channel

wagent.channels.memoryChannel2.capacity = 50000
agent.channels.memoryChannel2.batchSize = 1000
wagent.channels.memoryChannel2.transactioncapacity = 1000

# Setting the sink to HDFS

wagent.sinks.flumeHDFS.type = hdfs

# agent.sinks.flumeHDFS.useLocalTimeStamp = true

wagent.sinks.flumeHDFS.hdfs.path =/user/root/WAVETRONIXFLUME/%Y-%m-%d/
wagent.sinks.flumeHDFS.hdfs.useLocalTimeStamp = true
wagent.sinks.flumeHDFS.hdfs.filePrefix= %{basename}
wagent.sinks.flumeHDFS.hdfs.fileType = DataStream

# Write format can be text or writable

wagent.sinks.flumeHDFS.hdfs.writeFormat = Text

# use a single csv file at a time

wagent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1

wagent.sinks.flumeHDFS.hdfs.rollCount=0
wagent.sinks.flumeHDFS.hdfs.rollInterval=0
wagent.sinks.flumeHDFS.hdfs.rollSize = 6400000
wagent.sinks.flumeHDFS.hdfs.batchSize =1000

# never rollover based on the number of events

wagent.sinks.flumeHDFS.hdfs.rollCount = 0

# rollover file based on max time of 1 min

# agent.sinks.flumeHDFS.hdfs.rollInterval = 0

# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600

# Connect source and sink with channel

wagent.sources.wavetronix.channels = memoryChannel2
wagent.sinks.flumeHDFS.channel = memoryChannel2

But I get the following exception:

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space
    at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1043)
    at java.util.concurrent.ConcurrentHashMap.putIfAbsent(ConcurrentHashMap.java:1535)
    at java.lang.ClassLoader.getClassLoadingLock(ClassLoader.java:463)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:404)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.log4j.spi.LoggingEvent.<init>(LoggingEvent.java:165)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.log(Category.java:856)
    at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:479)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)

Can anyone help me solve this problem?

whlutmcx1#

Edit the file ${FLUME_HOME}/conf/flume-env.sh and add the following line:

export JAVA_OPTS="-Xms1000m -Xmx12000m -Dcom.sun.management.jmxremote"

You can adjust the -Xmx (maximum heap) and -Xms (initial heap) values to suit your machine.
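
As a rough illustration of the heap adjustment described above, flume-env.sh could look something like the sketch below. The sizes 512m and 4g are assumptions chosen only for the example, and FLUME_HEAP_MIN and FLUME_HEAP_MAX are hypothetical helper variables, not Flume settings; pick values that fit the RAM actually available on the host.

```shell
# Sketch of ${FLUME_HOME}/conf/flume-env.sh
# The heap sizes below are illustrative assumptions, not recommendations.
FLUME_HEAP_MIN="512m"   # hypothetical helper variable: initial heap (-Xms)
FLUME_HEAP_MAX="4g"     # hypothetical helper variable: maximum heap (-Xmx)

# JAVA_OPTS is the variable named in the answer above
export JAVA_OPTS="-Xms${FLUME_HEAP_MIN} -Xmx${FLUME_HEAP_MAX} -Dcom.sun.management.jmxremote"

# Print the result so it can be checked before starting the agent
echo "JAVA_OPTS=${JAVA_OPTS}"
```

After the change, restart the agent so the new options are picked up, for example with `flume-ng agent --name wagent --conf "$FLUME_HOME/conf" --conf-file <your-config-file>`, where the config file path is whatever file holds the configuration from the question. The flume-env.sh file should sit in the directory passed to `--conf` so that the launcher script can find it.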
