Unable to convert Kafka topic data into structured JSON with the Confluent Elasticsearch sink connector

Asked by 2mbi3lxu on 2021-06-07, in Kafka

I am building a data pipeline with Kafka. The data flow is as follows: capture data changes in MongoDB and send them to Elasticsearch.

Database
  MongoDB 3.6
  sharded cluster
Kafka
  Confluent Platform 4.1.0
  MongoDB source connector: Debezium 0.7.5
  Elasticsearch sink connector
Elasticsearch
  version 6.1.0

Since I am still testing, all the Kafka-related systems are running on a single server.
Start ZooKeeper

$ bin/zookeeper-server-start etc/kafka/zookeeper.properties

Start the Kafka broker

$ bin/kafka-server-start etc/kafka/server.properties

Start the Schema Registry

$ bin/schema-registry-start etc/schema-registry/schema-registry.properties

Start the MongoDB source connector

$ bin/connect-standalone \
  etc/schema-registry/connect-avro-standalone.properties \
  etc/kafka/connect-mongo-source.properties

$ cat etc/kafka/connect-mongo-source.properties
>>> 
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee

$ cat etc/schema-registry/connect-avro-standalone.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8083

Start the Elasticsearch sink connector

$ bin/connect-standalone \
  etc/schema-registry/connect-avro-standalone2.properties \
  etc/kafka-connect-elasticsearch/elasticsearch.properties

$ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
>>>
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect

$ cat etc/schema-registry/connect-avro-standalone2.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084

Everything above works fine: the connector captures data changes (CDC) and successfully sends them to Elasticsearch via the sink connector. The problem is that I cannot convert the string-typed message data into a structured data type. For example, let's consume the topic data after making some changes in MongoDB.

$ bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic higee.higee.higee --from-beginning | jq

Then I get the following result:

    {
      "after": null,
      "patch": {
        "string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
      },
      "source": {
        "version": {
          "string": "0.7.5"
        },
        "name": "higee",
        "rs": "172.31.50.13",
        "ns": "higee",
        "sec": 1524214412,
        "ord": 1,
        "h": {
          "long": -2379508538412995600
        },
        "initsync": {
          "boolean": false
        }
      },
      "op": {
        "string": "u"
      },
      "ts_ms": {
        "long": 1524214412159
      }
    }

Then, if I go to Elasticsearch, I get the following result:

{
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "after": null,
          "patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"}, 
                       "name" : "higee", 
                       "salary" : 100,
                       "origin" : "South Korea"}""",
          "source": {
            "version": "0.7.5",
            "name": "higee",
            "rs": "172.31.50.13",
            "ns": "higee",
            "sec": 1524214412,
            "ord": 1,
            "h": -2379508538412995600,
            "initsync": false
          },
          "op": "u",
          "ts_ms": 1524214412159
        }
      }

What I want to achieve is:

{
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "oid" : "5ad97f982a0f383bb638ecac",
          "name" : "higee", 
          "salary" : 100,
          "origin" : "South Korea"
         }
     }
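For reference, the transformation itself is small once the message is available as a dict: parse the stringified `patch` and lift `_id.$oid` into a flat `oid` field. A plain-Python sketch using the sample document above:

```python
import json

# _source as currently indexed, abridged to the relevant field
source = {
    "after": None,
    "patch": '{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"},'
             '"name" : "higee","salary" : 100,"origin" : "South Korea"}',
}

def flatten(source):
    """Parse the stringified `patch` and lift `_id.$oid` to a flat `oid`."""
    doc = json.loads(source["patch"])
    doc["oid"] = doc.pop("_id")["$oid"]
    return doc

print(flatten(source))
# {'name': 'higee', 'salary': 100, 'origin': 'South Korea', 'oid': '5ad97f982a0f383bb638ecac'}
```

The hard part of the question is where to run this logic (an SMT, Logstash, or a custom consumer), which the answers below discuss.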

Some options I have tried, and am still considering, are listed below.

Logstash

Case 1: I don't know how to parse those characters (\u0002, \u0001)

logstash.conf

input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    topics => ["higee.higee.higee"]
    auto_offset_reset => "earliest"
    codec => json {
      charset => "UTF-8"
    }
  }
}

filter {
  json {
    source => "message"
  }
 }

output {
  stdout {
    codec => rubydebug
  }
}

Result

{
"message" => "H\u0002�\u0001{\"_id\" : \
    {\"$oid\" : \"5adafc0e2a0f383bb63910a6\"}, \
     \"name\" : \"higee\", \
     \"salary\" : 101, \
     \"origin\" : \"South Korea\"} \
     \u0002\n0.7.5\nhigee \ 
     \u0018172.31.50.13\u001Ahigee.higee2 \ 
     ��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
"tags" => [[0] "_jsonparsefailure"]
}
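Those \u0002/\u0001 characters are not stray text to strip out: the topic holds Avro binary, in which small integers and string lengths are zigzag-encoded varints, so a json codec can never parse it. A minimal sketch of the zigzag rule (my own illustration, not part of the original post):

```python
def zigzag_decode(n: int) -> int:
    """Decode Avro's zigzag integer encoding: 0->0, 1->-1, 2->1, 3->-2, ..."""
    return (n >> 1) ^ -(n & 1)

# The 0x02 byte seen in the logstash output decodes to the integer 1,
# e.g. a union branch index or a small length prefix, not a character.
print(zigzag_decode(0x02))  # 1
print(zigzag_decode(0x01))  # -1
```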

Case 2

logstash.conf

input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    topics => ["higee.higee.higee"]
    auto_offset_reset => "earliest"
    codec => avro {
      schema_uri => "./test.avsc"
    }
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

test.avsc

{
    "namespace": "example",
    "type": "record",
    "name": "Higee",
    "fields": [
      {"name": "_id", "type": "string"},
      {"name": "name", "type": "string"},
      {"name": "salary",  "type": "int"},
      {"name": "origin", "type": "string"}
    ]
 }

Result

An unexpected error occurred! {:error=>#<NoMethodError: 
undefined method `type_sym' for nil:NilClass>, :backtrace=> 
["/home/ec2-user/logstash- 
6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:224:in `match_schemas'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:280:in `read_data'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:376:in `read_union'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:309:in `read_data'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:384:in `block in read_record'", 
"org/jruby/RubyArray.java:1734:in `each'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:382:in `read_record'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:310:in `read_data'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:275:in `read'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/ 
logstash-codec-avro-3.2.3-java/lib/logstash/codecs/ 
avro.rb:77:in `decode'", "/home/ec2-user/logstash-6.1.0/ 
vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
8.0.2/lib/ logstash/inputs/kafka.rb:254:in `block in 
thread_runner'", "/home/ec2-user/logstash- 
6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in 
thread_runner'"]}

Python client

Consume the topic, then manipulate the data and produce it to a different topic name, so that the Elasticsearch sink connector can consume well-formatted messages from the Python-manipulated topic. kafka-python library: cannot decode the message.

from kafka import KafkaConsumer

# kafka-python takes topic names as positional arguments
consumer = KafkaConsumer(
    'higee.higee.higee',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest'
)

for message in consumer:
    message.value.decode('utf-8')

>>> 'utf-8' codec can't decode byte 0xe4 in position 6: 
    invalid continuation byte
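That decode error is expected: the record value is Confluent-framed Avro binary (a zero magic byte, a 4-byte schema ID, then Avro-encoded data), not UTF-8 text. A tiny illustration with made-up bytes:

```python
# Hypothetical raw record value: magic byte + schema id + some Avro data.
# 0xe4 starts a 3-byte UTF-8 sequence, but the next byte is not a
# continuation byte, so a plain decode fails just like in the question.
payload = b"\x00\x00\x00\x00\x01\xe4\x12abc"

try:
    payload.decode("utf-8")
except UnicodeDecodeError as err:
    print(err.reason)  # invalid continuation byte
```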
`confluent_kafka` is not compatible with Python 3.
Do you know how I can jsonify the data in Elasticsearch? Below are the sources I searched:
MongoDB
MongoDB event flattening
Avro converter
Serializing Debezium events
Debezium tutorial
Thanks in advance.
Some attempts

1) To test the transformation, I changed the connect-mongo-source.properties file as follows.

$ cat etc/kafka/connect-mongo-source.properties
>>>
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

Below is the error log I got. I am not yet comfortable with Kafka and, more importantly, the Debezium platform, so I could not debug this error.

ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

2) This time, I changed elasticsearch.properties and left connect-mongo-source.properties unchanged.

$ cat connect-mongo-source.properties

name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee

$ cat elasticsearch.properties

name=elasticsearch-sink
connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect
transforms=unwrap
transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
I got the following error.

ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

3) I changed test.avsc and ran Logstash. I did not get any error message, but the result was not what I expected: the `origin`, `salary`, and `name` fields were all empty, even though they had been given non-null values. I could even read the data correctly through the console consumer.

$ cat test.avsc

{
  "type" : "record",
  "name" : "MongoEvent",
  "namespace" : "higee.higee",
  "fields" : [ {
    "name" : "_id",
    "type" : {
      "type" : "record",
      "name" : "HigeeEvent",
      "fields" : [ {
        "name" : "$oid",
        "type" : "string"
      }, {
        "name" : "salary",
        "type" : "long"
      }, {
        "name" : "origin",
        "type" : "string"
      }, {
        "name" : "name",
        "type" : "string"
      } ]
    }
  } ]
}

$ cat logstash3.conf

input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    topics => ["higee.higee.higee"]
    auto_offset_reset => "earliest"
    codec => avro {
      schema_uri => "./test.avsc"
    }
  }
}

output {
  stdout {
   codec => rubydebug
  }
}

$ bin/logstash -f logstash3.conf

{
"@version" => "1",
"_id" => {
  "salary" => 0,
  "origin" => "",
  "$oid" => "",
  "name" => ""
},
"@timestamp" => 2018-04-25T09:39:07.962Z
}
Answer 1 (ioekq8ef)

I solved this problem using the Python Kafka client. Below is the new architecture of my pipeline.

I used Python 2, although the Confluent documentation says Python 3 is supported. The main reason is that some of the code used Python 2 syntax. For example... (not this exact line, but similar syntax):

except NameError, err:

To use Python 3, I would need to convert the above line to:

except NameError as err:

That said, below is my Python code. Note that this code is only for prototyping and has not been used in production.

Consume messages with the Confluent consumer

Code

from confluent_kafka.avro import AvroConsumer

c = AvroConsumer({ 
       'bootstrap.servers': '',
       'group.id': 'groupid',
       'schema.registry.url': ''
    })

c.subscribe(['higee.higee.higee'])

x = True

while x:
    msg = c.poll(100)
    if msg:
        message = msg.value()
        print(message)
        x = False

c.close()

(After updating a document in MongoDB,) let's check the `message` variable:

{u'after': None,
 u'op': u'u',
 u'patch': u'{
     "_id" : {"$oid" : "5adafc0e2a0f383bb63910a6"},
     "name" : "higee",
     "salary" : 100,
     "origin" : "S Korea"}',
 u'source': {
     u'h': 5734791721791032689L,
     u'initsync': False,
     u'name': u'higee',
     u'ns': u'higee.higee',
     u'ord': 1,
     u'rs': u'',
     u'sec': 1524362971,
     u'version': u'0.7.5'},
 u'ts_ms': 1524362971148
 }

Manipulate the consumed message

Code

patch = message['patch']
patch_dict = eval(patch)
patch_dict.pop('_id')

Check `patch_dict`:
{'name': 'higee', 'origin': 'S Korea', 'salary': 100}
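One caveat about the `eval(patch)` step above: `eval` executes arbitrary Python, which is risky on data arriving over Kafka. Since `patch` is plain JSON, `json.loads` does the same job safely; a sketch with the sample payload:

```python
import json

patch = ('{"_id" : {"$oid" : "5adafc0e2a0f383bb63910a6"},'
         '"name" : "higee", "salary" : 100, "origin" : "S Korea"}')

patch_dict = json.loads(patch)   # parses JSON, never executes code
patch_dict.pop("_id")
print(patch_dict)  # {'name': 'higee', 'salary': 100, 'origin': 'S Korea'}
```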


Produce messages with the Confluent producer

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema_str = """
{
   "namespace": "higee.higee",
   "name": "MongoEvent",
   "type": "record",
   "fields" : [
       {
           "name" : "name",
           "type" : "string"
       },
       {
          "name" : "origin",
          "type" : "string"
       },
       {
           "name" : "salary",
           "type" : "int"
       }
   ]
}
"""
AvroProducerConf = {
    'bootstrap.servers': '',
    'schema.registry.url': ''
}

value_schema = avro.loads(value_schema_str)  # the schema string defined above
avroProducer = AvroProducer(
                   AvroProducerConf, 
                   default_value_schema=value_schema
               )

avroProducer.produce(topic='python', value=patch_dict)
avroProducer.flush()
The only thing left is to make the Elasticsearch sink connector listen to the new topic "python", with the configuration below. Everything is the same as before except `topics`.

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=python
key.ignore=true
connection.url=''
type.name=kafka-connect

Then run the Elasticsearch sink connector and check the result in Elasticsearch.

{
"_index": "zzzz",
"_type": "kafka-connect",
"_id": "zzzz+0+3",
"_score": 1,
"_source": {
"name": "higee",
"origin": "S Korea",
"salary": 100
}
}

Answer 2 (a2mppw5e)

+1 to @cricket_007's suggestion: use the io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope single message transform. You can read more about SMTs and their benefits here.

Answer 3 (klsxnrf1)

Python client

You must use the Avro consumer, otherwise you will get 'utf-8' codec can't decode byte. Even that example won't work, because you still need the Schema Registry to look up the schema.
The prerequisite for Confluent's Python client is that it works with Python 3.x.
Nothing stops you from using a different client, so I'm not sure why you stuck with Python.

Logstash Avro codec

The JSON codec cannot decode Avro data, and I don't think a json filter after an avro input codec will work either.
Your Avro schema is also wrong: you are missing the `$oid` in place of the `_id`.
There is a difference between "raw Avro" (which includes the schema inside the message itself) and Confluent's encoded version (which only carries the schema ID pointing into the registry). Meaning, Logstash doesn't integrate with the Schema Registry... at least not without a plugin.
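The Confluent wire format mentioned here can be checked by hand: each value starts with a magic byte (0) and a big-endian 4-byte schema ID, followed by the Avro body. A sketch of peeling off that header (the sample bytes are hypothetical):

```python
import struct

def split_confluent_frame(value: bytes):
    """Split a Confluent-framed Avro message into (schema_id, avro_body)."""
    magic, schema_id = struct.unpack(">bI", value[:5])
    if magic != 0:
        raise ValueError("not Confluent wire format")
    return schema_id, value[5:]

# Hypothetical message registered under schema ID 21
schema_id, body = split_confluent_frame(b"\x00\x00\x00\x00\x15avro-bytes")
print(schema_id)  # 21
```

The remaining body still needs the registered schema to decode, which is exactly why a registry-unaware consumer (Logstash's avro codec, a plain UTF-8 decode) fails on it.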
Your avsc should look like this:

{
  "type" : "record",
  "name" : "MongoEvent",
  "namespace" : "higee.higee",
  "fields" : [ {
    "name" : "_id",
    "type" : {
      "type" : "record",
      "name" : "HigeeEvent",
      "fields" : [ {
        "name" : "$oid",
        "type" : "string"
      }, {
        "name" : "salary",
        "type" : "long"
      }, {
        "name" : "origin",
        "type" : "string"
      }, {
        "name" : "name",
        "type" : "string"
      } ]
    }
  } ]
}

However, Avro doesn't allow names that don't begin with the pattern [A-Za-z_], so $oid would be a problem.
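Given that name restriction, one workaround (hypothetical, not from the original answer) is to sanitize keys before producing, so the schema only needs Avro-legal names:

```python
import re

def avro_safe(name: str) -> str:
    """Strip characters Avro names don't allow; '$oid' becomes 'oid'."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "", name)
    # Names must start with [A-Za-z_]; prefix an underscore otherwise.
    return cleaned if re.match(r"[A-Za-z_]", cleaned) else "_" + cleaned

print(avro_safe("$oid"))    # oid
print(avro_safe("salary"))  # salary
```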
While I don't recommend it (and haven't actually tried it), one possible way to get the JSON-encoded Avro data from the avro-console-consumer into Logstash is to use the pipe input plugin:

input {
  pipe {
    codec => json
    command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning" 
  }
}

Debezium

Note that the `after` value is always a string, and by convention it will contain a JSON representation of the document:
http://debezium.io/docs/connectors/mongodb/
I think this also applies to the `patch` values, but I don't really know Debezium.
Kafka won't be able to parse the JSON in flight without a Simple Message Transform (SMT). Reading the docs you linked to, you should probably add these to your Connect source properties:

transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

It's also worth pointing out that field flattening is on the roadmap: DBZ-561.

Kafka Connect Elasticsearch

Elasticsearch does not parse and process encoded JSON string objects without Logstash or its JSON ingest processor; instead, it just indexes them as one whole string body.
If I recall correctly, Connect will only apply Elasticsearch mappings to top-level Avro fields, not nested ones.
In other words, the generated mapping follows this pattern:

"patch": {
    "string": "...some JSON object string here..."
  },

whereas what you actually need is something like this, and you may have to define the ES index mapping manually:

"patch": {
   "properties": {
      "_id": {
        "properties": {
          "$oid":   { "type": "text" },
          "name":   { "type": "text" },
          "salary": { "type": "integer" },
          "origin": { "type": "text" }
        }
      }
   }
}

I'm also not sure whether the dollar sign would be allowed there, though.
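Since hand-writing that mapping JSON is error-prone, one option is to build it as a Python dict and serialize it as the body of a PUT against the index. The layout below is a sketch under the assumptions above: the type name matches the connector's type.name, Elasticsearch's integer type is spelled `integer`, and `oid` is used in case `$oid` is rejected:

```python
import json

mapping = {
    "mappings": {
        "kafka-connect": {            # must match the sink's type.name
            "properties": {
                "patch": {
                    "properties": {
                        "oid":    {"type": "text"},
                        "name":   {"type": "text"},
                        "salary": {"type": "integer"},
                        "origin": {"type": "text"},
                    }
                }
            }
        }
    }
}

# Request body for: PUT http://localhost:9200/higee.higee.higee
print(json.dumps(mapping, indent=2))
```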

Kafka Connect MongoDB source

If none of the above works, you can try a different connector:
Option 1
Option 2
