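I'm trying to build a KTable and then consume it through a Spring Cloud Stream (SCSt) channel, but it doesn't work. The input KTable receives no data, although if I inspect the aggregated result as a KStream (via toStream()), I can see data. As far as I can tell, the KTable is not queryable, and it has no queryable name.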
Class:
@Slf4j
@EnableBinding({LimitBinding.class})
public class CommonWorker {

    @Value("${app.dataflow.out-destination}")
    private String customerOut;

    private LimitCustomersHelper custHelper = new LimitCustomersHelper();

    // Filters, groups, and aggregates the incoming stream.
    // Note: the KTable returned by aggregate() is discarded and is not written to any topic here.
    @StreamListener(CUSTOMER_IN)
    public void groupCustomersByLimitIdKTable(KStream<Key, Envelope> input) {
        input
            .filter(custHelper::afterIsNotNull)
            .groupBy(custHelper::groupBy)
            .aggregate(
                custHelper::create,
                custHelper::aggregate,
                custHelper.materialized(customerOut)
            );
    }

    // Reads the CUSTOMER_OUT binding as a KTable and peeks at its records.
    @StreamListener
    public void checkCustomerasTable(@Input(CUSTOMER_OUT) KTable<StringWrapper, LimitCustomers> customers) {
        customers.toStream().peek(StreamUtils::peek);
    }
}
Binding:
public interface LimitBinding {

    String CUSTOMER_IN = "customer-in";
    String CUSTOMER_OUT = "customer-out";

    @Input(CUSTOMER_IN)
    KStream<Key, Envelope> customerInput();

    @Input(CUSTOMER_OUT)
    KTable<StringWrapper, LimitCustomers> customersStream();
}
application.yml:
server.port: 0
spring:
  application.name: connect-producer
  cloud.stream:
    kafka.streams.binder.configuration:
      schema:
        registry.url: http://192.168.99.100:8081
      default:
        key.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    schema.avro.dynamic-schema-generation-enabled: true
    bindings:
      customer-in:
        contentType: application/*+avro
        destination: ${app.dataflow.in-destination}
        group: ${app.dataflow.in-destination}
      customer-out:
        consumer.materializedAs: ${app.dataflow.out-destination}
app.dataflow:
  in-destination: customer_link
  out-destination: customer_link.next
spring.cloud.stream.kafka.streams.binder:
  brokers: 192.168.99.100:9092
  configuration.application.server: 192.168.99.100:9092
1 Answer
I solved this by adding a topic name similar to the table name.
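The answer does not show the change itself. One plausible reading, offered only as a sketch, is that the customer-out binding was given an explicit destination topic matching the table name. The fragment below assumes this interpretation; the placement of the `destination` key is a guess, not the answerer's confirmed configuration. Without a destination, a binding falls back to a topic named after the binding itself (here `customer-out`).

```yaml
# Sketch under the assumption described above, not the answerer's confirmed fix.
spring.cloud.stream:
  bindings:
    customer-out:
      # Assumed addition: point the KTable input at a topic named like the table.
      destination: ${app.dataflow.out-destination}
```

Note that a KTable input only sees records that are actually published to its destination topic. In the question's code the result of aggregate() is never written to a topic, so the first listener would presumably also need to publish it (for example with toStream().to(...)) for the table to receive data.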