I am using a Producer to send messages to a Kafka topic.
When JUnit testing, I have found that the producer in my application code (but not in my JUnit test class) is sending a null key, despite me providing a String key for it to use.
Code as follows:
Main application class
final Producer<String, HashSet<String>> actualApplicationProducer;

ApplicationInstance(String bootstrapServers) // constructor
{
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "ActualClient");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
    props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes, 1000000));
    actualApplicationProducer = new KafkaProducer<>(props);
}

public void doStuff()
{
    HashSet<String> values = new HashSet<String>();
    String key = "applicationKey";
    // THIS LINE IS SENDING A NULL KEY
    actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values));
}
But in my JUnit test class:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@SuppressWarnings("static-method")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CIFFileProcessorTests
{
    /** An Embedded Kafka Broker that can be used for unit testing purposes. */
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @BeforeAll
    public void setUpBeforeClass(@TempDir File globalTablesDir, @TempDir File rootDir) throws Exception
    {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "JUnitClient");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes, 1000000));
        try (Producer<String, HashSet<String>> junitProducer = new KafkaProducer<>(props))
        {
            HashSet<String> values = new HashSet<>();
            // Here, I'm sending a record, just like in my main application code, but it's sending the key correctly and not null
            junitProducer.send(new ProducerRecord<>(topicName, "junitKey", values));
        }
    }

    @Test
    public void test()
    {
        ApplicationInstance sut = new ApplicationInstance(embeddedKafkaBroker.getBrokersAsString());
        sut.doStuff();
        // "records" is a LinkedBlockingQueue, populated by a KafkaMessageListenerContainer which is monitoring the topic for records using a MessageListener
        ConsumerRecord<String, HashSet<String>> record = records.poll(1, TimeUnit.SECONDS);
        assertEquals("junitKey", record.key()); // TEST FAILS - expected "junitKey" but returned null
    }
}
Custom serializer:
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     ObjectOutputStream oos = new ObjectOutputStream(baos))
{
    oos.writeObject(object);
    return baos.toByteArray();
}
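
For context, the serializer fragment above relies on standard Java object serialization, so whatever reads the value back has to use the matching ObjectInputStream approach. The following is a minimal, stdlib-only sketch of that round trip. The class name JavaSerializationRoundTrip and the methods serialize and deserialize are hypothetical and do not come from the question, and the Kafka Serializer interface is deliberately left out so the example runs without kafka-clients.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;

// Hypothetical helper; the class and method names are not from the question.
class JavaSerializationRoundTrip {

    // Same approach as the serializer fragment: Java object serialization into a byte array.
    static byte[] serialize(HashSet<String> object) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(object);
            oos.flush(); // harmless safeguard before reading the buffer
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The matching read side: only bytes written by serialize() can be read here.
    @SuppressWarnings("unchecked")
    static HashSet<String> deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (HashSet<String>) ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashSet<String> values = new HashSet<>(List.of("a", "b"));
        byte[] bytes = serialize(values);
        System.out.println(deserialize(bytes).equals(values)); // prints "true"
    }
}
```

The point of the sketch is that each serializer needs its own matching deserializer: this pair fits the record value, while the String key is written by a different serializer and needs a different reader.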
Does anyone know why the KafkaProducer would send a null key when I explicitly specify a String?
--- Update ---
I have tried inspecting the metadata, and the Producer is indeed sending the key, and not null:
RecordMetadata info = actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values)).get();
System.out.println("INFO - partition: " + info.partition() + ", topic: " + info.topic() + ", offset: " + info.offset() + ", timestamp: "+ info.timestamp() + ", keysize: " + info.serializedKeySize() + ", valuesize: " + info.serializedValueSize());
output:
INFO - partition: 0, topic: topicName, offset: 2, timestamp: 1656060840304, keysize: 14, valuesize: 6258
The keysize being > 0 shows that null is not passed to the topic.
So, the issue must be with the reading of the topic, perhaps?
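
The reported key size can also be cross-checked against the key itself. Kafka's StringSerializer encodes strings as UTF-8 by default, so the serialized key size should equal the UTF-8 byte length of "applicationKey". A small sketch of that check, using only the standard library (the class name KeySizeCheck and the method utf8Size are hypothetical):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical check, not part of the question's code.
class KeySizeCheck {

    // Number of bytes the key occupies when encoded as UTF-8.
    static int utf8Size(String key) {
        return key.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        System.out.println(utf8Size("applicationKey")); // prints 14, matching "keysize: 14"
    }
}
```

Since the size matches, the key that reached the broker is very likely the intended string, which points to the consuming side.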
2 answers

Answer 1

It turned out that I was using a different Deserializer class for my KafkaMessageListenerContainer, one that did not know how to handle the supplied String.

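To illustrate how such a mismatch can surface as a null key, here is a stdlib-only sketch. It rests on an assumption: the answer does not say which deserializer was configured or how it failed, so the "wrong" deserializer below (one that expects Java-serialized objects and returns null on failure) is purely hypothetical, as are the class and method names.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; the answer does not say which deserializer was configured.
class DeserializerMismatchDemo {

    // Assumed "wrong" key deserializer: expects Java-serialized objects
    // and swallows failures by returning null.
    static Object javaObjectOrNull(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            return null; // UTF-8 text does not start with a serialization stream header
        }
    }

    // What a UTF-8 string deserializer does with the same bytes.
    static String utf8String(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] keyBytes = "applicationKey".getBytes(StandardCharsets.UTF_8);
        System.out.println(javaObjectOrNull(keyBytes)); // prints "null"
        System.out.println(utf8String(keyBytes));       // prints "applicationKey"
    }
}
```

On the consumer side, the usual fix is to set the key.deserializer property to org.apache.kafka.common.serialization.StringDeserializer so that it matches the producer's StringSerializer, and to keep the custom deserializer for the value only.
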
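Answer 2

I'm not sure why you are using ByteArrayOutputStream or ObjectOutputStream to serialize the Kafka producer record; that may be a requirement of yours. In that case, you can refer to the producer section of https://dzone.com/articles/kafka-producer-and-consumer-example
That said, injecting a key into a producer record is easy to do. For example, if you want to generate a producer record from an Avro schema and use an assert to inject the record key and value, you can do it as follows.
1. Generate the Avro or specific record.
You can refer to https://technology.amis.nl/soa/kafka/generate-random-json-data-from-an-avro-schema-using-java/
You can use JSONAVROConverter to convert it to SpecificRecords:
2. You can then inject the ProducerRecord into the assert function.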