spring云流动态通道

izkcnapc  于 2021-06-07  发布在  Kafka
关注(0)|答案(5)|浏览(377)

我正在使用springcloudstream,希望通过编程方式创建和绑定通道。我的用例是,在应用程序启动期间,我收到要订阅的kafka主题的动态列表。然后如何为每个主题创建一个频道?

yshpjwxd

yshpjwxd1#

我有一个任务,我不知道事先的主题。我解决了这个问题,有一个输入通道,可以听我需要的所有主题。
https://docs.spring.io/spring-cloud-stream/docs/brooklyn.release/reference/html/_configuration_options.html
目的地
绑定中间件上通道的目标目标(例如,rabbitmq交换或kafka主题)。如果频道绑定为使用者,则可以将其绑定到多个目的地,并且可以将目的地名称指定为逗号分隔的字符串值。如果未设置,则使用通道名称。
所以我的配置

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

然后我定义了一个消费者来处理来自所有这些主题的消息。

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {

    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());
        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}

请注意,在您的情况下,这可能不是一个解决方案。我需要将消息转发到webhooks,这样就可以进行配置Map。
我还考虑了其他的想法。1) 你Kafka客户消费者没有Spring云。
2) 创建预定义数量的输入,例如50。

input-1
intput-2
...
intput-50

然后对其中一些输入进行配置。
相关讨论
支持动态路由消息的spring云流
https://github.com/spring-cloud/spring-cloud-stream/issues/690
https://github.com/spring-cloud/spring-cloud-stream/issues/1089
我们使用SpringCloud2.1.1版本

5fjcxozz

5fjcxozz2#

对于传入消息,可以显式使用 BinderAwareChannelResolver 动态解析目标。你可以检查这个例子 router sink使用绑定器感知的通道解析器。

wz3gfoph

wz3gfoph3#

MessageChannel messageChannel = createMessageChannel(channelName);
messageChannel.send(getMessageBuilder().apply(data));

public MessageChannel createMessageChannel(String channelName) {
return (MessageChannel) applicationContext.getBean(channelName);}

public Function<Object, Message<Object>> getMessageBuilder() {
return payload -> MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();}
bd1hkmkf

bd1hkmkf4#

我最近遇到了类似的情况,下面是我动态创建SubscriberChannel的示例。

ConsumerProperties consumerProperties = new ConsumerProperties();
    consumerProperties.setMaxAttempts(1); 
    BindingProperties bindingProperties = new BindingProperties();
    bindingProperties.setConsumer(consumerProperties);
    bindingProperties.setDestination(retryTopic);
    bindingProperties.setGroup(consumerGroup);

    bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
    SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
    beanFactory.registerSingleton(consumerName, channel);
    channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
    bindingService.bindConsumer(channel, consumerName);
    channel.subscribe(consumerMessageHandler);
jchrr9hc

jchrr9hc5#

我不得不为camel-spring云流组件做类似的事情。也许绑定目的地的消费者代码“真的只是 String 指示频道名称“对您有用吗?
在我的例子中,我只绑定一个目的地,但是我不认为多个目的地在概念上有多大不同。
其要点如下:

@Override
    protected void doStart() throws Exception {
        SubscribableChannel bindingTarget = createInputBindingTarget();
        bindingTarget.subscribe(message -> {
            // have your way with the received incoming message
        });

        endpoint.getBindingService().bindConsumer(bindingTarget,
                endpoint.getDestination());

       // at this point the binding is done
    }

    /**
     * Create a {@link SubscribableChannel} and register in the
     * {@link org.springframework.context.ApplicationContext}
     */
    private SubscribableChannel createInputBindingTarget() {
        SubscribableChannel channel = endpoint.getBindingTargetFactory()
                .createInputChannel(endpoint.getDestination());
        endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
        channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
                endpoint.getDestination());
        return channel;
    }

有关更多上下文,请参阅此处获取完整的源代码。

相关问题