我们的库有使用典型kafka流的数据处理器(scala代码):
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
搬去的原因 kafka-stream-scala
每当我使用groupby、selectbykey等函数时,都会出现奇怪的返回类型。
(builder: StreamsBuilder, stream: KStream[String, Event]) =>
val wgCreatedStream = stream.groupBy((_,v) =>
v.payload match {
case x:WorkgroupCreated => x.id //this is a String
})
以这段代码为例。 StreamsBuilder
结果到 KGroupedStream[Nothing,Event]
当我使用这些导入时:
import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Grouped, KStream, Materialized}
import org.apache.kafka.streams.scala.Serdes.{String,Long}
返回类型最终更改为 KGroupedStream[String,Event]
**我真正希望的是:**使用 kafka-stream-scala
不重构我们的数据处理器
如果是的话,那就太棒了,特别是如果有例子的话!
如果没有。。。这将是一段痛苦的旅程(不过还是要谢谢你)
1条答案
按热度按时间sd2nnvve1#
好吧,在和我的同事聊天之后,我马上找到了答案-___-
从
org.apache.kafka.streams.scala.ImplicitConversions
,有一个
wrapKStream()
转化为streams.kstream.KStream
至streams.scala.kstream.KStream
另一方面,打电话给警察就行了inner
方法。ktable也是如此。