Spring Cloud流Kafka活页夹不工作

unftdfkk  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(378)

我正在尝试建立,然后通过scst频道获得ktable。但它不起作用。输入ktable没有数据,但是如果我尝试查看kstream聚合(tostream()),我可以看到一些数据。我明白了,ktable是不可查询的,它也没有可查询的名称。
班级:

@Slf4j
@EnableBinding({LimitBinding.class})
public class CommonWorker {

  @Value("${app.dataflow.out-destination}")
  private String customerOut;

  private LimitCustomersHelper custHelper = new LimitCustomersHelper();

  @StreamListener(CUSTOMER_IN)
  public void groupCustomersByLimitIdKTable(KStream<Key, Envelope> input) {
   input
        .filter(custHelper::afterIsNotNull)
        .groupBy(custHelper::groupBy)
        .aggregate(
            custHelper::create,
            custHelper::aggregate,
            custHelper.materialized(customerOut)
        );
  }

  @StreamListener
  public void checkCustomerasTable(@Input(CUSTOMER_OUT) KTable<StringWrapper,LimitCustomers> customers){
    customers.toStream().peek(StreamUtils::peek);
  }

绑定:

public interface LimitBinding {

  String CUSTOMER_IN = "customer-in";
  String CUSTOMER_OUT = "customer-out";

  @Input(CUSTOMER_IN)
  KStream<Key, Envelope> customerInput();

  @Input(CUSTOMER_OUT)
  KTable<StringWrapper, LimitCustomers> customersStream();

}

应用程序.yml:

server.port: 0
spring:
  application.name: connect-producer
  cloud.stream:
    kafka.streams.binder.configuration:
      schema:
        registry.url: http://192.168.99.100:8081
      default:
        key.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    schema.avro.dynamic-schema-generation-enabled: true
    bindings:
      customer-in:
        contentType: application/*+avro
        destination: ${app.dataflow.in-destination}
        group: ${app.dataflow.in-destination}
      customer-out:
        consumer.materializedAs: ${app.dataflow.out-destination}

app.dataflow:
  in-destination: customer_link
  out-destination: customer_link.next

spring.cloud.stream.kafka.streams.binder:
  brokers: 192.168.99.100:9092
  configuration.application.server: 192.168.99.100:9092
hmtdttj4

hmtdttj41#

通过添加主题名simular table name解决了这个问题

相关问题