我需要听我开发的flume自定义源中的rabbit队列。这个要求在flume中可能看起来很尴尬,但这正是它所需要的。由于为了简单起见,我使用springamqp来监听队列,所以我无法理解如何在flume lifecycle start()方法中调用onmessage()方法,以便将消息发布到flume频道。我已经研究了springmessagelisteneradapter的概念,但是还没有找到任何实现相同概念的例子。
fquxozlt1#
onMessage() 是…的一部分 MessageListener 图案。它是由外部系统(从大的高度)引发的某种活性成分。它每次都通过远程命令工作,所以你不能把它当作 passive 将由用户调用启动的组件。因为你有“flume lifecycle start()”从另一边和 SimpleMessageListenerContainer 从侧面看也是一样的,我想说的是,你必须把它们的生命周期联系起来,才能协同工作。从这里开始你应该提供 SimpleMessageListenerContainer 一些内联的 MessageListener 实现,调用所需的方法“发布到flume通道”。hth公司更新
onMessage()
MessageListener
passive
SimpleMessageListenerContainer
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); .... container.setMessageListener(new MessageListener() { public void onMessage(Message message) { sendMessageToFlumeChannel(message); } });
在哪里 sendMessageToFlumeChannel 是保持类的方法。当然可以是任何pojo而不是 MessageListener 实现,但主要目标是将侦听器结果委托给某个方法。
sendMessageToFlumeChannel
1条答案
按热度按时间fquxozlt1#
onMessage()
是…的一部分MessageListener
图案。它是由外部系统(从大的高度)引发的某种活性成分。它每次都通过远程命令工作,所以你不能把它当作passive
将由用户调用启动的组件。因为你有“flume lifecycle start()”从另一边和
SimpleMessageListenerContainer
从侧面看也是一样的,我想说的是,你必须把它们的生命周期联系起来,才能协同工作。从这里开始你应该提供
SimpleMessageListenerContainer
一些内联的MessageListener
实现,调用所需的方法“发布到flume通道”。hth公司
更新
在哪里
sendMessageToFlumeChannel
是保持类的方法。当然可以是任何pojo而不是
MessageListener
实现,但主要目标是将侦听器结果委托给某个方法。