我是kafka的新手,我用java创建了一个示例consumer,用于从kafka主题中获取数据,但是当我使用消息时,会带来额外的字符串或字符,比如一些特殊字符(????xmydata),比如这样。
如果我没有使用正确的代码,请给我提供适当的代码。
The code i am using..
getting output = ?????mydata
expected output = mydata
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "mysql-joined-datalist2");
props.put("auto.commit.interval.ms", "1000");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//kafka consumer object
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//subscribe to topic
consumer.subscribe(Arrays.asList("mysql-joined-datalist2"));
//infinite poll loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(0);
for (ConsumerRecord<String, String> record : records) {
String payload = record.value();
// InputStream is = new ByteArrayInputStream(payload.getBytes("UTF-8"));
String oldString = record.value();
String newString = new String(oldString.getBytes("UTF-8"), "UTF-8");
System.out.println("newString : " + newString);
String s2 = new String(record.value().getBytes("ISO-8859-1"),"UTF-8");
System.out.println("str : "+s2); // output = str : ?????Xmydata
}
please help me. thanks in advanced.
暂无答案!
目前还没有任何答案,快来回答吧!