我正在尝试实现一个场景,如下所述:我必须创建一个负载测试脚本,我的异步API发布和使用Kafka主题中的事件。在这个脚本中,有多个采样器发布不同主题的消息,目前他们正在每个采样器中创建一个生产者客户端,然后在那里关闭它。为了提高这个脚本的整体性能,并使脚本更接近现实生活中的场景,我们只想在测试开始时初始化KafkaProducer一次,然后在测试结束时关闭它。因此,我实现了一个解决方案,如下所示:
TestPlan
ThreadGroup
1.JSR223Sampler to initialize the Kafka Producer
2.Another JSR223 Sampler to publish a message on a topic
使用上面初始化的相同Kafka Producer
但问题是,当我使用vars.putObject()函数将Kafka Producer对象保存到Sampler1中,并使用vars.getObject函数在Sampler2中获取该对象时,它会在Sampler2中返回java.lang.String类的对象。如果我现在尝试在Sampler2中强制转换它,它将不起作用,并抛出ClassCastException
采样器1脚本:
import org.apache.kafka.clients.producer.KafkaProducer;
Properties props = new Properties();
props.put("bootstrap.servers", vars.get('bootstrapServer'));
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
vars.putObject("producer",producer);
采样器2脚本:
`KafkaProducer producer1 = vars.getObject("producer") as KafkaProducer;
ProducerRecord<String, String> record = new ProducerRecord<String, String>('REPO-Asset-Partner-QueryRequest', '', "{\"data\":\"{\\\"assetSearchCriteria\\\":[{\\\"attribute\\\":\\\"category\\\",\\\"operator\\\":\\\"EQ\\\",\\\"value\\\":\\\"SYSTEM\\\"}]}\"}")
record.headers().add(new RecordHeader('authorization', bear.getBytes()))
record.headers().add(new RecordHeader('ce_id', vars.get('searchIndex').getBytes()))
record.headers().add(new RecordHeader('ce_type', 'com.fico.repo.ms.asset.events.message.partner.query.AssetSearch'.getBytes()))
record.headers().add(new RecordHeader('ce_time', vars.get('time').getBytes()))
record.headers().add(new RecordHeader('requestId', vars.get('searchCorelation').getBytes()))
producer1.send(record)
producer1.close()`
例外情况:
Response message:javax.script.ScriptException: org.codehaus.groovy.runtime.typehandling.GroovyCastException: Cannot cast object '' with class 'java.lang.String' to class 'org.apache.kafka.clients.producer.KafkaProducer'
我仍然面临ClassCastException。
2条答案
按热度按时间yhxst69z1#
看起来
producer
变量是一个空字符串,所以我建议检查声明它的元素。如果您显示的是完整的代码,则至少缺少StringSerializer的一个import语句
因此,检查jmeter.log文件中是否有任何可疑条目,并使用调试采样器检查为此线程(虚拟用户)定义的JMeter Variables
pieyvz9o2#
您可以尝试使用插件管理器中提供的可用DI-Kafkameter插件。x1c 0d1x
理想情况下,这应该只创建一次连接,使您的生活更轻松。