我正在尝试读取flink中的原始日志文件以将其处理为kinesis。但是在flink scala中,如何读取原始文件而不是使用文本文件,因为文本文件创建的数据流[string]会在格式方面产生问题。我尝试过此方法,但它适用于readtext:
val inputStream = env.readTextFile(config.input.stream)
inputStream.print()
inputStream.map{str =>
val strByte = str.getBytes()
val thriftSerializer = new LazyBinaryThriftStructSerializer[CollectorPayload] {
override def codec: CollectorPayload.type = CollectorPayload
}
val collectorPayload = thriftSerializer.fromBytes(strByte)
collectorPayload
}.print()
在这里它没有转换成正确格式的数据,所以想读取二进制文件有可能吗?
暂无答案!
目前还没有任何答案,快来回答吧!