使用debezium mongodb连接器,我成功地将我的收藏推到了Kafka,我面临的唯一问题是这个领域 date
在我的一个收藏中,格式为2019-05-14t23:25:34.703+00:00,没有被推到相同格式的主题,而是我得到了类似于1560708585175的东西。
这是我的debezium连接器命令 connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka/connect-mongodb-source.properties
这是我的mongodb集合示例。
{"_id":"5cdb4e6ed767ba70593e2aa8","sender":"5cdb43db4505956efc70ba03","receiver":"5cdb43db4505956efc70ba03","receiverWalletId":"5cdb43db4505956efc70ba04","status":"succes","type":"topup","amount":200000,"totalFee":0,"createdAt":"2019-05-14T23:25:34.703Z","updatedAt":"2019-05-14T23:25:35.132Z","__v":0,"details":"none."}
这是我的Kafka主题的例子。
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"sender"},{"type":"string","optional":true,"field":"receiver"},{"type":"string","optional":true,"field":"receiverWalletId"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"field":"type"},{"type":"int32","optional":true,"field":"amount"},{"type":"int32","optional":true,"field":"totalFee"},{"type":"int64","optional":true,"field":"createdAt"},{"type":"int64","optional":true,"field":"updatedAt"},{"type":"int32","optional":true,"field":"__v"},{"type":"string","optional":true,"field":"from"},{"type":"string","optional":true,"field":"orderId"},{"type":"string","optional":true,"field":"id"}],"optional":false,"name":"mongo_conn.digi.transactions"},"payload":{"sender":"5cef970ca2e9c273c655483","receiver":"5cef970ca2e9c27355c483","receiverWalletId":"5cef970ca2e9c27556c484","status":"pending","type":"topup","amount":6000,"totalFee":0,"createdAt":1560708024322,"updatedAt":1560708024753,"__v":0,"from":"smt","orderId":"d7a97581-9d18-79cd-8b09-16e400a43714","id":"5d0683b8be4af834abe3cf58"}}
这是我的connect-mongodb-source.properties
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=repracli/**.**.**.***27017
mongodb.name=mongo_conn
initial.sync.max.threads=1
tasks.max=1
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongo$
transforms.unwrap.operation.header=true
3条答案
按热度按时间hsvhsicv1#
debezium将数据以oplog中存储的格式进行流处理。日期看起来像unix时间戳(以毫秒为单位)。
你可以写一个smt(https://cwiki.apache.org/confluence/display/kafka/kip-66%3a+single+message+transforms+for+kafka+connect)这将操作消息并将请求的字段转换为首选的字符串表示形式。
如果你调查
org.bson.BsonDateTime
你会发现这是真的long
价值观。qncylg1j2#
对于一些转换,您将需要以下内容:
nkoocmlb3#
解决了的