JUnit: Kafka producer sends a null key instead of the String provided

qjp7pelc · posted 2022-11-11 · in Kafka
Follow (0) | Answers (2) | Views (262)

I am using a Producer to send messages to a Kafka topic.
While JUnit testing, I have found that the producer in my application code (but not the one in my JUnit test class) sends a null key, despite my providing a String key for it to use.
Code as follows:
Main application class

final Producer<String, HashSet<String>> actualApplicationProducer;

ApplicationInstance(String bootstrapServers) // constructor
{
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "ActualClient");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
    props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes,1000000));

    actualApplicationProducer = new KafkaProducer<>(props);
}

public void doStuff()
{
    HashSet<String> values = new HashSet<String>();
    String key = "applicationKey";
    // THIS LINE IS SENDING A NULL KEY
    actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values));
}

But in my JUnit test class:

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@SuppressWarnings("static-method")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CIFFileProcessorTests 
{
    /**An Embedded Kafka Broker that can be used for unit testing purposes. */
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @BeforeAll
    public void setUpBeforeClass(@TempDir File globalTablesDir, @TempDir File rootDir) throws Exception 
    {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "JUnitClient");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes, 1000000));
        // Producer is an interface; instantiate the concrete KafkaProducer
        try (Producer<String, HashSet<String>> junitProducer = new KafkaProducer<>(props))
        {
            HashSet<String> values = new HashSet<>();
            // Here, I'm sending a record just like in my main application code, but the key arrives correctly and is not null
            junitProducer.send(new ProducerRecord<>(topicName, "junitKey", values));
        }
    }

    @Test
    public void test() throws Exception
    {
        ApplicationInstance sut = new ApplicationInstance(embeddedKafkaBroker.getBrokersAsString());
        sut.doStuff();

        // "records" is a LinkedBlockingQueue, populated by a KafkaMessageListenerContainer which is monitoring the topic for records using a MessageListener
        ConsumerRecord<String, HashSet<String>> record = records.poll(1, TimeUnit.SECONDS);
        assertEquals("junitKey", record.key()); // TEST FAILS - expected "junitKey" but returned null
    }
}

Custom serializer:

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     ObjectOutputStream oos = new ObjectOutputStream(baos))
{
    oos.writeObject(object);
    return baos.toByteArray();
}
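Because the value is serialized with plain Java object serialization, the consumer side needs a deserializer that reverses exactly this format. A minimal round-trip sketch of that pairing, runnable without Kafka on the classpath (the class and method names here are illustrative, not the asker's actual code; note the explicit flush() before reading the buffer, since ObjectOutputStream buffers block data internally):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashSet;

public class RoundTrip {
    // Mirrors the serializer in the question: Java serialization to a byte[]
    static byte[] serialize(Object object) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(object);
            oos.flush(); // make sure buffered block data reaches baos before we copy it
            return baos.toByteArray();
        }
    }

    // The matching deserializer: reads the same object back from the byte[]
    @SuppressWarnings("unchecked")
    static HashSet<String> deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (HashSet<String>) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashSet<String> values = new HashSet<>();
        values.add("a");
        values.add("b");
        HashSet<String> copy = deserialize(serialize(values));
        System.out.println(values.equals(copy)); // prints "true"
    }
}
```

In a real Kafka Deserializer implementation, the deserialize logic above would live inside `deserialize(String topic, byte[] data)` of a class implementing org.apache.kafka.common.serialization.Deserializer.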

Does anyone know why the KafkaProducer would send a null key when I explicitly specify a String?
--- Update ---
I have tried inspecting the metadata, and the Producer is indeed sending the key, and not null:

RecordMetadata info = actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values)).get(); 
System.out.println("INFO - partition: " + info.partition() + ", topic: " + info.topic() + ", offset: " + info.offset() + ", timestamp: "+ info.timestamp() + ", keysize: " + info.serializedKeySize() + ", valuesize: " + info.serializedValueSize());

output:
INFO - partition: 0, topic: topicName, offset: 2, timestamp: 1656060840304, keysize: 14, valuesize: 6258
The keysize being > 0 shows that null is not passed to the topic.
So, the issue must be with the reading of the topic, perhaps?

7kqas0il 1#

It turned out I was using a different Deserializer class for my KafkaMessageListenerContainer, which did not know how to handle the String key it was given.
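In other words, the consumer side of the listener container has to mirror the producer's serializers. A minimal sketch of the consumer properties, assuming the standard Kafka config keys ("key.deserializer", "value.deserializer"); the com.example.CustomDeserializer class name is purely illustrative:

```java
import java.util.Properties;

public class ConsumerProps {
    static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "junit-listener");
        // Must mirror the producer: StringSerializer on the key side...
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // ...and a deserializer that understands the custom value format (illustrative class name)
        props.put("value.deserializer", "com.example.CustomDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // prints the key deserializer class name configured above
        System.out.println(consumerProps("localhost:9092").getProperty("key.deserializer"));
    }
}
```

With spring-kafka, these properties would typically be passed to a DefaultKafkaConsumerFactory backing the KafkaMessageListenerContainer.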

tmb3ates 2#

I am not sure why you are using a ByteArrayOutputStream or ObjectOutputStream to serialize the Kafka producer record; that may be your requirement. In that case, you can refer to the producer section of https://dzone.com/articles/kafka-producer-and-consumer-example
But injecting a key into a producer record is easy to do. For example, if you want to generate a producer record from an AVRO schema and use asserts to inject the record key and value, you could do something like this.
1. Generate an AVRO or specific record
You can refer to https://technology.amis.nl/soa/kafka/generate-random-json-data-from-an-avro-schema-using-java/
You can use JsonAvroConverter to convert it to a SpecificRecord:

public static ProducerRecord<String, CustomEvent> generateRecord(){
    String schemaFile = "AVROSchema.avsc";
    Schema schema = getSchema(schemaFile);
    String json = getJson(dataFile);

    byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);

    CustomEvent record = null;
    JsonAvroConverter converter = new JsonAvroConverter();

    try {
        record = converter.convertToSpecificRecord(jsonBytes, CustomEvent.class, schema);
    } catch (Exception e) {
        // handle or log the conversion failure
    }

    String recordKey = "YourKey";

    return new ProducerRecord<String, CustomEvent>(topic, recordKey, record);
}

2. You can later pass the ProducerRecord into your assert function.
