我是新的flink和做一些非常类似的下面的链接。
在flink 1.2中,无法看到Kafka流下沉时的消息,也无法看到打印消息
我还尝试添加jsondeserializationschema()作为没有键的kafka输入json消息的反序列化程序。
但是我发现jsondeserializationschema()不存在。
如果我做错了什么,请告诉我。
我是新的flink和做一些非常类似的下面的链接。
在flink 1.2中,无法看到Kafka流下沉时的消息,也无法看到打印消息
我还尝试添加jsondeserializationschema()作为没有键的kafka输入json消息的反序列化程序。
但是我发现jsondeserializationschema()不存在。
如果我做错了什么,请告诉我。
2条答案
按热度按时间9rygscc11#
为了解决从kafka读取非键json消息的问题,我使用了case类和json解析器。
下面的代码生成一个case类,并使用playapi解析json字段。
try方法允许您克服在解析数据时抛出的异常,并在其中一个字段中返回异常(如果需要的话),否则它可以只返回带有任何给定或默认字段的case类对象。
代码的示例输出为:
我不确定这是否是最好的方法,但它是为我工作到现在。
r1zhe5dt2#
JSONDeserializationSchema
在Flink1.8中被删除,之前被弃用过。建议的方法是编写一个实现
DeserializationSchema<T>
. 下面是一个例子,我从flink operationsPlayground复制的:对于一个Kafka制作人来说
KafkaSerializationSchema<T>
,你会在同一个项目中找到这样的例子。