如何从select*flink中选择字段

xoshrz7s  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(559)

我正在尝试加入2个选择。
我要用代码做一个查询,看起来像这个查询

select *
from  Data
where numPers > 10 && Object = P1

还有这个

select *
from  Data
where numPers < 20 && Object == P1

我只需要数据中的时间戳而不需要重复
我使用的程序代码如下所示

object Prog {

  def main(args: Array[String]) : Unit = {
    org.apache.log4j.BasicConfigurator.configure()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = CsvTableSource
      .builder
      .path("src/main/resources/data.stream")
      .field("numPers", Types.INT)
      .field("Object", Types.STRING)
      .field("TIMESTAMP", Types.STRING)
      .fieldDelimiter(",")
      .ignoreFirstLine
      .ignoreParseErrors
      .commentPrefix("%")
      .build()

    tableEnv.registerTableSource("Data", csvTableSource)

    val table = tableEnv.scan("Data") //this works
      .filter("numPers > 10")
      .select("*")

    val ds = tableEnv.toAppendStream(table, classOf[Row])

    ds.print()
    env.execute()
  }
}

但是如何将第二个查询添加到第一个查询?

rqenqsqc

rqenqsqc1#

如果我正确理解了您的需求,您不需要加入,只需要一个 BETWEEN 谓语:

val query = "SELECT * FROM Data WHERE numPers BETWEEN 10 AND 20 AND Object = P1"
val table = tableEnv.sqlQuery(query)

相关问题