使用springclouddataflow,我尝试使用kafka压缩主题从数据库复制数据。因此,我需要在将记录生成到主题(称为foo)时设置记录的键。
kafka-topics --zookeeper localhost:2181 --create --topic foo --replication-factor=1 --partitions 3 --config cleanup.policy=compact
stream create --name foo --deploy --definition "jdbc --spring.datasource.url=jdbc:postgresql://localhost:5432/chiodonia --spring.datasource.driver-class-name=org.postgresql.Driver --jdbc.max-rows-per-poll=10 --jdbc.query='select id, value from foo where seen is null' --jdbc.split=true --jdbc.update='update foo set seen=current_timestamp where id in (:id)' --trigger.fixed-delay=5 --trigger.time-unit=SECONDS | header-enricher --headers='recordkey=payload.id' > :foo --spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['recordkey']"
有人能用spring云数据流将记录的密钥设置到kafka中吗?
1条答案
按热度按时间zfciruhq1#
要覆盖输出通道的绑定配置,必须将其设置为从输出通道向外的通道
header-enricher
处理器到foo
主题。那就是:
流创建——名称foo——部署——定义“jdbc……”header enricher--headers='recordkey=payload.id'--spring.cloud.stream.kafka.bindings.output.producer.messagekeyexpression=headers['recordkey']>:foo“