在apache flink中解析来自kafka的数据

bkkx9g8r  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(331)

我刚刚开始使用apache flink(scala api),我的问题如下:我正在尝试将数据从kafka流式传输到apache flink,基于flink站点的一个示例:

val stream =
  env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties))

一切正常,stream.print()语句在屏幕上显示以下内容:
2018-05-16上午10:22:44 | 1 | 11 |-71.16 | 40.27
我想使用case类来加载数据,我已经尝试使用

flatMap(p=>p.split("|"))

但它一次只拆分一个字符。
基本上,预期的结果是能够填充case类的5个字段,如下所示

field(0)=2018-05-16 10:22:44 AM
  field(1)=1
  field(2)=11
  field(3)=-71.16 
  field(4)=40.27

但现在它正在做:

field(0) = 2
   field(1) = 0
   field(3) = 1
   field(4) = 8

等。。。
任何建议都将不胜感激。
先谢谢你
弗兰克

vd8tlhqk

vd8tlhqk1#

问题是如何使用 String.split . 如果你用一个 String ,则该方法希望它是一个正则表达式。因此, p.split("\\|") 将是输入数据的正确正则表达式。或者,您也可以调用 split 指定分隔字符的变量 p.split('|') . 这两种解决方案都会给你带来想要的结果。

相关问题