Kafka 在Java中使用多个数据结构的Flink

rwqw0loc  于 2023-02-03  发布在  Apache
关注(0)|答案(2)|浏览(138)

我正在用Java从Kafka中读取数据,以便在Apache Flink中执行一些处理并接收结果。
我有一个Kafka主题topic_a,其中包含一些数据,如{name:"abc",年龄:20}和一些数据,如{pin:111,编号:999999,地址:"某处"}
当我使用KafkaSource从kafka读取数据时,我将记录反序列化到一个POJO中,该POJO具有字段String name、int age以及它们各自的getter和setter函数和构造函数。
当我运行flink代码时,deserliazer对{name:"abc",年龄:20年

KafkaSource<AllDataPOJO> kafkaAllAlertsSource = KafkaSource.<AllAlertsPOJO>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(Arrays.asList("topic_a"))
                .setProperties(properties)
                .setGroupId(allEventsGroupID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new AllDataDeserializationSchema())
                .build();
AllDataPOJO
private String name;
private int age;
  • *{名称:"abc",年龄:20},但一旦{pin:111,编号:999999,地址:"某个地方"}**,它开始失败。
    • 2个问题:**

1.有没有什么方法可以让我阅读这些不同格式的信息并执行flink操作。根据信息的类型,我希望将其路由到不同的Kafka主题。
1.当我得到**{name:"abc",年龄:20},它应该转到主题user_basic{pin:111,编号:999999,地址:"某个位置"}应转到主题用户详细信息**
我怎样才能只用一个flink java代码就实现上述功能呢?

slhcrj9b

slhcrj9b1#

您可能对将反序列化架构指定为以下形式感兴趣:

.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))

然后,使用Map和筛选该源,验证存在哪些字段:

Key fields can be accessed by calling objectNode.get("key").get(<name>).as(<type>)

Value fields can be accessed by calling objectNode.get("value").get(<name>).as(<type>)

或者将对象投射到Map中的现有POJO。

mwkjh3gx

mwkjh3gx2#

如果有其他POJO类包含其他字段,则不能使用<AllDataPOJO>
或者,您需要添加所有POJO类型的所有字段,并在数据中不存在这些字段时使其可为空。但这可能容易出错,例如,name和pin可能存在于同一条记录中,但不应该存在。
否则,正如另一个答案所说,使用更通用的String/JSON反序列化器,然后可以使用filter/map操作将数据转换为更具体的类型,具体取决于可用的字段

相关问题