Foreign-key joins in Apache Flink with the DataStream API

t5zmwmid · published 2024-01-04 · in Apache

Disclaimer: I am running an Apache Flink POC for my organization across various scenarios, and I am still in the learning phase.
Currently we use Kafka Streams (along with KTables) to join multiple streams. However, we are concerned about the latency of Kafka Streams, which is why we are exploring options with Apache Flink.
One of the scenarios involves a foreign-key join. For simplicity, I use Employee and Department as an example, where employee.deptId = department.deptId and a Department can contain multiple Employees (a one-to-many relationship). I implemented this with keyed state as follows:

```java
// DataStreamSource<Long> streamSource = env.fromSequence(1, 10000000);
SingleOutputStreamOperator<Employee> employeeSourceStreamOperator = env
        .fromSource(employeeSource, WatermarkStrategy.noWatermarks(), "Employee")
        .map(value -> objectMapper.readValue(value, Employee.class));
SingleOutputStreamOperator<Department> departmentSingleOutputStreamOperator = env
        .fromSource(departmentSource, WatermarkStrategy.noWatermarks(), "Department")
        .map(value -> objectMapper.readValue(value, Department.class));
employeeSourceStreamOperator
        .connect(departmentSingleOutputStreamOperator)
        .keyBy(Employee::getDeptId, Department::getDeptId)
        .map(new RichCoMapFunction<Employee, Department, Object>() {
            // ListState to store multiple Employee instances for each department
            private ListState<Employee> employeeListState;
            // ValueState to store Department information
            private ValueState<Department> departmentState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Initialize ListState for Employee
                ListStateDescriptor<Employee> employeeListStateDescriptor =
                        new ListStateDescriptor<>("employeeListState", Employee.class);
                employeeListState = getRuntimeContext().getListState(employeeListStateDescriptor);
                // Initialize ValueState for Department
                ValueStateDescriptor<Department> departmentStateDescriptor =
                        new ValueStateDescriptor<>("departmentState", Department.class);
                departmentState = getRuntimeContext().getState(departmentStateDescriptor);
            }

            @Override
            public Object map1(Employee employee) throws Exception {
                // Process Employee stream:
                // store the Employee in ListState, keyed by departmentId
                employeeListState.add(employee);
                // Try to join with Department information
                Department department = departmentState.value();
                if (department != null) {
                    // Join Employee and Department information
                    return "Employee join:" + employee.getName() + " works in " + department.getDeptName();
                }
                return ""; // No immediate result, need to wait for Department information
            }

            @Override
            public Object map2(Department department) throws Exception {
                // Process Department stream:
                // store the Department in ValueState
                departmentState.update(department);
                // Try to join with Employee information
                Iterable<Employee> employees = employeeListState.get();
                if (employees != null) {
                    // Join the Department with each buffered Employee
                    StringBuilder result = new StringBuilder();
                    for (Employee employee : employees) {
                        result.append("Department join:").append(generateOutput(employee, department)).append("\n");
                    }
                    return result.toString();
                }
                return ""; // No immediate result, need to wait for Employee information
            }

            private String generateOutput(Employee employee, Department department) {
                return employee.getName() + " works in " + department.getDeptName();
            }
        })
        .sinkTo(new PrintSink<>());
```

This works as expected. However, I cannot figure out the following scenario: an employee can change departments. In that case the relationship from the employee to the new department is easy to update, but removing the relationship from the old department is challenging. Can anyone point me to a Flink feature that would let me access the old department's state and delete the employee from it?
Let me know if any information is missing above.
Update: reworked the code with the Table API per David's suggestion.
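One way to handle a department change in the DataStream version is a two-stage approach: first key by empId and remember the previous deptId in state, emitting a remove record for the old department plus an add record for the new one whenever the department changes; then key by deptId and apply the adds/removes to the list state. The sketch below simulates that routing logic in plain Java (the keyed state is stood in for by maps; the class and helper names are made up for illustration and are not Flink API):

```java
import java.util.*;

// Hypothetical two-stage routing sketch (plain Java, no Flink): stage 1 stands in
// for a KeyedProcessFunction keyed by empId that remembers the previous deptId;
// stage 2 stands in for the deptId-keyed operator that maintains membership state.
public class RetractionRouter {
    public static final class Routed {
        public final long deptId;
        public final boolean add;     // false = retraction targeted at the old department
        public final String empName;
        Routed(long deptId, boolean add, String empName) {
            this.deptId = deptId; this.add = add; this.empName = empName;
        }
    }

    // stage-1 state: employee -> last seen deptId (per-key ValueState in Flink)
    private final Map<String, Long> prevDept = new HashMap<>();
    // stage-2 state: deptId -> current members (per-key ListState in Flink)
    public final Map<Long, Set<String>> membership = new HashMap<>();

    /** Stage 1: turn an employee event into add/remove records routed by deptId. */
    public List<Routed> route(String empName, long newDeptId) {
        List<Routed> out = new ArrayList<>();
        Long old = prevDept.put(empName, newDeptId);
        if (old != null && old != newDeptId) {
            out.add(new Routed(old, false, empName)); // retract from the old department
        }
        out.add(new Routed(newDeptId, true, empName));
        return out;
    }

    /** Stage 2: apply a routed record to the department's membership state. */
    public void apply(Routed r) {
        if (r.add) {
            membership.computeIfAbsent(r.deptId, k -> new HashSet<>()).add(r.empName);
        } else {
            membership.getOrDefault(r.deptId, Collections.emptySet()).remove(r.empName);
        }
    }
}
```

In real Flink this would map to a `KeyedProcessFunction` keyed by empId feeding a second `keyBy` on deptId, so the remove record reaches the old department's state even though the employee event itself carries only the new deptId.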

```java
SingleOutputStreamOperator<Employee> employeeSourceStreamOperator = env
        .fromSource(employeeSource, WatermarkStrategy.noWatermarks(), "Employee")
        .map(value -> objectMapper.readValue(value, Employee.class));
Table employeeTable = tableEnvironment
        .fromDataStream(employeeSourceStreamOperator, $("deptId").as("empDeptId"), $("name").as("name"));
SingleOutputStreamOperator<Department> departmentSingleOutputStreamOperator = env
        .fromSource(departmentSource, WatermarkStrategy.noWatermarks(), "Department")
        .map(value -> objectMapper.readValue(value, Department.class));
Table departmentTable = tableEnvironment
        .fromDataStream(departmentSingleOutputStreamOperator, Schema.newBuilder().build());
Table table = employeeTable.join(departmentTable)
        .where($("empDeptId").isEqual($("deptId")))
        .select($("name"), $("deptName"));
DataStream<Row> rowDataStream =
        tableEnvironment.toChangelogStream(table);
rowDataStream.sinkTo(new PrintSink<>());
```


Published Department 1 with Employee 6 in it, as follows: {"deptId": 1, "deptName": "Department 1"}, and got: 5> +I[Employee 6, Department 1]
Published Employee 6 with deptId changed from 1 to 2: {"empId": 6, "name": "Employee 6", "deptId": 2}; Employee 6 is updated to Department 2: 5> +I[Employee 6, Department 2]
Now published an update for Department 1 and received the following output, in which Employee 6 still belongs to Department 1: 5> +I[Employee 6, Department 1]
So Employee 6 still has a relationship with Department 1, even though Employee 6 changed departments to 2.
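A likely explanation for this output is that `fromDataStream` interprets both streams as insert-only (append) tables, so the earlier (Employee 6, deptId 1) row is never retracted and still matches when Department 1 is later updated. The following plain-Java illustration of that append-only interpretation reproduces the behavior (class and method names are made up; this is not Flink code):

```java
import java.util.*;

// Hypothetical illustration: an insert-only employee table keeps every event as
// a live row, so a later update of Department 1 still joins against the stale row.
public class AppendOnlyJoinDemo {
    // each row: {employee name, deptId}; rows are only ever appended, never retracted
    public final List<String[]> employeeRows = new ArrayList<>();

    public void onEmployee(String name, String deptId) {
        employeeRows.add(new String[]{name, deptId});
    }

    /** Re-evaluate the join for one department update against all live rows. */
    public List<String> onDepartmentUpdate(String deptId, String deptName) {
        List<String> out = new ArrayList<>();
        for (String[] row : employeeRows) {
            if (row[1].equals(deptId)) {
                out.add("+I[" + row[0] + ", " + deptName + "]");
            }
        }
        return out;
    }
}
```

With both employee events appended, an update to Department 1 still finds the old (Employee 6, deptId 1) row, matching the observed `+I[Employee 6, Department 1]`.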
I created a git repo at flink-test. EmployeeDepartmentTestDataGenerator.java is a sample test-data tool that publishes random data to the Kafka topics.
Am I missing something?


m2xkgtsf1#

This kind of application is much easier to implement with Flink's Table API (or Flink SQL). A Table/SQL join will automatically handle the updates you are concerned about. Implementing this by hand with the DataStream API is a lot of unnecessary work (it requires materializing the join in Flink state in order to produce updates).
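For the update problem described above, one likely fix (assuming the data comes from Kafka) is to declare the inputs as updating tables with primary keys, for example via the upsert-kafka connector, so that a new employee record with the same empId retracts the previous row and the join emits -U/+U changes instead of a second insert. A sketch of the DDL, with the topic name, broker address, and fields assumed from the question:

```sql
-- Sketch: declaring the employee input as an upsert (changelog) table keyed by empId.
CREATE TABLE employee (
  empId  BIGINT,
  name   STRING,
  deptId BIGINT,
  PRIMARY KEY (empId) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'employee',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```

When starting from a DataStream instead, `tableEnvironment.fromChangelogStream` with a schema that declares a primary key achieves a similar changelog interpretation.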
