我正在用Java从Kafka中读取数据,以便在Apache Flink中执行一些处理并接收结果。
我有一个Kafka主题topic_a,其中包含一些数据,如{name:"abc",年龄:20}和一些数据,如{pin:111,编号:999999,地址:"某处"}
当我使用KafkaSource从kafka读取数据时,我将记录反序列化到一个POJO中,该POJO具有字段String name、int age以及它们各自的getter和setter函数和构造函数。
当我运行flink代码时,deserliazer对{name:"abc",年龄:20年
KafkaSource<AllDataPOJO> kafkaAllAlertsSource = KafkaSource.<AllAlertsPOJO>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(Arrays.asList("topic_a"))
.setProperties(properties)
.setGroupId(allEventsGroupID)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new AllDataDeserializationSchema())
.build();
AllDataPOJO
private String name;
private int age;
- *{名称:"abc",年龄:20},但一旦{pin:111,编号:999999,地址:"某个地方"}**,它开始失败。
- 2个问题:**
1.有没有什么方法可以让我阅读这些不同格式的信息并执行flink操作。根据信息的类型,我希望将其路由到不同的Kafka主题。
1.当我得到**{name:"abc",年龄:20},它应该转到主题user_basic和{pin:111,编号:999999,地址:"某个位置"}应转到主题用户详细信息**
我怎样才能只用一个flink java代码就实现上述功能呢?
2条答案
按热度按时间slhcrj9b1#
您可能对将反序列化架构指定为以下形式感兴趣:
然后,使用Map和筛选该源,验证存在哪些字段:
或者将对象投射到Map中的现有POJO。
mwkjh3gx2#
如果有其他POJO类包含其他字段,则不能使用
<AllDataPOJO>
。或者,您需要添加所有POJO类型的所有字段,并在数据中不存在这些字段时使其可为空。但这可能容易出错,例如,name和pin可能存在于同一条记录中,但不应该存在。
否则,正如另一个答案所说,使用更通用的String/JSON反序列化器,然后可以使用filter/map操作将数据转换为更具体的类型,具体取决于可用的字段