I am upgrading a service from Spring Cloud Stream 2.0 to 3.1.
Previously, to publish to a Kinesis stream, I used:
@Component
public class KinesisStreamService {
    private final Log logger = LogFactory.getLog(this.getClass());
    private StreamProcessor orderOut;

    @Autowired
    public KinesisStreamService(StreamProcessor orderOut) {
        this.orderOut = orderOut;
    }

    public void send(String event) {
        if (event != null) {
            this.orderOut.ordersOut().send(new GenericMessage(event));
            this.logger.debug("Event sent KinesisInStreamServiceImpl : " + event);
        } else {
            throw new RuntimeException("Event can not be null");
        }
    }
}
@Service
public interface StreamProcessor {
    String INPUT = "ordersIn";

    @Output
    MessageChannel ordersOut();

    @Input
    SubscribableChannel ordersIn();
}
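Now @Output is deprecated as of Spring Cloud Stream 3.1 in favor of the functional programming model.
The @EnableBinding note, "deprecated as of 3.1 in favor of functional programming model", mentions creating a bean similar to: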
@Service
class PubSubSendQueue {
    @Bean
    public Supplier<String> output() {
        return () -> "Adam";
    }
}
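However, how do I define the Supplier bean when an argument has to be passed to the bean definition method at runtime? Is lazy initialization of the bean a suitable solution?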
@Bean
@Lazy
public Supplier<GenericMessage> ordersOut(String event) {
    return () -> new GenericMessage(event);
}
The dependencies are:
dependencies {
    implementation 'org.apache.activemq:activemq-broker:5.17.2'
    implementation 'com.amazonaws:aws-java-sdk:1.12.310'
    implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:1.1.0'
    implementation 'org.springframework.integration:spring-integration-aws:2.5.4'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kinesis:3.0.0'
    implementation 'org.springframework:spring-web:6.0.0'
    implementation 'org.springframework.boot:spring-boot-starter:3.0.5'
    implementation 'javax.xml.bind:jaxb-api:2.3.0'
    implementation 'com.sun.xml.bind:jaxb-impl:2.3.6'
    implementation 'org.glassfish.jaxb:jaxb-runtime:2.3.6'
    implementation 'javax.activation:activation:1.1.1'
    implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
    testImplementation 'org.codehaus.groovy:groovy-all:2.5.18'
    testImplementation 'org.spockframework:spock-spring:1.3-groovy-2.5'
    testImplementation 'org.spockframework:spock-core:1.3-groovy-2.5'
    testImplementation 'org.springframework:spring-test:6.0.0'
    implementation 'cglib:cglib-nodep:3.2.7'
    implementation group: 'org.springframework', name: 'spring-jms', version: '6.0.0'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.0-rc1'
    implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: '2.13.4'
    implementation 'javax.jms:javax.jms-api:2.0.1'
    implementation 'javax.validation:validation-api:2.0.1.Final'
    implementation "jakarta.xml.bind:jakarta.xml.bind-api:2.3.2"
    implementation "org.glassfish.jaxb:jaxb-runtime:2.3.2"
}
1 Answer
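Use StreamBridge to send arbitrary messages. A Supplier<?> is intended for polled message sources, where the framework invokes it on a schedule rather than your code calling it with an argument.
See https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources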
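As a rough illustration of that advice, here is a minimal sketch of how the service from the question might look once the event is handed over at call time instead of through a Supplier bean. To keep the snippet compilable without Spring on the classpath, `OutputSender` is a hypothetical stand-in with the same shape as `StreamBridge.send(String bindingName, Object data)`; in a real application you would presumably inject `StreamBridge` and pass `streamBridge::send`. The binding name `ordersOut` and the demo class are assumptions for illustration as well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring StreamBridge.send(String, Object), so the
// sketch compiles without Spring. In a Spring app, pass streamBridge::send.
@FunctionalInterface
interface OutputSender {
    boolean send(String bindingName, Object payload);
}

// In a Spring application this class would carry @Component and receive the
// sender through constructor injection (assumption, not shown here).
class KinesisStreamService {
    // Assumed binding name; it would be mapped to the Kinesis stream in
    // configuration, e.g. spring.cloud.stream.bindings.ordersOut.destination
    static final String BINDING_NAME = "ordersOut";

    private final OutputSender sender;

    KinesisStreamService(OutputSender sender) {
        this.sender = sender;
    }

    // The event arrives as a method argument at call time, so neither a
    // Supplier bean nor @Lazy is needed.
    boolean send(String event) {
        if (event == null) {
            throw new IllegalArgumentException("Event can not be null");
        }
        return sender.send(BINDING_NAME, event);
    }
}

// Small demo that records what would have been sent to the binding.
class KinesisStreamServiceDemo {
    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        KinesisStreamService service =
                new KinesisStreamService((binding, payload) -> sent.add(binding + ":" + payload));
        service.send("order-created");
        System.out.println(sent);
    }
}
```

The point of the design is that a `@Bean` method parameter such as `String event` is resolved once by the container when the bean is created, so `@Lazy` only delays that creation and does not let each call supply a different event. Sending through the bridge on demand avoids the problem entirely.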