kafka产生多值消息

xn1cxnb4  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(279)

我是个新手 Kafka .
我是否能够发送具有多个值的消息?例如,我这里有一个例子,它产生一个随机数的消息,但是我想为每个产生的记录添加一个时间戳。标准方法是什么?

while(true){
        for(int key=0; key < 10000; key++){
            Random rand = new Random();
            int  n = rand.nextInt(50) + 1;

            SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
            Date date = new Date();
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("java-topic", Integer.toString(1),Integer.toString(n));
            producer.send(producerRecord);
            Thread.sleep(10000);
        }
        //producer.close();
    }
vltsax25

vltsax251#

在内部,kafka代理(服务器)只存储一个字节数组。因此,必须对消息进行编码以包含所有值。
一种流行的方法是在消息中使用json编码

int numbers[] = {1,2,3};
String msg = String.format("{\"numbers\": %s, \"timestamp\": \"%s\"}", 
                java.util.Arrays.toString(numbers), timestamp);
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
       "java-topic", msg);

注意kafka服务器在接收到一条消息时添加了一个时间戳,该消息可以在consumerrecord.timestamp()中看到

fruv7luv

fruv7luv2#

谢谢你的帮助!!输出答案

while(true){
    for(int key=0; key < 10000; key++){
        Random rand = new Random();
        Date date = new Date();
        SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");

        int  n = rand.nextInt(50) + 1;
        String today = formatter.format(date);
        String msg = String.format("{\"numbers\": %s, \"timestamp\": \"%s\"}", n, today);

        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
                "java-topic", Integer.toString(1), msg);
        producer.send(producerRecord);
        Thread.sleep(1000);
    }
    //producer.close();
}

相关问题