这是关于Kafka连接csv的spooldir连接器。我想知道是否有办法避免硬编码模式,让连接器动态创建模式?我有很多csv文件要处理,比如说每天几百gb,有时是几TB的csv文件。有时一些csv文件会有新的列,有些会被删除。
我能够成功地读csv和写ElasticSearch,我跟随你的帖子。https://www.confluent.io/blog/ksql-in-action-enriching-csv-events-with-data-from-rdbms-into-aws/ 所以现在我不想使用值模式和键模式。
从链接https://docs.confluent.io/current/connect/kafka-connect-spooldir/connectors/csv_source_connector.html; 我认为schema.generation.enabled可以设置为true。
下面是我的restapi调用[包括连接器配置]
$curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://xxx:000/connectors/ -d '{
"name":"csv1",
"config":{
"tasks.max":"1",
"connector.class":"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"input.file.pattern":"^.*csv$",
"halt.on.error":"false",
"topic":"order",
"schema.generation.enabled":"true",
"schema.generation.key.name":"orderschema",
"schema.generation.value.name":"orderdata",
"csv.first.row.as.header":"true",
"csv.null.field.indicator":"EMPTY_SEPARATORS",
"batch.size" : "5000",
}
}
'
当我提交这个时,我得到以下错误name“:”order“,”connector“:{”state“:”failed“,”worker\u id“:”localhost:000“,”trace“:”org.apache.kafka.connect.errors.dataexception:为输入模式找到多个架构。\n架构:{“name\”:\“com.github.jcustenborder.kafka.connect.model.value\”,\“type\”:\“struct\”,\“isoptional\”:false,\“fieldschemas\”:
有什么解决办法?
1条答案
按热度按时间lf5gs5x21#
我现在可以分析所有的数据了。诀窍是先处理一个文件[任何文件],然后选中add随机添加另一个文件。看起来是这样的,它自动更新模式(就像robin moffatt所说的那样)在那之后,将所有文件添加到文件夹中,过程就很好了。耶!