Kafka stream: sending a single message

wd2eg0qa · posted 2021-06-07 · in Kafka

I am creating a Kafka stream, sending a single message into it, applying some transformations to that message, and producing the output to an output Kafka queue. When I send only one message, I see no output at all; but once I start sending multiple messages (2, 3, and so on) I do see some output. Can anyone explain whether I am missing something needed to get output for a single message?
My setup is as follows:
I have a test case that writes to a Kafka input topic. My service reads the input topic and writes its result to an output topic. The test case then reads the output topic back and displays the final result.

// Producer

producer.send(record, new Callback {
  // Invoked once the broker acknowledges the record or the send fails.
  override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
    if (e != null) {
      System.out.print("in system")
      print("in print")
      ex += "No confirmation received"
      logger.warn(s"No confirmation received for $message", e)
    }
    if (recordMetadata != null) {
      ex += "message sent" + s"message $message sent: checksum=${recordMetadata.checksum}, " +
        s"offset=${recordMetadata.offset}, partition=${recordMetadata.partition}"
      logger.info(s"message $message sent: checksum=${recordMetadata.checksum}, " +
        s"offset=${recordMetadata.offset}, partition=${recordMetadata.partition}")
    }
  }
}).get() // block until the send completes
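Before digging into the stream, it is worth ruling out client-side batching on the producer: with a non-zero `linger.ms`, a lone record can sit in the producer's buffer waiting for a batch that never fills. A minimal sketch of settings that force immediate delivery (the broker address is a placeholder; the property names are standard kafka-clients producer settings):

```scala
import java.util.Properties

// Producer settings that rule out client-side buffering of a single record.
// "localhost:9092" is a hypothetical broker address.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("linger.ms", "0") // send immediately instead of waiting to fill a batch
props.put("acks", "all")    // the send completes only after full broker acknowledgement

// With these settings, producer.send(record).get() (as in the snippet above)
// returns only once the broker has the record, and calling producer.flush()
// before shutdown pushes out anything still buffered.
```

Since the callback above does fire and the record shows up in the input topic, batching is probably not the culprit here, but it is cheap to verify.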
//Consumer
runner = new Thread() {
  override def run(): Unit = {
    while (inProgress) {
      val newMessages = consumer.poll(1000) // poll timeout in ms
      val it = newMessages.iterator()
      System.out.print("hasNext is " + it.hasNext.toString)

      while (it.hasNext) {
        val record = it.next()
        System.out.print("one more " + record.offset() + " " + record.toString)
        logger.info(s"Received record: $record")
        store.append(record)
      }
    }
  }
}
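As an aside, the poll-and-drain pattern in this thread can be factored over a plain iterator, which makes it easy to exercise without a broker. In the sketch below (all names are illustrative), `fetch` stands in for `consumer.poll(...).iterator()`, `keepRunning` for the `inProgress` flag, and `handle` for `store.append(...)`:

```scala
import scala.collection.mutable

// Generic poll-and-drain loop: repeatedly fetch a batch and handle each
// element, until keepRunning() says to stop.
def drain[A](fetch: () => Iterator[A], keepRunning: () => Boolean)
            (handle: A => Unit): Unit = {
  while (keepRunning()) {
    val it = fetch()
    while (it.hasNext) handle(it.next())
  }
}

// Usage: two "polls" worth of records, then stop.
val batches = mutable.Queue(Iterator(1, 2), Iterator(3))
val seen = mutable.ArrayBuffer[Int]()
drain(() => if (batches.nonEmpty) batches.dequeue() else Iterator.empty,
      () => batches.nonEmpty)(seen += _)
// seen now contains 1, 2, 3
```

Note also that in newer kafka-clients versions `consumer.poll(1000)` is deprecated in favor of `consumer.poll(java.time.Duration.ofMillis(1000))`.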

The messages reach the input topic correctly, which I can verify from some log messages. On the service side, however, I create the stream and try stream.print():

    Try(
      KafkaStreaming.createKafkaStream[String, String, StringDeserializer, StringDeserializer]
        (ssc, config, topics)
    ) match {
      case Failure(_: SparkException) if nTries > 1 =>
        Thread.sleep(500)
        createDStream(ssc, config, topics, nTries - 1)
      case Failure(exn: Throwable) => throw exn
      case Success(stream) =>
        stream.foreachRDD { rdd =>
          if (rdd.isEmpty())
            print("The rdd received is empty")
          else
            rdd.foreach(p => print("The rdd received is non empty" + p._1 + p._2))
        }
        stream.print()
        stream
    }
  }
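The retry-on-SparkException logic above can be isolated into a small generic helper; a sketch in plain Scala (the helper's name and shape are mine, not from the original service):

```scala
import scala.util.{Failure, Success, Try}

// Run `op` up to `nTries` times, sleeping `delayMs` between attempts when it
// fails with an exception that `retryable` accepts; rethrow anything else.
def retry[T](nTries: Int, delayMs: Long)(retryable: Throwable => Boolean)(op: => T): T =
  Try(op) match {
    case Success(v) => v
    case Failure(e) if retryable(e) && nTries > 1 =>
      Thread.sleep(delayMs)
      retry(nTries - 1, delayMs)(retryable)(op)
    case Failure(e) => throw e
  }

// Usage: an operation that succeeds on its third attempt.
var attempts = 0
val result = retry(5, delayMs = 1)(_.isInstanceOf[RuntimeException]) {
  attempts += 1
  if (attempts < 3) throw new RuntimeException("not yet")
  "ok"
}
// result == "ok", after 3 attempts
```

The stream-creation code would then call something like `retry(nTries, 500)(_.isInstanceOf[SparkException]) { KafkaStreaming.createKafkaStream(...) }` instead of recursing by hand.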

I only start seeing printed results from the third record onward. When sending just one record, I always see "The rdd received is empty".
Sample output:

The rdd received is empty-------------------------------------------
Time: 1513579502000 ms
-------------------------------------------

-------------------------------------------
Time: 1513579504000 ms
-------------------------------------------
(null,{'id':2,'text':'spark fvt'})

+---+---------+-----+
|id |text     |label|
+---+---------+-----+
|2  |spark fvt|0.0  |
+---+---------+-----+

+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
| id|     text|label|       words|            features|       rawPrediction|         probability|prediction|
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
|  2|spark fvt|  0.0|[spark, fvt]|(1000,[105,983],[...|[0.16293291377568...|[0.54064335448518...|       0.0|
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+

The value of sendToSink [id: bigint, text: string ... 6 more fields]The rdd received is empty-------------------------------------------
Time: 1513579506000 ms
-------------------------------------------

The rdd received is empty-------------------------------------------
Time: 1513579508000 ms
-------------------------------------------

The rdd received is empty-------------------------------------------
Time: 1513579510000 ms
-------------------------------------------

The rdd received is empty-------------------------------------------
Time: 1513579512000 ms
-------------------------------------------

The rdd received is empty-------------------------------------------
Time: 1513579514000 ms
-------------------------------------------

-------------------------------------------
Time: 1513579516000 ms
-------------------------------------------
(null,{'id':3,'text':'spark fvt'})

+---+---------+-----+
|id |text     |label|
+---+---------+-----+
|3  |spark fvt|0.0  |
+---+---------+-----+

+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
| id|     text|label|       words|            features|       rawPrediction|         probability|prediction|
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+
|  3|spark fvt|  0.0|[spark, fvt]|(1000,[105,983],[...|[0.16293291377568...|[0.54064335448518...|       0.0|
+---+---------+-----+------------+--------------------+--------------------+--------------------+----------+

P.S. If auto.offset.reset is set to "earliest", I can retrieve the first record. But if it is set to "latest", the first record is lost. Any ideas?
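That last observation is consistent with how the direct stream works: each partition's starting offset is resolved when the stream starts, and with `auto.offset.reset=latest` a consumer group with no committed offsets starts at the end of the topic, so any record produced before the first batch's offsets are resolved is skipped. Starting the streaming context first and only then producing the test record, or using "earliest" with a fresh group id, avoids the race. A sketch of the consumer params (standard Kafka consumer property names; the broker address and group id are placeholders, and exactly how these are passed into KafkaStreaming.createKafkaStream depends on your `config`):

```scala
// Standard Kafka consumer settings for a Spark direct stream.
// Broker address and group id are hypothetical placeholders.
val kafkaParams: Map[String, Object] = Map(
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id"           -> "single-message-test",
  // "earliest": a group with no committed offsets starts from the beginning
  // of the topic, so a record produced before the stream starts is still
  // delivered in the first batch.
  "auto.offset.reset"  -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```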
