Flume NG: Avro source, memory channel and HDFS sink produce too many small files

Posted by vbopmzt1 on 2021-05-29 in Hadoop

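I am facing a strange problem. I want to aggregate a large amount of data from Flume into HDFS. I applied the recommended configuration to avoid too many small files, but it does not work. Here is my configuration file.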


# single-node Flume configuration

# Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5458
a1.sources.r1.threads = 20

# Describe the HDFS sink

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://myhost:myport/user/myuser/flume/events/%{senderType}/%{senderName}/%{senderEnv}/%y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = logs-
a1.sinks.k1.hdfs.fileSuffix = .jsonlog
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# never roll based on time

a1.sinks.k1.hdfs.rollInterval=0

## 10MB=10485760, 128MB=134217728, 256MB=268435456

a1.sinks.kl.hdfs.rollSize=10485760

## never roll based on number of events

a1.sinks.kl.hdfs.rollCount=0
a1.sinks.kl.hdfs.round=false

# Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 5000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

This configuration runs and I can see my files in HDFS, but the average file size is 1.5 KB. The Flume console output looks like this:

16/08/03 09:48:31 INFO hdfs.BucketWriter: Creating  hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Closing hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Renaming hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog.tmp to hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog
16/08/03 09:48:31 INFO hdfs.BucketWriter: Creating hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Closing hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Renaming hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog.tmp to hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog
16/08/03 09:48:31 INFO hdfs.BucketWriter: Creating hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484509.jsonlog.tmp
16/08/03 09:48:31 INFO hdfs.BucketWriter: Closing hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484509.jsonlog.tmp
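The log above shows each file being created, closed and renamed within the same second, which suggests that a size or count trigger fires almost immediately. The Flume User Guide documents the HDFS sink defaults as `hdfs.rollSize = 1024` bytes and `hdfs.rollCount = 10` events, so if the size and count settings do not actually reach the sink, files in the low-kilobyte range would be expected. The following is a rough sketch of that behaviour, not Flume's actual code: the function name `count_files` and the 150-byte event size are assumptions made for illustration, and time-based rolling is left out.

```python
def count_files(event_sizes, roll_size=1024, roll_count=10):
    """Simplified model of size/count based rolling (assumption, not Flume code).

    Before each event is written, the open file is closed if it has already
    reached roll_size bytes or roll_count events. A value of 0 disables
    that trigger. Returns the number of files produced.
    """
    files = 0
    size = 0
    events = 0
    open_file = False
    for n in event_sizes:
        size_hit = roll_size > 0 and size >= roll_size
        count_hit = roll_count > 0 and events >= roll_count
        if open_file and (size_hit or count_hit):
            open_file = False  # roll: close the current file
        if not open_file:
            files += 1         # open a new file
            size = 0
            events = 0
            open_file = True
        size += n
        events += 1
    return files


# Hypothetical workload: 1000 events of 150 bytes each.
workload = [150] * 1000

# Documented defaults (rollSize=1024, rollCount=10).
print(count_files(workload))  # 143

# Intended settings from the question (rollSize=10485760, rollCount=0).
print(count_files(workload, roll_size=10485760, roll_count=0))  # 1
```

Under these assumptions the defaults split the same data into well over a hundred files of roughly 1 KB each, while the intended settings keep everything in one file.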

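Does anyone have an idea about this problem?
Here is some more information about Flume's behaviour.
The command is flume-ng agent -n a1 -c /path/to/flume/conf --conf-file sample-flume.conf -Dflume.root.logger=TRACE,console -Xms8192m -Xmx16384m
Note: the logger directive does not work. I do not understand why, but I...
The Flume startup output is: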

16/08/03 15:32:55 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
16/08/03 15:32:55 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:sample-flume.conf
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:kl
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:kl
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:kl
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
16/08/03 15:32:55 INFO node.AbstractConfigurationProvider: Creating channels
16/08/03 15:32:55 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
16/08/03 15:32:55 INFO node.AbstractConfigurationProvider: Created channel c1
16/08/03 15:32:55 INFO source.DefaultSourceFactory: Creating instance of source r1, type avro
16/08/03 15:32:55 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: hdfs
16/08/03 15:32:56 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
16/08/03 15:32:56 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
16/08/03 15:32:56 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 0.0.0.0, port: 5458 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@466ab18a counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
16/08/03 15:32:56 INFO node.Application: Starting Channel c1
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
16/08/03 15:32:56 INFO node.Application: Starting Sink k1
16/08/03 15:32:56 INFO node.Application: Starting Source r1
16/08/03 15:32:56 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5458 }...
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
16/08/03 15:32:56 INFO source.AvroSource: Avro source r1 started.

Since I cannot get more detailed output, I have to assume that

[...]
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1
[...]

indicates that the sink is configured correctly.
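One way to test that assumption is to compare the component names used in property keys with the names declared for the agent. In the configuration above, the `rollSize`, `rollCount` and `round` keys are written with the prefix `a1.sinks.kl` (lowercase letter L), while the declared sink is `k1` (digit one), and the startup log also prints `Processing:kl`. That mismatch may be why those settings have no effect. The sketch below is a minimal checker, assuming a plain `key = value` properties file; the function name `undeclared_components` is hypothetical and not part of Flume.

```python
from collections import defaultdict


def undeclared_components(conf_text, agent="a1"):
    """Return {kind: [names]} for component names that appear in property
    keys but are missing from the agent's declaration lines
    (e.g. 'a1.sinks = k1')."""
    declared = defaultdict(set)
    used = defaultdict(set)
    for raw in conf_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments and malformed lines
        key, value = (part.strip() for part in line.split("=", 1))
        parts = key.split(".")
        if len(parts) < 2 or parts[0] != agent:
            continue
        kind = parts[1]
        if kind not in ("sources", "sinks", "channels"):
            continue
        if len(parts) == 2:
            declared[kind].update(value.split())  # declaration line
        else:
            used[kind].add(parts[2])              # per-component property
    result = {}
    for kind, names in used.items():
        missing = sorted(names - declared[kind])
        if missing:
            result[kind] = missing
    return result


# Excerpt of the configuration from the question.
sample = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.kl.hdfs.rollSize=10485760
a1.sinks.kl.hdfs.rollCount=0
a1.sinks.kl.hdfs.round=false
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""

print(undeclared_components(sample))  # {'sinks': ['kl']}
```

An empty result would mean that every prefixed key refers to a declared component; it would not prove that the values themselves are sensible.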
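PS: I have seen the following answers, but none of them works (I must be missing something...).
Flume HDFS sink generates lots of tiny files on HDFS
Too many small files with the HDFS sink in Flume
Flume: tiering data flows using the Avro source and sink
Flume HDFS sink keeps rolling small files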

bqjvbblv #1

Increase the batch size according to your requirements:
a1.sinks.k1.hdfs.batchSize =
