kafka jdbc源连接器批量获取cron每周递增1小时

lstz6jyr  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(219)

我在使用批量模式的kafka合流jdbc连接器中遇到了一个奇怪的行为
如何复制:
创建一个源连接器并加载它,例如在下午6点
使用transforms.insertfield.timestamp.field或任何希望记录写入时间的系统创建接收器连接器
问题:
第一周的连接器将在下午6点写入行,1周后将在下午7点开始写入,2周后将在晚上8点开始写入,以此类推
jdbc连接器示例

{
            "name": "sap-bulk",
            "config": {
                "name": "sap-bulk",
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "tasks.max": "10",
                "topic.prefix": "bulk_",
                "table.whitelist": "----"
                "connection.url": "jdbc:sap://----/",
                "connection.user": "-----",
                "connection.password": "----------",
                "retention.ms":"604800000",
                "mode":"bulk",
                "table.types":"VIEW",
                "poll.interval.ms":"86400000",
                "validate.non.null":"false",
            }
    }

Flume示例

name=bgconnect-bulk
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=5

# topics=kcbq-quickstart

sanitizeTopics=true

autoCreateTables=true
autoUpdateSchemas=true

schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081

# bufferSize=10000

# maxWriteSize=10000

tableWriteWait=900000

max.poll.records=1000000

########################################### Fill me in! ###########################################

project=--------
topics=-----
datasets=.*=mydata
keyfile=/opt/kafka/Gaccess.json
group.id=-----------

transforms=DropField,InsertField
transforms.DropField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.DropField.blacklist=fieldtime
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value

有人面临同样的问题吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题