如果flink的原始日志文件使用scala,如何读取文件?

ccgok5k5  于 2021-07-13  发布在  Java
关注(0)|答案(0)|浏览(182)

我正在尝试读取flink中的原始日志文件以将其处理为kinesis。但是在flink scala中,如何读取原始文件而不是使用文本文件,因为文本文件创建的数据流[string]会在格式方面产生问题。我尝试过此方法,但它适用于readtext:

val inputStream = env.readTextFile(config.input.stream)
inputStream.print()

inputStream.map{str =>
  val strByte = str.getBytes()
  val thriftSerializer = new LazyBinaryThriftStructSerializer[CollectorPayload] {
    override def codec: CollectorPayload.type = CollectorPayload
  }
  val collectorPayload = thriftSerializer.fromBytes(strByte)
  collectorPayload
}.print()

在这里它没有转换成正确格式的数据,所以想读取二进制文件有可能吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题