我正在逐行读取3个csv文件,并将它们发送到消息队列(apachekafka)。通过增加时间戳值对数据行进行排序。我通过查看最后一个时间戳和新的时间戳来模拟流,并根据这两个时间戳的差异使线程休眠。我这样做是为了模拟消息的产生。
现在我想让这些消息中的一些被随机延迟一段时间,并准备了一个函数,该函数将线程置于睡眠状态一段随机时间,并随机选择何时执行此操作。
当我这样做的时候,我把整个csv文件的读取延迟到所有必须产生的后续消息。
可能我在这方面缺乏经验,但我不知道如何随机地将我正在生成的一条消息置于睡眠状态,而不延迟所有必须到来的消息?
//Read the CSV file line by line, serialize into object and put to sleep fo
public void readLikesEventStreamCSV(
final BufferedReader bufferedReader, StreamproducerApplication.StreamProducer producer) throws IOException {
String last_timestamp = "";
StreamWaitSimulation sleep = new StreamWaitSimulation();
try {
String line;
line = bufferedReader.readLine(); //read the first line. we do nothing with it.
while ((line = bufferedReader.readLine()) != null) {
final String[] lineArray = pattern.split(line);
LikesEventStream value = new LikesEventStream
.Builder()
.personId(Integer.parseInt(lineArray[0]))
.postId(Integer.parseInt(lineArray[1].equals("") ? "-1":lineArray[1] )) //TODO: handle this empty string problem in a cleaner way.
.creationDate(lineArray[2])
.build();
//Here the code will wait before sending the LikesEventStream value created above
sleep.wait(last_timestamp, lineArray[2]);
last_timestamp = lineArray[2];
//This sends the object to a topic in Kafka
send(value, producer, likesTopicName);
}
} finally {
bufferedReader.close();
}
}
发送到主题的消息如下所示:
{"personId":721,"postId":270250,"creationDate":"2012-02-02T01:09:00.000Z","sentAt":1328141340000}
1条答案
按热度按时间exdqitrt1#
你可以为每一条延迟的消息分出一个线程。这样,当一个线程休眠时,您的主处理将继续: