Kafka事务性生产者和消费者

bakd9h0s  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(392)

我正在研究一个Kafka用例，需要在生产者端和消费者端都具备事务语义。我已经能够使用Kafka 0.11的事务API将事务性消息发布到Kafka集群，但在消费者端遇到了问题：我已经在属性文件中设置了 isolation.level=read_committed，却无法消费到消息；而使用 isolation.level=read_uncommitted 时可以看到消息被消费，但这不是我们想要的。
生产者代码

package com.org.kafkaPro;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import kafka.common.AuthorizationException;
import kafka.common.KafkaException;

    public class ProducerWithTx
    {

        public static void main(String args[]) throws FileNotFoundException {
            URL in = ProducerWithTx.class.getResource("producertx.properties");

            Properties props = new Properties();

            try {
                props.load(new FileReader(new File(in.getFile())));
            } catch (IOException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }

            Paymnt pay1= new Paymnt();
            pay1.setAccid(1);
            pay1.setAccountstate("y");
            pay1.setAccountzipcode(111);
            pay1.setBankid(12);
            pay1.setCreditcardtype(15);
            pay1.setCustomerid("2");
            SimpleDateFormat ft = new SimpleDateFormat ("yyyy-MM-dd");
            Date t = null;
            try {
                t = ft.parse("2017-11-10");
            } catch (ParseException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            pay1.setPeriodid(t);

            String timeStamp = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss").format(new Date());
            props.put("transactional.id", "accid=" + pay1.getAccid() +  " custid=" +pay1.getCustomerid()+  " timestmp=" +timeStamp);
            KafkaProducer<String, Paymnt> producer = new KafkaProducer(props);
            producer.initTransactions();  
            try{
                producer.beginTransaction();

                //RecordMetadata metadata=producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1)).get();

                producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1));
                producer.commitTransaction();

                //System.out.println("written to" +metadata.partition());

            }
            catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException  e){

                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            }
            catch(KafkaException e) {
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }
            producer.close();
        }

    }

producertx.properties文件

# Producer settings loaded by ProducerWithTx and handed to the KafkaProducer constructor.

# NOTE(review): metadata.broker.list looks like a legacy (Scala) producer setting; confirm
# whether the Java KafkaProducer uses it at all. bootstrap.servers below carries the address.
metadata.broker.list=localhost:9092
bootstrap.servers=localhost:9092
acks=all
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Project serializer for Paymnt values (not shown here).
value.serializer=com.org.kafkaPro.PaySerializer

# Left commented out: ProducerWithTx sets transactional.id in code before creating the producer.
# transactional.id=1

enable.idempotence=true
# NOTE(review): num.partitions is presumably a broker/topic setting, not a producer one;
# verify it has any effect when passed to the producer.
num.partitions=3

消费者

package com.org.kafkaPro;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Launcher that starts four {@link ConsumerMultiThreaded} workers, each on its own thread,
 * all configured from the classpath resource {@code consumer.properties}.
 *
 * <p>The process stays alive until the workers finish. A JVM shutdown hook asks every
 * worker to stop via {@link ConsumerMultiThreaded#shutdown()} so that each consumer gets
 * the chance to close itself.
 */
public class Consumer {

    /** How long the shutdown hook waits for each worker thread to finish, in milliseconds. */
    private static final long SHUTDOWN_JOIN_MS = 10000L;

    private static List<ConsumerMultiThreaded> consumersGroup;

    /**
     * Loads the configuration, starts the workers and blocks until they have all exited.
     *
     * @param args unused
     * @throws IOException if {@code consumer.properties} is missing or cannot be read
     */
    public static void main(String args[]) throws IOException {

        // Resolved relative to this class's package.
        URL in = Consumer.class.getResource("consumer.properties");
        if (in == null) {
            throw new java.io.FileNotFoundException("Classpath resource not found: consumer.properties");
        }

        Properties props = new Properties();
        // Let a read failure propagate: starting consumers with empty properties is useless.
        try (java.io.InputStream stream = in.openStream()) {
            props.load(stream);
        }

        consumersGroup = new ArrayList<ConsumerMultiThreaded>();
        for (int i = 0; i < 4; i++) {
            consumersGroup.add(new ConsumerMultiThreaded(props));
        }

        final List<Thread> workers = new ArrayList<Thread>();
        for (ConsumerMultiThreaded consumer : consumersGroup) {
            workers.add(new Thread(consumer));
        }

        // On Ctrl-C / SIGTERM: wake every consumer out of poll() and give it time to close.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                for (ConsumerMultiThreaded consumer : consumersGroup) {
                    consumer.shutdown();
                }
                for (Thread worker : workers) {
                    try {
                        worker.join(SHUTDOWN_JOIN_MS);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }));

        for (Thread worker : workers) {
            worker.start();
        }

        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException ie) {
            // Treat an interrupt of the main thread as a request to stop the workers.
            Thread.currentThread().interrupt();
            for (ConsumerMultiThreaded consumer : consumersGroup) {
                consumer.shutdown();
            }
        }
    }
}

消费者 Runnable 实现

/**
 * Runnable that owns one {@code KafkaConsumer} subscribed to the "test" topic.
 *
 * <p>Polled records are collected in a buffer. Once at least {@code minBatchSize} records
 * are buffered, every buffered record is printed, the next offset of each partition in the
 * buffer is committed synchronously, and the buffer is emptied. Nothing is printed or
 * committed before that threshold is reached.
 *
 * <p>{@link #shutdown()} may be called from another thread to stop {@link #run()}.
 */
public class ConsumerMultiThreaded implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, Paymnt> consumer;
    private final int minBatchSize = 3;
    private final List<ConsumerRecord<String, Paymnt>> buffer;

    /**
     * @param props consumer configuration passed straight to the {@code KafkaConsumer}
     */
    public ConsumerMultiThreaded(Properties props) {
        this.consumer = new KafkaConsumer<String, Paymnt>(props);
        this.buffer = new ArrayList<ConsumerRecord<String, Paymnt>>(minBatchSize);
    }

    /**
     * Poll loop: runs until {@link #shutdown()} is called, then closes the consumer.
     * A {@code WakeupException} is rethrown unless it was caused by {@link #shutdown()}.
     */
    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("test"));
            while (!closed.get()) {
                ConsumerRecords<String, Paymnt> records = consumer.poll(10000);

                for (ConsumerRecord<String, Paymnt> record : records) {
                    buffer.add(record);
                }

                if (buffer.size() >= minBatchSize) {
                    processBuffer();
                }
            }

        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        }
        finally {
            consumer.close();
        }

    }

    /**
     * Prints every buffered record, commits the position after the last buffered record of
     * each partition, then clears the buffer.
     *
     * <p>Working from the buffer (rather than from the most recent poll result) ensures
     * records gathered by earlier polls are not skipped and their partitions are committed.
     */
    private void processBuffer() {
        java.util.Map<TopicPartition, OffsetAndMetadata> nextOffsets =
                new java.util.HashMap<TopicPartition, OffsetAndMetadata>();

        for (ConsumerRecord<String, Paymnt> record : buffer) {
            System.out.println("record consumed by Thread " +Thread.currentThread().getId() +"from partition" +record.partition() +"offset" +record.offset() + "value: " + record.value().toString());
            // Later records of the same partition overwrite earlier ones, so the map ends
            // up holding the last buffered offset + 1 per partition.
            // NOTE(review): assumes records of one partition are buffered in offset order
            // and that the partition is still assigned to this consumer - confirm how
            // rebalances should be handled.
            nextOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }

        consumer.commitSync(nextOffsets);
        buffer.clear();
    }

    /**
     * Requests termination of {@link #run()}: sets the closed flag and wakes the consumer
     * up so a blocking {@code poll} returns.
     */
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

}

消费者属性文件(consumer.properties)

# Consumer settings loaded by Consumer and shared by every ConsumerMultiThreaded instance.
bootstrap.servers=localhost:9092
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Project deserializer for Paymnt values (not shown here).
value.deserializer=com.org.kafkaPro.PayDeserializer
# Offsets are committed explicitly by ConsumerMultiThreaded via commitSync.
enable.auto.commit=false
auto.offset.reset=earliest
# All four consumer instances use this same group id.
group.id=test
# Intended to hide records of uncommitted/aborted transactions from this consumer
# (see the Kafka consumer configuration reference for the exact semantics).
isolation.level=read_committed

谢谢你的帮助…谢谢

kd3sttzy

kd3sttzy1#

在您的生产者属性文件中，transactional.id 这一行写成了 # transactional.id=1（正如您贴出的那样），行首带有 # 符号，也就是被注释掉了。这可能会导致问题。
如果不是这样的话,您可以转储主题和事务状态主题的日志段,从那里您可以很容易地调试出错的地方。

相关问题