使用kafka activemq源连接器将jms消息读取为json而不是文本

nbnkbykc  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(333)

过去几个月我一直在使用kafka connect,最近我加入了activemq源插件,以便读取一些包含json文件的jms主题消息,将它们放在kafka主题中,然后在ksqldb中创建一个流/表,将json文件中的一些键用作列。不过,插件将jms消息插入为带双引号的文本,因此在ksqldb中无法正确识别。我尝试了配置中的各种方法来修复它,但到目前为止没有任何效果。我还想使用json格式,而不是kafka connect中的avro(也没有运行模式注册表)。出于测试目的,我还尝试通过将头内容指定为“application/json”来发送jms消息,但仍然没有成功。
下面是我的activemq插件的样子

"config": {"connector.class":"ActiveMQSourceConnector", "tasks.max":"1", "kafka.topic":"activemq", "activemq.url":"tcp://localhost:61616","activemq.username":"admin","activemq.password":"admin","jms.destination.name":"topic.2","jms.destination.type":"topic","jms.message.format":"json","jms.message.converter":"org.apache.kafka.connect.json.JsonConverter","confluent.license":"","confluent.topic.bootstrap.servers":"localhost:9092"}}

下面是我的Kafka连接配置

bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

plugin.path=/opt/kafka_2.13-2.5.0/plugins

这里还有一个Kafka如何使用这些信息的例子

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": "{\"widget\": {     \"debug\": \"on\",    \"window\": {        \"title\": \"Sample Konfabulator Widget\",        \"name\": \"main_window\",        \"width\": 500,        \"height\": 500    },    \"image\": {        \"src\": \"Images/Sun.png\",        \"name\": \"sun1\",        \"hOffset\": 250,        \"vOffset\": 250,        \"alignment\": \"center\"    },    \"text\": {        \"data\": \"Click Here\",        \"size\": 36,        \"style\": \"bold\",        \"name\": \"text1\",        \"hOffset\": 250,        \"vOffset\": 100,        \"alignment\": \"center\",        \"onMouseUp\": \"sun1.opacity = 39\"} }}\n"
}

如果任何其他信息是需要的,请让我知道任何帮助将不胜感激。
更新:最终,最好的解决方案是能够以某种方式配置连接器,使其不会转义负载中的引号。同样不幸的是,转义引号是从activemq本身生成的,并且不是初始消息的一部分
所以信息应该是这样的

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": 
     "center"
    }

}
qojgxg4l

qojgxg4l1#

欢迎elen1no1yami!
在我看来问题是 text 消息的字段是一个字符串,其中包含您感兴趣的json负载,但该负载的双引号用 \ 烧焦。
我假设activemq中的数据本身没有 \ 查尔,但如果你能澄清这一点就好了。
解决这个问题的方法是:
能够将连接器配置为不转义有效负载中的引号。所以信息看起来更像:

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": 
     "center"
    },
    ... etc
}

或者以某种方式让ksqldb处理消息,因为它仍然可以访问 text 现场。
这就是你要找的东西吗?如果是,请更新您的问题以反映这一点(在你的问题中包含这样的细节是很好的,这样你的问题就清楚了。
至于答案。。。
我不是连接Maven,所以不能真正评论,也看不到连接器配置的任何细节,这些细节可能允许您更改 text . 其他更了解连接的人也许能提供更多帮助。
为了能够访问ksqldb中嵌入/转义的json,首先需要删除转义。有关使用ksqldb执行此操作的方法,请参见下文

使用ksqldb访问转义的json

在我们可以访问中的json文档之前 text 我们必须把逃逸的人移走。
我能想到两种方法:

编写自定义自定义自定义项

最好的方法是编写一个自定义的udf'unescape\ujson',它可以删除转义。

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );

-- Use custom UDF to process this and write it back as a properly formatted JSON document:
CREATE STREAM JSONIFIED AS
  SELECT MY_CUSTOM_UDF(message) FROM RAW;

如果编写正确,自定义udf方法将不会出现潜在的数据损坏问题 REPLACE 基于web的解决方案有很多缺点。

使用replace删除转义

注意:这个解决方案是脆弱的:字符替换可以匹配和替换不应该匹配的内容,这取决于您的消息的内容!
让我们用更简单的测试数据来解释需要什么,例如我们想要转换:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": 10}"
}

收件人:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": 10}
}

这需要三件事:
更换开口 "text": "{"text": { 全部替换 \"" .
更换关闭 }"} 我们可以使用replace函数或regexp\u replace函数:

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );

-- Use REPLACE to remove reformat:
CREATE STREAM JSONIFIED AS 
  SELECT 
    REPLACE(
      REPLACE(
        REPLACE(message, 
          '"text": "{', '"text": {'), 
          '\"', '"'), 
          '"}', '}')
  FROM RAW;

当然,如果您的数据包含以下任何搜索项,则此解决方案可能会损坏您的数据: "text": "{ , \" 或者 "} 数据中的任何其他位置,例如。

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": \"hello \\\"} world\"}"
}

将错误地转换为

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": "hello \\}world"}
}

这就是为什么自定义自定义自定义项会更好。
更正输入内容(并将其写入新主题)后,可以正常导入数据:

CREATE STREAM DATA (
   messageId STRING,
   text STRUCT<Widget INT>
 ) WITH (
   kafka_topic='JSONIFIED',
   value_format='JSON'
 );

相关问题