我正试着管理Kafka的偏移量。
使用偏移图创建直接流时遇到的问题如下:
val fromOffsets : (TopicAndPartition, Long) = TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)] (ssc,kafkaParams,fromOffsets,messageHandler)
在这里,
val messageHandler =
(mmd: MessageAndMetadata[String, String]) => mmd.message.length
以及
metrics_rs = metricsStatement.executeQuery("SELECT part,off from metrics.txn_offsets where topic='"+t+''' )
如果你能帮忙的话,我想我在声明风格上做错了什么。编译错误显示“createdirectstream的类型参数太多”
1条答案
按热度按时间dfty9e191#
有几件事我觉得你做错了。
你需要通过考试
Map[TopicAndPartition, Long]
,而当前您有Tuple2[TopicAndPartition, Long]
. 所以你需要:你说你的报税表是从哪来的
createDirectStream
是类型的元组(String, String)
,但你的messageHandler
值是一个Int
. 如果要返回包含键值对的元组,则需要:修复后,应编译: