kafkautils api |偏移管理| spark流

pgvzfuti  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(297)

我正试着管理Kafka的偏移量。
使用偏移图创建直接流时遇到的问题如下:

val fromOffsets : (TopicAndPartition, Long) = TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3)

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)] (ssc,kafkaParams,fromOffsets,messageHandler)

在这里,

val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => mmd.message.length

以及

metrics_rs = metricsStatement.executeQuery("SELECT part,off from metrics.txn_offsets where topic='"+t+''' )

如果你能帮忙的话,我想我在声明风格上做错了什么。编译错误显示“createdirectstream的类型参数太多”

dfty9e19

dfty9e191#

有几件事我觉得你做错了。
你需要通过考试 Map[TopicAndPartition, Long] ,而当前您有 Tuple2[TopicAndPartition, Long] . 所以你需要:

val fromOffsets: Map[TopicAndPartition, Long] = 
    Map(TopicAndPartition(metrics_rs.getString(1), 
                          metrics_rs.getInt(2)) -> metrics_rs.getLong(3))

你说你的报税表是从哪来的 createDirectStream 是类型的元组 (String, String) ,但你的 messageHandler 值是一个 Int . 如果要返回包含键值对的元组,则需要:

val messageHandler: MessageAndMetadata[String, String] => (String, String) =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

修复后,应编译:

val stream = KafkaUtils
              .createDirectStream[String, String,
                      StringDecoder, StringDecoder,
                      (String, String)] (ssc, 
                                         kafkaParams, 
                                         fromOffsets, 
                                         messageHandler)

相关问题