假设我有2个kafka流(kafka流scala库,版本2.2.0):
val builder: StreamsBuilder = new StreamsBuilder
val stream1: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic1")
val stream2: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic2")
以及他们的加入:
val stream3: KStream[String, MyClass] = flights.join(schedules)((r1, r2) => MyClass(r1.get("f1"), r2.get("f2")), JoinWindows.of(Duration.ofSeconds(30))
ksql中where子句的等价物是什么(请参阅流api的最新订单(流)?使用stream3.filter是个好主意吗?这种方法是否与ksql创建的流具有相同的效率?
1条答案
按热度按时间kiayqfof1#
ksql中where子句的等价物是什么(请参阅流api的最新订单(流)?
它是:
KStream#filter()
,返回一个经过筛选的KStream
KTable#filter()
,返回一个经过筛选的KTable
https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#stateless-变换使用stream3.filter是个好主意吗?
对。
这种方法是否与ksql创建的流具有相同的效率?
对。