我刚刚开始使用apache flink(scala api),我的问题如下:我正在尝试将数据从kafka流式传输到apache flink,基于flink站点的一个示例:
val stream =
env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties))
一切正常,stream.print()语句在屏幕上显示以下内容:
2018-05-16上午10:22:44 | 1 | 11 |-71.16 | 40.27
我想使用case类来加载数据,我已经尝试使用
flatMap(p=>p.split("|"))
但它一次只拆分一个字符。
基本上,预期的结果是能够填充case类的5个字段,如下所示
field(0)=2018-05-16 10:22:44 AM
field(1)=1
field(2)=11
field(3)=-71.16
field(4)=40.27
但现在它正在做:
field(0) = 2
field(1) = 0
field(3) = 1
field(4) = 8
等。。。
任何建议都将不胜感激。
先谢谢你
弗兰克
1条答案
按热度按时间vd8tlhqk1#
问题是如何使用
String.split
. 如果你用一个String
,则该方法希望它是一个正则表达式。因此,p.split("\\|")
将是输入数据的正确正则表达式。或者,您也可以调用split
指定分隔字符的变量p.split('|')
. 这两种解决方案都会给你带来想要的结果。