初始化KafkaProducer对象一次,并由Jmeter脚本中的所有采样器使用同一对象

yhuiod9q  于 2023-03-07  发布在  Apache
关注(0)|答案(2)|浏览(150)

我正在尝试实现一个场景,如下所述:我必须创建一个负载测试脚本,我的异步API发布和使用Kafka主题中的事件。在这个脚本中,有多个采样器发布不同主题的消息,目前他们正在每个采样器中创建一个生产者客户端,然后在那里关闭它。为了提高这个脚本的整体性能,并使脚本更接近现实生活中的场景,我们只想在测试开始时初始化KafkaProducer一次,然后在测试结束时关闭它。因此,我实现了一个解决方案,如下所示:

TestPlan
  ThreadGroup
    1.JSR223Sampler to initialize the Kafka Producer 
    2.Another JSR223 Sampler to publish a message on a topic

使用上面初始化的相同Kafka Producer
但问题是,当我使用vars.putObject()函数将Kafka Producer对象保存到Sampler1中,并使用vars.getObject函数在Sampler2中获取该对象时,它会在Sampler2中返回java.lang.String类的对象。如果我现在尝试在Sampler2中强制转换它,它将不起作用,并抛出ClassCastException
采样器1脚本:

import org.apache.kafka.clients.producer.KafkaProducer;

 Properties props = new Properties();
 props.put("bootstrap.servers", vars.get('bootstrapServer'));
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
 vars.putObject("producer",producer);

采样器2脚本:

`KafkaProducer producer1 =  vars.getObject("producer") as KafkaProducer;
ProducerRecord<String, String> record  = new ProducerRecord<String, String>('REPO-Asset-Partner-QueryRequest', '', "{\"data\":\"{\\\"assetSearchCriteria\\\":[{\\\"attribute\\\":\\\"category\\\",\\\"operator\\\":\\\"EQ\\\",\\\"value\\\":\\\"SYSTEM\\\"}]}\"}")
record.headers().add(new RecordHeader('authorization', bear.getBytes()))
record.headers().add(new RecordHeader('ce_id', vars.get('searchIndex').getBytes()))
record.headers().add(new RecordHeader('ce_type', 'com.fico.repo.ms.asset.events.message.partner.query.AssetSearch'.getBytes()))
record.headers().add(new RecordHeader('ce_time', vars.get('time').getBytes()))
record.headers().add(new RecordHeader('requestId', vars.get('searchCorelation').getBytes()))
producer1.send(record)
producer1.close()`

例外情况:

Response message:javax.script.ScriptException: org.codehaus.groovy.runtime.typehandling.GroovyCastException: Cannot cast object '' with class 'java.lang.String' to class 'org.apache.kafka.clients.producer.KafkaProducer'

我仍然面临ClassCastException。

yhxst69z

yhxst69z1#

看起来producer变量是一个空字符串,所以我建议检查声明它的元素。
如果您显示的是完整的代码,则至少缺少StringSerializer的一个import语句
因此,检查jmeter.log文件中是否有任何可疑条目,并使用调试采样器检查为此线程(虚拟用户)定义的JMeter Variables

pieyvz9o

pieyvz9o2#

您可以尝试使用插件管理器中提供的可用DI-Kafkameter插件。x1c 0d1x
理想情况下,这应该只创建一次连接,使您的生活更轻松。

相关问题