实际上我有两个问题,第一个问题是:在flume代理刷新整个文件之后,如何使hdfs关闭文件(例如.123456789.tmp)。事实上,该文件从未关闭,直到我迫使flume代理停止。我认为有一种方法使用以下4个参数:
hdfs.rollSize = 0
hdfs.rollCount =0
hdfs.rollInterval = 0
hdfs.batchsize = 1000000
嗯,我的第二个问题是,我的代理flume从sftp服务器接收文件,而我需要将每个文件名保存在hdfs中。它适用于spooldir类型,但不适用于sftp!!有什么想法吗?
flume agent的配置文件如下:
agent.sources = r1
agent.channels = c1
agent.sinks = k
configure ftp source
agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = secret
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/test/flumrFTP
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
# agent.sources.r1.batchSize = 1000
agent.sources.r1.flushlines = true
configure sink s1
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path = hdfs://hostname:8000/user/admin/DATA/import_flume/
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
agent.sinks.k.hdfs.batchsize = 1000000
agent.sinks.k.hdfs.fileType = DataStream
Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 1000000
agent.sources.r1.channels = c1
agent.sinks.k.channel = c1
1条答案
按热度按时间drnojrws1#
尝试设置变量
滚动间隔滚动当前文件前等待的秒数
此设置将在设置的秒数后关闭文件。我把我的设置为200秒,我正在加载较小的文件