我在使用批量模式的kafka合流jdbc连接器中遇到了一个奇怪的行为
如何复制:
创建一个源连接器并加载它,例如在下午6点
使用transforms.insertfield.timestamp.field或任何希望记录写入时间的系统创建接收器连接器
问题:
第一周的连接器将在下午6点写入行,1周后将在下午7点开始写入,2周后将在晚上8点开始写入,以此类推
jdbc连接器示例
{
"name": "sap-bulk",
"config": {
"name": "sap-bulk",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "10",
"topic.prefix": "bulk_",
"table.whitelist": "----"
"connection.url": "jdbc:sap://----/",
"connection.user": "-----",
"connection.password": "----------",
"retention.ms":"604800000",
"mode":"bulk",
"table.types":"VIEW",
"poll.interval.ms":"86400000",
"validate.non.null":"false",
}
}
Flume示例
name=bgconnect-bulk
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=5
# topics=kcbq-quickstart
sanitizeTopics=true
autoCreateTables=true
autoUpdateSchemas=true
schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081
# bufferSize=10000
# maxWriteSize=10000
tableWriteWait=900000
max.poll.records=1000000
########################################### Fill me in! ###########################################
project=--------
topics=-----
datasets=.*=mydata
keyfile=/opt/kafka/Gaccess.json
group.id=-----------
transforms=DropField,InsertField
transforms.DropField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.DropField.blacklist=fieldtime
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
有人面临同样的问题吗?
暂无答案!
目前还没有任何答案,快来回答吧!