如何将javaapi kstream转换为scalaapi kstream?

8fq7wneg  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(410)

我们的库有使用典型kafka流的数据处理器(scala代码):

import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream

搬去的原因 kafka-stream-scala 每当我使用groupby、selectbykey等函数时,都会出现奇怪的返回类型。

(builder: StreamsBuilder, stream: KStream[String, Event]) =>
 val wgCreatedStream = stream.groupBy((_,v) => 
  v.payload match {
   case x:WorkgroupCreated => x.id //this is a String
  })

以这段代码为例。 StreamsBuilder 结果到 KGroupedStream[Nothing,Event] 当我使用这些导入时:

import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Grouped, KStream, Materialized}
import org.apache.kafka.streams.scala.Serdes.{String,Long}

返回类型最终更改为 KGroupedStream[String,Event]**我真正希望的是:**使用 kafka-stream-scala 不重构我们的数据处理器
如果是的话,那就太棒了,特别是如果有例子的话!
如果没有。。。这将是一段痛苦的旅程(不过还是要谢谢你)

sd2nnvve

sd2nnvve1#

好吧,在和我的同事聊天之后,我马上找到了答案-___-
org.apache.kafka.streams.scala.ImplicitConversions ,
有一个 wrapKStream() 转化为 streams.kstream.KStreamstreams.scala.kstream.KStream 另一方面,打电话给警察就行了 inner 方法。ktable也是如此。

相关问题