我们对使用Kafka还很陌生,正在进行一个项目,将一些事件数据从一个平台传输到另一个平台。我们的消息模式是acord1125布局的修改;在这里,我大大简化了模式(完整版本将近700行):
{
"$schema": "http://json-schema.org/schema#",
"additionalProperties": false,
"definitions": {
"payload": {
"properties": {
"policies": {
"minItems": 1,
"policies": {
"$ref": "#/definitions/policy"
},
"title": "policies",
"type": "array"
}
},
"type": "object"
},
"policy": {
"properties": {
"effectiveDate": {
"minLength": 10,
"type": "string"
},
"number": {
"minLength": 20,
"type": "string"
},
"planName": {
"minLength": 15,
"type": "string"
}
},
"type": "object"
}
},
"description": "Status Update",
"properties": {
"eventHeader": {
"properties": {
"eventCorrelationId": {
"type": "string"
},
"eventDateTime": {
"type": "string"
},
"eventSource": {
"type": "string"
},
"eventSourceDescription": {
"type": "string"
},
"eventType": {
"type": "string"
}
},
"type": "object"
},
"statusTransmittal": {
"items": {
"$ref": "#/definitions/payload"
},
"type": "object"
}
},
"required": [
"eventHeader",
"statusTransmittal"
],
"type": "object"
}
我不清楚如何使用kafka connect(jdbc连接器)将数据转换成这种模式格式。我能够将数据放入一个相对平坦的模式结构中,即没有单独的有效负载和eventheader部分,但前提是我不给主题一个模式,让系统自己解决它。我一直找不到让系统确定某些值应该放在“payload”部分,而其他值应该放在“eventheader”部分的方法。
以这种格式获取数据的需求促使我使用sql查询,而不是表白名单。
我的问题是:
是否可以使用kafka jdbc连接器从一个数据库转到另一个主题,生成与所示类似的模式?
如果不是,建议采用什么方法?
我已经阅读了schema registry教程中的指南,但是它没有“点击”我的这个用例。
暂无答案!
目前还没有任何答案,快来回答吧!