I am creating a Kafka stream, sending a message to it, applying some transformations to that message, and producing the output to an output Kafka queue. When I send only a single message I see no output, but once I start sending multiple messages (2, 3, and so on) I do see some output. Can anyone explain whether I am missing something needed to get output for a single message?
My scenario is as follows:
I have a test case that writes to a Kafka input topic. My service reads the input topic and writes its result to an output topic. The test case then reads from that output topic and shows the final result.

// Producer
producer.send(record, new Callback {
  override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
    if (e != null) {
      System.out.print("in system")
      print("in print")
      ex += "No confirmation received"
      logger.warn(s"No confirmation received for $message", e)
    }
    if (recordMetadata != null) {
      ex += "message sent" + s"message $message sent: checksum=${recordMetadata.checksum}, " +
        s"offset=${recordMetadata.offset}, partition=${recordMetadata.partition}"
      logger.info(s"message $message sent: checksum=${recordMetadata.checksum}, " +
        s"offset=${recordMetadata.offset}, partition=${recordMetadata.partition}")
    }
  }
}).get()
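For reference, the producer above is assumed to be a plain KafkaProducer[String, String] built roughly like the sketch below; the broker address, topic name, and acks setting are placeholders rather than the real test configuration:

// Hypothetical producer setup (placeholder values, not from the actual test)
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "all")                           // wait for the broker to acknowledge each record

val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("input-topic", message)   // assumed topic name
// producer.send(record, callback).get() above blocks until the broker acknowledges the record,
// so the test only continues once the message really is on the input topic.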
// Consumer
runner = new Thread() {
  override def run(): Unit = {
    while (inProgress) {
      val newMessages = consumer.poll(1000)
      val it = newMessages.iterator()
      System.out.print("hasNext is" + it.hasNext.toString)
      while (it.hasNext) {
        val nxt = it.next()
        System.out.print("one more" + nxt.offset() + " " + nxt.toString)
        val record = nxt
        logger.info(s"Received record: $record")
        store.append(record)
      }
    }
  }
}
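The test-side consumer that polls the output topic is assumed to be configured along these lines; auto.offset.reset matters here because it decides whether a consumer that subscribes after a record was produced still sees that record (broker address, group id, and topic name are placeholders):

// Hypothetical consumer setup for the test (placeholder values)
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092")   // assumed broker address
consumerProps.put("group.id", "output-topic-test")         // hypothetical group id
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// "latest" skips records written before this consumer joined;
// "earliest" starts from the beginning of the partition instead.
consumerProps.put("auto.offset.reset", "earliest")

val consumer = new KafkaConsumer[String, String](consumerProps)
consumer.subscribe(Collections.singletonList("output-topic"))   // assumed output topic name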
The messages reach the input topic correctly, and I can verify this from some log messages. On the service side, however, where I create the stream and inspect it with stream.print(), I have:
  Try(
    KafkaStreaming.createKafkaStream[String, String, StringDeserializer, StringDeserializer](ssc, config, topics)
  ) match {
    case Failure(_: SparkException) if nTries > 1 =>
      Thread.sleep(500)
      createDStream(ssc, config, topics, nTries - 1)
    case Failure(exn: Throwable) => throw exn
    case Success(stream) =>
      stream.foreachRDD { rdd =>
        if (rdd.isEmpty()) {
          print("The rdd recieved is empty")
        } else {
          rdd.foreach(p => print("The rdd recieved is non empty" + p._1 + p._2))
        }
      }
      stream.print()
      stream
  }
}
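KafkaStreaming.createKafkaStream is project-specific code, so the following is only a guess at what it wraps. If it uses the standard spark-streaming-kafka-0-10 direct stream, the starting-offset behaviour comes from the kafkaParams map, roughly like this sketch (broker address and group id are placeholders):

// Sketch of a direct stream with explicit offset-reset behaviour (assumed, not the actual createKafkaStream)
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",            // assumed broker address
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "service-stream",                     // hypothetical group id
  // With "latest", records produced before the stream fetched its starting offsets are skipped;
  // "earliest" makes the first batch include everything already on the topic.
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)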
I only start seeing the printed results from the third record onward. When sending just one record I always see "The rdd recieved is empty".

Sample output:
The rdd recieved is empty-------------------------------------------
Time: 1513579502000 ms
-------------------------------------------
-------------------------------------------
Time: 1513579504000 ms
-------------------------------------------
(null,{'id':2,'text':'spark fvt'})
+---+---------+-----+
|id |text |label|
+---+---------+-----+
|2 |spark fvt|0.0 |
+---+---------+-----+
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
| id| text|label| words| features| rawPrediction| probability|prediction|
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
| 2|spark fvt| 0.0|[spark, fvt]|(1000,[105,983],[...|[0.16293291377568...|[0.54064335448518...| 0.0|
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
The value of sendToSink [id: bigint, text: string ... 6 more fields]The rdd recieved is empty-------------------------------------------
Time: 1513579506000 ms
-------------------------------------------
The rdd recieved is empty-------------------------------------------
Time: 1513579508000 ms
-------------------------------------------
The rdd recieved is empty-------------------------------------------
Time: 1513579510000 ms
-------------------------------------------
The rdd recieved is empty-------------------------------------------
Time: 1513579512000 ms
-------------------------------------------
The rdd recieved is empty-------------------------------------------
Time: 1513579514000 ms
-------------------------------------------
-------------------------------------------
Time: 1513579516000 ms
-------------------------------------------
(null,{'id':3,'text':'spark fvt'})
+---+---------+-----+
|id |text |label|
+---+---------+-----+
|3 |spark fvt|0.0 |
+---+---------+-----+
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
| id| text|label| words| features| rawPrediction| probability|prediction|
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
| 3|spark fvt| 0.0|[spark, fvt]|(1000,[105,983],[...|[0.16293291377568...|[0.54064335448518...| 0.0|
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
P.S. If auto.commit.reset is set to "earliest" I can retrieve the first record, but if it is set to "latest" the first record is lost. Any ideas?
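(I am assuming the setting in question is the consumer's standard auto.offset.reset property.) A sketch of how the reading side could pin its starting position explicitly instead, so the first record is read regardless of that setting; the topic name and partition number are placeholders:

// Rewind explicitly instead of relying on auto.offset.reset (placeholder topic/partition)
import java.util.Arrays
import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("output-topic", 0)   // assumed output topic and single partition
consumer.assign(Arrays.asList(tp))               // assign instead of subscribe, so no group rebalance is needed
consumer.seekToBeginning(Arrays.asList(tp))      // the next poll() starts from the first available offset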