java 带int-kafka的RecordFilterStrategy:消息驱动通道适配器

aurhwmvo  于 2022-12-10  发布在  Java
关注(0)|答案(1)|浏览(110)

是否有方法将自定义RecordFilterStategy注入侦听器?

<int-kafka:message-driven-channel-adapter
        id="kafkaConsumer"
        listener-container="listenerContainer"
        channel="consumerChannel"
        message-converter="messageConverter"
        payload-type="java.lang.String"
/>

我试着做了以下几点:

<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topics">
        <list>
            <value>someTopic</value>
        </list>
    </constructor-arg>
    <property name="errorHandler" ref="listenerErrorHandler"/>
    <property name="messageListener" ref="filteringMessageListener"/>
</bean>

<bean id="recordFilterStrategy" class="com.some.path"/>

<bean id="filteringMessageListener" class="org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter">
    <constructor-arg>
        <bean factory-bean="containerProperties" factory-method="getMessageListener"/>
    </constructor-arg>
    <constructor-arg ref="recordFilterStrategy"/>
</bean>

但我得到了一个错误:构造函数引发异常;嵌套的异常是java.lang.IllegalArgumentException:容器不能已有监听程序

nhjlsmyf

nhjlsmyf1#

这是XML配置中的错误。
KafkaMessageDrivenChannelAdapter具有相应的属性:

/**
 * Specify a {@link RecordFilterStrategy} to wrap
 * {@link KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener} into
 * {@link FilteringMessageListenerAdapter}.
 * @param recordFilterStrategy the {@link RecordFilterStrategy} to use.
 */
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {

我们只是错过了为XML配置公开它。
请随时提出一个生长激素的问题上的问题!
作为一种解决方法,考虑为KafkaMessageDrivenChannelAdapter声明一个常规<bean>,而不是该XML配置。或者干脆放弃XML配置,转而使用Java DSL:https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound-adapter-configuration

相关问题