filter-flink元组

myss37ts  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(384)

我正在用flink为scala中的流处理编写一个程序。我有一个数据流,我首先Map到包含json4sjvalues的元组。现在我想根据这些值过滤这些元组。我认为这会很简单,但我找不到任何好的例子如何过滤flink元组的列。有人知道怎么做吗?谢谢

olhwl3o2

olhwl3o21#

这个问题对我来说似乎有点太不明确了,但也许,这不管用吗?

// stream contains stuff like these in a flink tuple 
//(custom deserializer of array to tuple2???)
val jsonExample = """["foo", "bar"]"""

val stream: DataStream[Tuple2[JString, JString]] = ???
val filteredStream = stream.filter(x => x.getField(0).extract[String] == "foo")

如果你在写scala,最好不要使用flink元组。选择case类或者至少scala元组?

ergxz8rk

ergxz8rk2#

不用Map到元组,只需Map到case类并过滤掉不需要的内容:

// StreamingJob.scala

...

val filteredEvents = content
      .map(x => Event.toCaseClass(x))
      .filter(x => x.value == true)

...

// Event.scala

case class Event(
                  id: String,
                  value: Int,
                )
object Event {
  implicit val formats = DefaultFormats

  def toCaseClass(str: String) =
    parse(str).extract[Event]
}

相关问题