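I have what seems to be a simple Flume configuration that is giving me a lot of problems. Let me describe the problem first, and then list the configuration files.
I have 3 servers: server1, server2, server3.
server1: netcat source / syslogtcp source (I have tested this with both netcat without acks and syslogtcp), 2 memory channels, 2 avro sinks (one per channel), replicating selector with the second memory channel marked optional
server2, server3: avro source, memory channel, kafka sink
In my simulation, server2 simulates "production", so it must not experience any data loss, whereas server3 simulates "development", where data loss is acceptable. My assumption was that using 2 channels and 2 sinks would decouple the two servers from each other, so that if server3 goes down it would not affect server2 (especially with the optional configuration option!). However, this is not the case. When I run my simulation and terminate server3 with Ctrl-C, I see a slowdown on server2, and the output from server2 to the kafka sink slows to a crawl. When I restart the Flume agent on server3, everything returns to normal.
I did not expect this behavior. What I expected was that, because I have two channels and two sinks, if one channel and/or sink goes down, the other channel and/or sink should have no problems. Is this a limitation of Flume? Is it a limitation of my sources, sinks, or channels? Is there a way to have Flume behave so that, with a single agent, multiple channels and sinks are decoupled from each other? I really don't want to set up multiple Flume agents on one machine, one per "environment" (production and development). My configuration files are attached below so you can take a closer look at what I have done:
Server 1 (first-tier agent)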
# Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel1 defaultChannel2
agent.sinks = mySink1 mySink2
# Describe/configure the source
agent.sources.mySource.type = netcat
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.ack-every-event = false
# agent.sources.mySource.type = syslogtcp
# agent.sources.mySource.host = 0.0.0.0
# agent.sources.mySource.port = 7103
# agent.sources.mySource.eventSize = 150000
agent.sources.mySource.channels = defaultChannel1 defaultChannel2
agent.sources.mySource.selector.type = replicating
agent.sources.mySource.selector.optional = defaultChannel2
# Describe/configure the channel
agent.channels.defaultChannel1.type = memory
agent.channels.defaultChannel1.capacity = 5000
agent.channels.defaultChannel1.transactionCapacity = 200
agent.channels.defaultChannel2.type = memory
agent.channels.defaultChannel2.capacity = 5000
agent.channels.defaultChannel2.transactionCapacity = 200
# Avro Sink
agent.sinks.mySink1.channel = defaultChannel1
agent.sinks.mySink1.type = avro
agent.sinks.mySink1.hostname = Server2
agent.sinks.mySink1.port = 6666
agent.sinks.mySink2.channel = defaultChannel2
agent.sinks.mySink2.type = avro
agent.sinks.mySink2.hostname = Server3
agent.sinks.mySink2.port = 6666
server2 "prod" Flume agent
# Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink
# Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel
# Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder
# Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200
# Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server2-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel
server3 "dev" Flume agent
# Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink
# Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel
# Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder
# Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200
# Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server3-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel
Thanks for your help!
1 Answer
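I would consider tuning these configuration parameters, since they relate to the memory channel:
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200
Perhaps try doubling them first and then run the test again; you should see an improvement:
agent.channels.defaultChannel.capacity = 10000
agent.channels.defaultChannel.transactionCapacity = 400
During testing, it is also a good idea to monitor the JVM of the Apache Flume instance.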
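Another memory channel setting that may be worth testing alongside capacity is keep-alive, which the Flume User Guide describes as the timeout in seconds for adding or removing an event (default 3). What follows is an assumption about the cause, not a confirmed diagnosis: once server3 is down and defaultChannel2 fills up, each write from the source to that full channel may wait out the timeout before the failure is ignored for the optional channel, and that wait would also delay the next write to defaultChannel1. A minimal sketch for the server1 agent, reusing the channel names from the question (the value 0 is only an illustrative starting point and should be verified against your Flume version):

```properties
# Sketch only, for the first-tier agent on server1.
# Assumption: a shorter keep-alive makes writes to a full optional
# channel fail fast instead of blocking the source thread.
agent.channels.defaultChannel2.type = memory
agent.channels.defaultChannel2.capacity = 5000
agent.channels.defaultChannel2.transactionCapacity = 200
# Timeout in seconds for adding or removing an event (default is 3)
agent.channels.defaultChannel2.keep-alive = 0
```

If this helps, events destined for server3 are simply dropped while it is unavailable, which matches the stated tolerance for data loss in "dev". defaultChannel1 is left at its default keep-alive so the "prod" path keeps its existing behavior.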