flume kafka接收器无法将完整消息写入kafka代理

h79rfbju  于 2021-06-04  发布在  Flume
关注(0)|答案(0)|浏览(204)

我已经编写了一个进程,在这个进程中,我通过hortonworks提供的自定义flume源和flume kafka接收器生成消息,以写入kafka代理。
在这个过程中,我注意到如果kafka代理已经运行,然后我启动flume代理,它会将每个消息正确地传递给kafka代理,但是当我启动kafka代理时,flume代理已经运行,kafka代理无法接收所有消息。
当我运行kafka console consumer来检查接收到的消息的数量时,我注意到它从开始处删除了一些记录,从结束处删除了一些记录。
我在flume.conf中尝试了多次混搭,但仍能如期工作。
下面是我提供给flume.conf的配置参数-

agent.channels = firehose-channel
agent.sources = stress-source
agent.sinks = kafkasink 

################################# 

# Benchmark Souce Configuration #

################################# 

agent.sources.stress-source.type=com.kohls.flume.source.stress.BenchMarkTestScenriao
agent.sources.stress-source.size=5000
agent.sources.stress-source.maxTotalEvents=30000
agent.sources.stress-source.batchSize=200
agent.sources.stress-source.throughputThreshold=4000
agent.sources.stress-source.throughputControlSeconds=1
agent.sources.stress-source.channels=firehose-channel  

################################# 

# Firehose Channel Configuration #

################################# 

agent.channels.firehose-channel.type = file
agent.channels.firehose-channel.checkpointDir = /data/flume/checkpoint
agent.channels.firehose-channel.dataDirs = /data/flume/data
agent.channels.firehose-channel.capacity = 10000
agent.channels.firehose-channel.transactionCapacity = 10000
agent.channels.firehose-channel.useDualCheckpoints=1
agent.channels.firehose-channel.backupCheckpointDir=/data/flume/backup

############################################ 

# Firehose Sink Configuration - Kafka Sink #

############################################ 

agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.topic = backoff_test_17
agent.sinks.kafkasink.channel=firehose-channel
agent.sinks.kafkasink.brokerList = sandbox.hortonworks.com:6667
agent.sinks.kafkasink.batchsize = 200
agent.sinks.kafkasink.requiredAcks = 1
agent.sinks.kafkasink.kafka.producer.type = async
agent.sinks.kafkasink.kafka.batch.num.messages = 200

我还尝试分析了flume日志,注意到flume度量正确地显示了put和take计数。
请让我知道,如果有人有任何指针来解决这个问题。
提前感谢你的帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题