我配置了一个flume代理,它从ftp服务器读取文件并将文件发送到hdfs接收器。我最大的问题是,我想用原始文件名在hdfs中存储文件。我试过使用spooldir源代码,它工作正常,能够用它们的基名称在hdfs中存储文件,但是flume代理崩溃了:
1) 如果一个文件在放入假脱机目录后被写入,flume将在其日志文件中打印一个错误并停止处理。
2) 如果文件名在以后被重用,flume会将错误打印到其日志文件并停止处理。
实际上,spooldir源代码不适合我的用例。那么,有没有办法让ftp源代码保留文件名,然后hdfs按照文件名分别存储文件呢。
这是我的经纪人:
agent.sources = r1
agent.channels = c1
agent.sinks = k
# configure ftp source
agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = pwd
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/flume_ftp_source
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
agent.sources.r1.flushlines = true
# configure sink s1
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path = hdfs://hostname:8020/user/admin/DATA/import_flume/agents/agent1/%Y/%m/%d/%H
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
agent.sinks.k.hdfs.batchsize = 1000000
agent.sinks.k.hdfs.fileType = DataStream
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 1000000
agent.sources.r1.channels = c1
agent.sinks.k.channel = c1
2条答案
按热度按时间oalqel3c1#
正如我所说,根据以下代码更新:fix for flume ftp source,下面是我如何使用%{basename}变量的:
0h4hbjxa2#
我刚刚为flume ftp github项目推出了一个解决方案:
kr,菲利普
有什么技巧可以解决属性%{basename}丢失的问题吗?