Sending events with Spring Cloud Stream 3.1

Posted by tquggr8v on 2023-04-04 in Spring

I am upgrading a service from Spring Cloud Stream 2.0 to 3.1.
Previously, when publishing to a Kinesis stream, I used the following:

@Component
public class KinesisStreamService {
    private final Log logger = LogFactory.getLog(this.getClass());
    private StreamProcessor orderOut;

    @Autowired
    public KinesisStreamService(StreamProcessor orderOut) {
        this.orderOut = orderOut;
    }

    public void send(String event) {
        if (event != null) {
            this.orderOut.ordersOut().send(new GenericMessage<>(event));
            this.logger.debug("Event sent KinesisInStreamServiceImpl : " + event);
        } else {
            throw new RuntimeException("Event can not be null");
        }
    }
}

@Service
public interface StreamProcessor {
    String INPUT = "ordersIn";

    @Output
    MessageChannel ordersOut();

    @Input
    SubscribableChannel ordersIn();
}

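@Output is now deprecated in Spring Cloud Stream 3.1 in favor of the functional programming model.
The documentation states that @EnableBinding is also deprecated as of 3.1 in favor of the functional programming model, and suggests creating something like this: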

@Service
class PubSubSendQueue {
    @Bean
    public Supplier<String> output(){
        return () -> "Adam";
    }
}

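However, if an argument needs to be passed to the bean definition method at runtime, how should the Supplier bean be defined? Is lazy initialization of the bean a suitable solution?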

@Bean
@Lazy
public Supplier<GenericMessage<String>> ordersOut(String event) {
    return () -> new GenericMessage<>(event);
}

The dependencies are:

dependencies {
    implementation 'org.apache.activemq:activemq-broker:5.17.2'
    implementation 'com.amazonaws:aws-java-sdk:1.12.310'
    implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:1.1.0'
    implementation 'org.springframework.integration:spring-integration-aws:2.5.4'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kinesis:3.0.0'
    implementation 'org.springframework:spring-web:6.0.0'
    implementation 'org.springframework.boot:spring-boot-starter:3.0.5'
    implementation 'javax.xml.bind:jaxb-api:2.3.0'
    implementation 'com.sun.xml.bind:jaxb-impl:2.3.6'
    implementation 'org.glassfish.jaxb:jaxb-runtime:2.3.6'
    implementation 'javax.activation:activation:1.1.1'
    implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
    testImplementation 'org.codehaus.groovy:groovy-all:2.5.18'
    testImplementation 'org.spockframework:spock-spring:1.3-groovy-2.5'
    testImplementation 'org.spockframework:spock-core:1.3-groovy-2.5'
    testImplementation 'org.springframework:spring-test:6.0.0'
    implementation 'cglib:cglib-nodep:3.2.7'
    implementation group: 'org.springframework', name: 'spring-jms', version: '6.0.0'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.0-rc1'
    implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: '2.13.4'
    implementation 'javax.jms:javax.jms-api:2.0.1'
    implementation 'javax.validation:validation-api:2.0.1.Final'
    implementation "jakarta.xml.bind:jakarta.xml.bind-api:2.3.2"
    implementation "org.glassfish.jaxb:jaxb-runtime:2.3.2"
}

Answer #1 (yrwegjxp)

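Use StreamBridge to send arbitrary messages on demand. A Supplier<?> bean is intended for polled message sources: the framework invokes it on a schedule rather than with arguments from your code, so it is not suited to sending an event that is only known at runtime.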
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources
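
As a rough sketch of how the KinesisStreamService from the question might look with this approach: the service takes the event as a method argument and hands it to a sender together with a binding name. To keep the snippet compilable without Spring on the classpath, it depends on a hypothetical OutputSender interface (not a Spring type) whose single method has the same shape as StreamBridge.send(String bindingName, Object data). In the real service you would presumably annotate the class with @Component, inject StreamBridge, and pass streamBridge::send. The binding name "ordersOut" is an assumption carried over from the old @Output method name.

```java
import java.util.Objects;

// Hypothetical stand-in with the same shape as
// StreamBridge.send(String bindingName, Object data).
// It exists only so this sketch compiles without Spring.
@FunctionalInterface
interface OutputSender {
    boolean send(String bindingName, Object data);
}

// In a Spring application this class would be a @Component and would be
// constructed with `streamBridge::send`, where StreamBridge is injected.
class KinesisStreamService {
    // Assumed binding name; its destination would be configured with
    // spring.cloud.stream.bindings.ordersOut.destination
    static final String OUTPUT_BINDING = "ordersOut";

    private final OutputSender sender;

    KinesisStreamService(OutputSender sender) {
        this.sender = Objects.requireNonNull(sender, "sender");
    }

    // Sends one event on demand. The payload arrives as a call-time
    // argument, which a polled Supplier bean cannot receive.
    boolean send(String event) {
        if (event == null) {
            throw new IllegalArgumentException("Event can not be null");
        }
        return sender.send(OUTPUT_BINDING, event);
    }
}
```

With this shape there is no need for a Supplier bean or @Lazy at all: callers simply invoke send(event) whenever an event is available.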
