Kafka只流一次水

bqjvbblv  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(239)

我的目标是从主题a消费,做一些处理,并产生主题b,作为一个单一的原子动作。要实现这一点,我有两个选择:
使用SpringKafka@kafkalistener和Kafka模板,如下所述。
使用streams eos(恰好一次)功能。
我已成功验证选项1。成功的意思是,如果处理失败(抛出illegalargumentexception),则来自主题a的已消费消息将继续被kafkalistener消费。这是我所期望的,因为偏移量没有提交,并且使用了defaultafterrollbackprocessor。
如果我不使用kafkalistener,而是使用流从主题a消费、处理并发送到主题b(选项2),我希望看到相同的行为。但是,即使在我处理illegalargumentexception时抛出,消息也只被流消耗一次。这是预期的行为吗?
在streams的情况下,我只有以下配置:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig  kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calculate-tax-sender-invoice-stream");        
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }
}

//required to create and start a new KafkaStreams, as when an exception is thrown the stream dies
// see here: https://docs.spring.io/spring-kafka/reference/html/_reference.html#after-rollback
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
            streamsBuilderFactoryBean.stop();
            log.debug("creating and starting a new StreamsThread ..");
            streamsBuilderFactoryBean.start();
        }
    });
    return streamsBuilderFactoryBean;
}

我的流是这样的:

@Autowired
public SpecificAvroSerde<InvoiceEvents> eventSerde;

@Autowired
private TaxService taxService;

@Bean
public KStream<String, InvoiceEvents> kStream(StreamsBuilder builder) {

    KStream<String, InvoiceEvents> kStream = builder.stream("A",
            Consumed.with(Serdes.String(), eventSerde));

      kStream
        .mapValues(v -> 
            {
                // get tax from possibly remote service
                // an IllegalArgumentException("Tax calculation failed") is thrown by getTaxForInvoice()
                int tax = taxService.getTaxForInvoice(v);
                // create a TaxCalculated event
                InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder().setType(InvoiceEvent.TaxCalculated).setTax(tax).build();
                log.debug("Generating TaxCalculated event: {}", taxCalculatedEvent);
                return taxCalculatedEvent;
            })
        .to("B", Produced.with(Serdes.String(), eventSerde));

    return kStream;
}

happy path streams场景可以工作:如果在处理过程中没有抛出异常,消息将正确地显示在主题b中。
我的单元测试:

@Test
public void calculateTaxForInvoiceTaxCalculationFailed() throws Exception {
    log.debug("running test calculateTaxForInvoiceTaxCalculationFailed..");
    Mockito.when(taxService.getTaxForInvoice(any(InvoiceEvents.class)))
                        .thenThrow(new IllegalArgumentException("Tax calculation failed"));

    InvoiceEvents invoiceCreatedEvent = createInvoiceCreatedEvent();
    List<KeyValue<String, InvoiceEvents>> inputEvents = Arrays.asList(
            new KeyValue<String, InvoiceEvents>("A", invoiceCreatedEvent));

     Properties producerConfig = new Properties();
     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
     producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
     producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1);
     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
     producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
     producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "unit-test-producer");

    // produce with key
    IntegrationTestUtils.produceKeyValuesSynchronously("A", inputEvents, producerConfig);

    // wait for 30 seconds - I should observe re-consumptions of invoiceCreatedEvent, but I do not
    Thread.sleep(30000);
// ...
}

更新:在我的单元测试中,我发送了50个invoiceevents(orderid=1,…,50),我处理它们并将它们发送到目标主题。
在我的日志中,我看到的行为如下:

invoiceEvent.orderId = 43 → consumed and successfully processed
invoiceEvent.orderId = 44 → consumed and IlleagalArgumentException thrown
..new stream starts..
invoiceEvent.orderId = 44 → consumed and successfully processed
invoiceEvent.orderId = 45 → consumed and successfully processed
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and IlleagalArgumentException thrown
...
[29-0_0-producer] task [0_0] Error sending record (key A value {"type": ..., "payload": {**"id": "46"**, ... }}} timestamp 1529583666036) to topic invoice-with-tax.t due to {}; No more records will be sent and no more offsets will be recorded for this task.
..new stream starts..
invoiceEvent.**orderId = 46**→ consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and successfully processed

为什么在第二次失败后,它会重新使用invoiceevent.orderid=46?

rks48beu

rks48beu1#

使选项2(流事务)工作的关键点是:
分配一个thread.uncaughtexceptionhandler(),以便在出现任何未捕获的异常时启动一个新的streamthread(默认情况下,streamthread终止-请参阅下面的代码段)。如果kafka代理的生产失败,这甚至可能发生,它不必与流中的业务逻辑代码相关。
考虑设置一个策略来处理消息的反序列化(当您使用时)。检查默认\反序列化\异常\处理程序\类\配置(javadoc)。例如,您应该忽略并使用下一条消息,还是停止使用来自相关kafka分区的消息。
对于流,即使将max\u poll\u records\u config设置为1(每个轮询/批处理一个记录),仍然不会提交每个消息所消耗的偏移量和生成的消息。这种情况会导致问题中描述的情况(请参阅“为什么在第二次失败后,它会从invoiceevent.orderid=46重新消耗?”)。
kafka事务在windows上还不能工作。修复程序将在kafka 1.1.1中交付(https://issues.apache.org/jira/browse/kafka-6052).
考虑检查如何处理序列化异常(或在生产过程中的一般异常)(这里和这里)

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig  kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "blabla");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }
}

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) 
{
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
            streamsBuilderFactoryBean.stop();
            log.debug("creating and starting a new StreamsThread ..");
            streamsBuilderFactoryBean.start();
        }
    });
    return streamsBuilderFactoryBean;
}

相关问题