有两个主题,来源主题。a,来源主题。b。源\主题.a与源\主题.b有依赖关系(例如,需要先接收源\主题.b)。为了记录sink进程,需要先从source\u topic.b接收数据,然后从source\u topic.a接收数据。有没有办法在源/接收器配置中设置主题/表的顺序?
下面是使用的配置,有多个表和主题。时间戳用于每次轮询表时更新表的模式。和timestamp.initial将值设置为特定的时间戳。
源配置
name=jdbc-mssql-prod-5
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:sqlserver:
connection.user=
connection.password=
topic.prefix= source_topic.
mode=timestamp
table.whitelist=A,B,C
timestamp.column.name=ModifiedDateTime
connection.backoff.ms=60000
connection.attempts=300
validate.non.null= false
# enter timestamp in milliseconds
timestamp.initial= 1604977200000
接收器配置
name=mysql-sink-prod-5
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics= sink_topic_a, sink_topic_b
connection.url=jdbc:mysql:
connection.user=
connection.password=
insert.mode=upsert
delete.enabled=true
pk.mode=record_key
errors.log.enable= true
errors.log.include.messages=true
1条答案
按热度按时间hm2xizp91#
不,jdbc接收器连接器不支持这种逻辑。
你把批量思维应用到一个流的世界:)想想:Kafka怎么会知道它已经“完成”了下沉
topic_a
? 流是无界的,因此您最后不得不说“如果在给定的时间窗口内没有收到任何消息,那么假设您已经完成了从这个主题中提取数据并转移到下一个主题”。您最好在kafka本身中进行必要的数据连接(例如,使用kafka streams或ksqldb),然后将结果写回新的kafka主题,然后将其放入数据库。