Disclaimer: I am running an Apache Flink POC for my organization across various scenarios. I am still in the learning phase.
We currently use Kafka Streams (along with KTables) to join multiple streams. However, we are concerned about the latency of Kafka Streams, which is why we are exploring our options with Apache Flink.
One of the scenarios involves a foreign-key join. For simplicity, I use Employee and Department as an example, where employee.deptId = department.deptId and a Department can contain multiple Employees (a one-to-many relationship). I implemented this with keyed state as follows:
//DataStreamSource<Long> streamSource = env.fromSequence(1, 10000000);
SingleOutputStreamOperator<Employee> employeeSourceStreamOperator = env
        .fromSource(employeeSource, WatermarkStrategy.noWatermarks(), "Employee")
        .map(value -> objectMapper.readValue(value, Employee.class));

SingleOutputStreamOperator<Department> departmentSingleOutputStreamOperator = env
        .fromSource(departmentSource, WatermarkStrategy.noWatermarks(), "Department")
        .map(value -> objectMapper.readValue(value, Department.class));

employeeSourceStreamOperator
        .connect(departmentSingleOutputStreamOperator)
        .keyBy(Employee::getDeptId, Department::getDeptId)
        .map(new RichCoMapFunction<Employee, Department, Object>() {

            // ListState to store multiple Employee instances for each department
            private ListState<Employee> employeeListState;

            // ValueState to store Department information
            private ValueState<Department> departmentState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Initialize ListState for Employee
                ListStateDescriptor<Employee> employeeListStateDescriptor =
                        new ListStateDescriptor<>("employeeListState", Employee.class);
                employeeListState = getRuntimeContext().getListState(employeeListStateDescriptor);

                // Initialize ValueState for Department
                ValueStateDescriptor<Department> departmentStateDescriptor =
                        new ValueStateDescriptor<>("departmentState", Department.class);
                departmentState = getRuntimeContext().getState(departmentStateDescriptor);
            }

            @Override
            public Object map1(Employee employee) throws Exception {
                // Process Employee stream:
                // store Employee information in ListState (keyed by departmentId)
                employeeListState.add(employee);

                // Try to join with Department information
                Department department = departmentState.value();
                if (department != null) {
                    // Join Employee and Department information
                    return "Employee join:" + employee.getName() + " works in " + department.getDeptName();
                }
                return ""; // No immediate result, need to wait for Department information
            }

            @Override
            public Object map2(Department department) throws Exception {
                // Process Department stream:
                // store Department information in ValueState
                departmentState.update(department);

                // Try to join with Employee information
                Iterable<Employee> employees = employeeListState.get();
                if (employees != null) {
                    // Join Employee and Department information for each employee in the list
                    StringBuilder result = new StringBuilder();
                    for (Employee employee : employees) {
                        result.append("Department join:").append(generateOutput(employee, department)).append("\n");
                    }
                    return result.toString();
                }
                return ""; // No immediate result, need to wait for Employee information
            }

            private String generateOutput(Employee employee, Department department) {
                return employee.getName() + " works in " + department.getDeptName();
            }
        })
        .sinkTo(new PrintSink<>());
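As a side note on the code above: a `RichCoMapFunction` must return a value for every input record, which is why empty strings are emitted when no match exists yet. A `KeyedCoProcessFunction` emits through a `Collector`, so the "no match yet" case simply emits nothing. A minimal sketch (same state setup as above, assumed types from the question; not a complete job):

```java
// Sketch only: replace .map(...) with .process(...) using this function.
// The Collector lets us skip output entirely when there is no join partner yet.
new KeyedCoProcessFunction<Long, Employee, Department, String>() {
    private ListState<Employee> employeeListState;
    private ValueState<Department> departmentState;

    @Override
    public void open(Configuration parameters) {
        employeeListState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("employeeListState", Employee.class));
        departmentState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("departmentState", Department.class));
    }

    @Override
    public void processElement1(Employee employee, Context ctx, Collector<String> out) throws Exception {
        employeeListState.add(employee);
        Department department = departmentState.value();
        if (department != null) {
            out.collect(employee.getName() + " works in " + department.getDeptName());
        }
        // Otherwise emit nothing; the department side will produce the join result later.
    }

    @Override
    public void processElement2(Department department, Context ctx, Collector<String> out) throws Exception {
        departmentState.update(department);
        for (Employee employee : employeeListState.get()) {
            out.collect(employee.getName() + " works in " + department.getDeptName());
        }
    }
}
```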
This works as expected. However, I cannot figure out the following scenario: an employee can change departments. In that case, updating the relationship from employee to the new department is easy, but removing the relationship from the old department is challenging. Can someone point me to a Flink feature that would let me reach the old department's state and remove the employee from it?
Let me know if any information is missing above.
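For reference, the bookkeeping needed to detect a department change can be sketched in plain Java (a hypothetical helper, no Flink dependencies; class and event format are my own invention). The idea is to first key the employee stream by empId, remember each employee's current deptId (in Flink this would be a `ValueState<Long>` in a `KeyedProcessFunction`), and when it changes, emit an explicit retraction event that a downstream operator keyed by deptId can use to remove the employee from the old department's `ListState`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper showing the retraction bookkeeping. In Flink, the map
// below would be per-key ValueState in a function keyed by empId.
class DeptChangeTracker {
    // empId -> current deptId
    private final Map<Long, Long> currentDept = new HashMap<>();

    // Returns events to forward downstream: "RETRACT:empId:oldDept" and/or "ADD:empId:newDept".
    List<String> onEmployee(long empId, long newDeptId) {
        List<String> events = new ArrayList<>();
        Long oldDeptId = currentDept.put(empId, newDeptId);
        if (oldDeptId != null && oldDeptId != newDeptId) {
            // Employee moved: tell the old department's state to drop this employee.
            events.add("RETRACT:" + empId + ":" + oldDeptId);
        }
        if (oldDeptId == null || oldDeptId != newDeptId) {
            events.add("ADD:" + empId + ":" + newDeptId);
        }
        return events;
    }
}
```

The downstream join operator would then key these events by deptId and, on a RETRACT, rebuild its `ListState` without the departed employee.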
Update: reworked the code with the Table API, following David's suggestion.
SingleOutputStreamOperator<Employee> employeeSourceStreamOperator = env
        .fromSource(employeeSource, WatermarkStrategy.noWatermarks(), "Employee")
        .map(value -> objectMapper.readValue(value, Employee.class));

Table employeeTable = tableEnvironment
        .fromDataStream(employeeSourceStreamOperator, $("deptId").as("empDeptId"), $("name").as("name"));

SingleOutputStreamOperator<Department> departmentSingleOutputStreamOperator = env
        .fromSource(departmentSource, WatermarkStrategy.noWatermarks(), "Department")
        .map(value -> objectMapper.readValue(value, Department.class));

Table departmentTable = tableEnvironment
        .fromDataStream(departmentSingleOutputStreamOperator, Schema.newBuilder().build());

Table table = employeeTable.join(departmentTable)
        .where($("empDeptId").isEqual($("deptId")))
        .select($("name"), $("deptName"));

DataStream<Row> rowDataStream = tableEnvironment.toChangelogStream(table);
rowDataStream.sinkTo(new PrintSink<>());
Published Department 1 with Employee 6 in it, as follows: {"deptId": 1, "deptName": "Department 1"}
5> +I[Employee 6, Department 1]
Published Employee 6 with deptId changed from 1 to 2: {"empId": 6, "name": "Employee 6", "deptId": 2}. Employee 6 is updated to Department 2:
5> +I[Employee 6, Department 2]
Now published an update to Department 1 and received the following output, where Employee 6 still belongs to Department 1:
5> +I[Employee 6, Department 1]
So Employee 6 still has a relationship with Department 1, even though Employee 6 changed its department to 2.
I created a git repo, flink-test. EmployeeDepartmentTestDataGenerator.java is a sample test-data generator that publishes random data to the Kafka topics.
Am I missing something?
1 Answer
如果你使用Flink的Table API(或Flink SQL),这类应用程序的实现要简单得多。Table/SQL连接将自动处理你关心的更新。用DataStream API手工实现这一点是很多不必要的工作(它需要在Flink状态下具体化连接,以便生成更新)。
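One likely explanation for the duplicate `+I` rows in the update above: `fromDataStream` interprets the incoming streams as append-only (insert-only) tables, so a changed employee record arrives as a second insert rather than an update, and the join keeps both rows. A sketch of one way to get true update semantics (assuming Flink 1.14+ and the field names from the question; the Row conversion below is hypothetical) is `fromChangelogStream` with a primary key and upsert changelog mode:

```java
// Sketch: interpret the employee stream as an upsert changelog keyed by empId,
// so a department change replaces the previous row instead of adding a new one.
// The join then emits retractions (-U/+U) for the old result automatically.
DataStream<Row> employeeRows = employeeSourceStreamOperator
        .map(e -> Row.ofKind(RowKind.UPDATE_AFTER, e.getEmpId(), e.getName(), e.getDeptId()))
        .returns(Types.ROW_NAMED(
                new String[] {"empId", "name", "deptId"},
                Types.LONG, Types.STRING, Types.LONG));

Table employeeTable = tableEnvironment.fromChangelogStream(
        employeeRows,
        Schema.newBuilder()
                .column("empId", DataTypes.BIGINT().notNull())
                .column("name", DataTypes.STRING())
                .column("deptId", DataTypes.BIGINT())
                .primaryKey("empId")
                .build(),
        ChangelogMode.upsert());
```

Doing the same for the department stream (keyed by deptId) gives the join full changelog semantics; alternatively, reading the topics through an `upsert-kafka` connector table achieves the same at the source.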