逐行读取csv文件并生成带有随机延迟发送的某些行的流

ffvjumwh  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(433)

我正在逐行读取3个csv文件,并将它们发送到消息队列(apachekafka)。通过增加时间戳值对数据行进行排序。我通过查看最后一个时间戳和新的时间戳来模拟流,并根据这两个时间戳的差异使线程休眠。我这样做是为了模拟消息的产生。
现在我想让这些消息中的一些被随机延迟一段时间,并准备了一个函数,该函数将线程置于睡眠状态一段随机时间,并随机选择何时执行此操作。
当我这样做的时候,我把整个csv文件的读取延迟到所有必须产生的后续消息。
可能我在这方面缺乏经验,但我不知道如何随机地将我正在生成的一条消息置于睡眠状态,而不延迟所有必须到来的消息?

//Read the CSV file line by line, serialize into object and put to sleep fo
public void readLikesEventStreamCSV(
        final BufferedReader bufferedReader, StreamproducerApplication.StreamProducer producer) throws IOException {
    String last_timestamp = "";
    StreamWaitSimulation sleep = new StreamWaitSimulation();
    try {
        String line;
        line = bufferedReader.readLine(); //read the first line. we do nothing with it.
        while ((line = bufferedReader.readLine()) != null) {
            final String[] lineArray = pattern.split(line);

            LikesEventStream value = new LikesEventStream
                    .Builder()
                    .personId(Integer.parseInt(lineArray[0]))
                    .postId(Integer.parseInt(lineArray[1].equals("") ? "-1":lineArray[1] )) //TODO: handle this empty string problem in a cleaner way.
                    .creationDate(lineArray[2])
                    .build();
            //Here the code will wait before sending the LikesEventStream value created above
            sleep.wait(last_timestamp, lineArray[2]);
            last_timestamp = lineArray[2];

            //This sends the object to a topic in Kafka
            send(value, producer, likesTopicName);
        }

    } finally {
        bufferedReader.close();
    }
}

发送到主题的消息如下所示:

{"personId":721,"postId":270250,"creationDate":"2012-02-02T01:09:00.000Z","sentAt":1328141340000}
exdqitrt

exdqitrt1#

你可以为每一条延迟的消息分出一个线程。这样,当一个线程休眠时,您的主处理将继续:

public class Demo {

    public static void main(String[] args) {

        long timeToWait = 2000L;
        Runnable runner = new Runnable() {
            @Override
            public void run() {
                try { Thread.sleep(timeToWait); } catch (InterruptedException e) { }
                System.out.println("Writing delayed message here");
            }

        };

        Thread thread = new Thread(runner);
        thread.start();

        System.out.println("Processing continues after forking off message delay");

    }

}

相关问题