Jsr223采样器向Kafka主题发送空json

qmelpv7a  于 2022-12-11  发布在  Apache
关注(0)|答案(1)|浏览(126)

我正在使用jsr223采样器发布json消息到Kafka使用Kafka客户端jar。当我发布消息是去Kafka为空。有人能告诉我什么是失踪。实际上消息是去为空的应用程序。下面是我的代码。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonSlurper;
import java.util.ArrayList;
import org.apache.jmeter.threads.JMeterVariables;

Properties props = new Properties();
props.put("bootstrap.servers", "lxkfkbkomsstg01.lowes.com:9093,lxkfkbkomsstg02.lowes.com:9093,lxkfkbkomsstg03.lowes.com:9093,lxkfkbkomsstg04.lowes.com:9093,lxkfkbkomsstg05.lowes.com:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "none");
props.put("batch.size", "16384");
props.put("linger.ms", "0");
props.put("buffer.memory", "33554432");
props.put("acks", "1");
props.put("send.buffer.bytes", "131072");
props.put("receive.buffer.bytes", "32768");
props.put("security.protocol", "SSL");
//props.put("sasl.kerberos.service.name", "kafka");
//props.put("sasl.mechanism", "GSSAPI");
//props.put("ssl.keystore.type", "JKS");
props.put("ssl.truststore.location", "/Users/rajkumar/Documents/EOMS/eoms-truststore-stage.jks");
props.put("ssl.truststore.password", "4DxYJnVDcPi6E8w3uCS63qoa");
props.put("ssl.endpoint.identification.algorithm", "");
props.put("ssl.protocol", "SSL");
props.put("ssl.truststore.type", "JKS");

String eventType="orbit_pick";

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
List<Header> headers = new ArrayList<Header>();
    Header header = new RecordHeader("event_type",eventType.getBytes(StandardCharsets.UTF_8));
    headers.add(header);
//        headers.add(new RecordHeader("event_type",eventType.getBytes(StandardCharsets.UTF_8)));

Date latestdate = new Date();
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("orbit.shipment.lfs.inbound.prf", 1, latestdate.getTime(), "702807441", "{\"SellerOrganizationCode\":\"LOWES\",\"ShipNode\":\"0224\",\"IsShortage\":\"N\",\"ShipmentKey\":\"2022021708351092902896763\",\"ShipmentNo\":\"702807441\",\"Extn\":{\"ExtnPickingHasStartedFlag\":\"Y\",\"ExtnSourceSystem\":\"StoreOrderSvc\",\"ExtnPickerId\":\"98977\",\"ExtnOperation\":\"pick\",\"ExtnInPickupLocker\":\"N\"},\"Instructions\":{\"Instruction\":{\"InstructionText\":\"picking\"},\"Replace\":\"Y\"},\"ShipmentLines\":{\"ShipmentLine\":[{\"BackroomPickedQuantity\":\"1\",\"Quantity\":\"3\",\"CodeValue\":\"\",\"ShipmentLineNo\":\"1\",\"ShipmentSubLineNo\":\"0\",\"ShortageQty\":\"\",\"ItemID\":\"1505\",\"NewShipNode\":\"\",\"Extn\":null}]},\"MessageID\":\"8970549709qqaachwejhk\",\"MessageTimeStamp\":\\"${__time(yyyy-MM-dd'T'hh:mm:ss)}\",\"eventType\":\"orbit_multiple_shipments_customer_pickup\"}", headers);

producer.send(producerRecord);
producer.close();
z31licg0

z31licg01#

我没有看到任何代码,所以它不会向Kafka发送任何信息。
以下是一些可供参考的示例:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

def props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("acks", "all")
props.put("retries", 0)
props.put("batch.size", 16384)
props.put("linger.ms", 1)
props.put("buffer.memory", 33554432)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

def producer = new KafkaProducer<>(props)

producer.send(new ProducerRecord<>("your-topic", "your-key", "your-json-here"))

producer.close()

P.S.你知道Pepper-Box - Kafka Load Generator插件吗?你会发现它更容易配置和使用。查看Apache Kafka - How to Load Test with JMeter文章了解更多信息

相关问题