在以下错误中发现的和需要的有什么区别

34gzjxbg  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(282)

在研究斯卡拉·Kafka的时候 KeyValueMapper 实现我得到以下错误。我不知道到底有什么区别。谢谢你的帮助。
代码:
我创造了一个 KTable 从主题开始。

val creducer: Reducer[java.lang.Long] =
  (v1, v2) => if (v1 > v2) v1 else v2

val deduplicationWindow = TimeWindows
  .of(60000L * 10)
  .advanceBy(60000L)
  .until(60000L * 10)

val ktwindow: KTable[Windowed[String], java.lang.Long] =
  ipandTime
    .groupByKey(Serdes.String(), Serdes.Long())
    .reduce(creducer, deduplicationWindow, "ktwindow-query")

当我尝试创建键为的流时,使用selectkey方法时出错 Windowed[String] . 在java中类似的实现工作得很好。

val fStream = ktwindow
  .toStream()
  .selectKey(
    new KeyValueMapper[Windowed[String],
                       java.lang.Long,
                       KeyValue[String, java.lang.Long]] {
      override def apply(
          key: Windowed[String],
          value: java.lang.Long): KeyValue[String, java.lang.Long] = {
        new KeyValue(key.key(), value)
      }
    }
  )
[error]  found   : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],Long,org.apache.kafka.streams.KeyValue[String,Long]]

[error]  required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: Long, _ <: KR]
yyhrrdl8

yyhrrdl81#

变量 ipandTime 找不到了,所以我把它换成了 ??? ,但这与实际问题无关。
如前所述,如果java使用站点通配符的类型推断失败,那么只需添加显式类型参数。这是为Kafka1.1.0编写的:

import org.apache.kafka.streams.kstream._
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.KeyValue

object Q49594920 {

      val creducer: Reducer[java.lang.Long] =
      (v1, v2) => if (v1 > v2) v1 else v2

    val deduplicationWindow = TimeWindows
      .of(60000L * 10)
      .advanceBy(60000L)
      .until(60000L * 10)

    val ktwindow: KTable[Windowed[String], java.lang.Long] = ???
      // ipandTime // What's that? It's not defined anywhere!
      //   .groupByKey(Serdes.String(), Serdes.Long())
      //   .reduce(creducer, deduplicationWindow, "ktwindow-query")

    val fStream = ktwindow
      .toStream()
      .selectKey[KeyValue[String, java.lang.Long]](
        new KeyValueMapper[Windowed[String],
                           java.lang.Long,
                           KeyValue[String, java.lang.Long]] {
          override def apply(
              key: Windowed[String],
              value: java.lang.Long): KeyValue[String, java.lang.Long] = {
            new KeyValue(key.key(), value)
          }
        }
      )
}

这个 selectKey 方法需要泛型类型参数 KR ,所以我只给出了具体的类型 KeyValue[String, java.lang.Long] 对它,然后它就工作了。

相关问题