kstream在非键值上与globalktable连接

hwamh0ep  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(303)

我正在尝试用globalktable连接kstream,连接不完全在键上。

GlobalKTable<String, Employee> employeesDetails = builder.globalTable("EMPLOYEE_TOPIC",..);
KStream<String,String> empIdOverLoginUserId = builder.stream("LOG_TOPIC", ….);

我想加入empidoverloginuserid和employeesdeails,empidoverloginuserid的值超过employeesdeails的键。
有什么线索吗?

t40tm48m

t40tm48m1#

kstream globalktable join的第二个参数是 KeyValueMapper 要将kstream记录(键值)Map到要联接的globalktable的键,可以使用此选项在与globalktable联接时使用empidoverloginuserid的值作为键:

empIdOverLoginUserId.join(
                employeesDetails, 
                (userKey, userValue) -> userValue, 
                (userValue, employeesDetailValue) -> employeesDetailValue)

相关问题