Spring Boot 如何在apache-pulsar客户端发送JsonArray数据?

9jyewag0  于 2023-02-04  发布在  Spring
关注(0)|答案(1)|浏览(301)

我是一个初学者,刚开始用spring Boot 开发pulsar-client。首先,我通过pulsar doc和git学习了一些基础知识,但是我一直在测试pulsar客户端生产者的消息批量传输。特别是,我想批量发送JsonArray数据,但是我一直收到JsonArray.getAsInt错误。请看一下我的代码,告诉我出了什么问题

package com.refactorizando.example.pulsar.producer;

import static java.util.stream.Collectors.toList;

import com.refactorizando.example.pulsar.config.PulsarConfiguration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray;

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.shade.com.google.gson.JsonArray;
import org.apache.pulsar.shade.com.google.gson.JsonElement;
import org.apache.pulsar.shade.com.google.gson.JsonObject;
import org.apache.pulsar.shade.com.google.gson.JsonParser;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class PulsarProducer {

  private static final String TOPIC_NAME = "Json_Test";

  private final PulsarClient client;

  @Bean(name = "producer")
  public void producer() throws PulsarClientException {
   
    // batching
    Producer<JsonArray> producer = client.newProducer(JSONSchema.of(JsonArray.class))
        .topic(TOPIC_NAME)
        .batchingMaxPublishDelay(60, TimeUnit.SECONDS)
        .batchingMaxMessages(2)
        .enableBatching(true)
        .compressionType(CompressionType.LZ4)
        .create();
    String data = "{'users': [{'userId': 1,'firstName': 'AAAAA'},{'userId': 2,'firstName': 'BBBB'},{'userId': 3,'firstName': 'CCCCC'},{'userId': 4,'firstName': 'DDDDD'},{'userId': 5,'firstName': 'EEEEE'}]}";
    JsonElement element = JsonParser.parseString(data);
    JsonObject obj = element.getAsJsonObject();
    JsonArray arr = obj.getAsJsonArray("users");
    try {
      producer.send(arr);
    } catch (Exception e) {
      log.error("Error sending mesasage");
      e.printStackTrace();
    }

    producer.close();

  }

}

我还是一个初学者,所以我在stackOverflow上找不到它,因为我搜索不好。如果你有任何相关的问题,请留下链接,我会删除这个问题。感谢阅读我的问题,祝你有美好的一天!
我尝试了几种方法,比如转换成JsonObject并发送、转换成String并发送等,但都出现了同样的错误。

yizd12fk

yizd12fk1#

cho,欢迎来到脉冲星和Spring脉冲星!我相信有几件事要涵盖,以充分回答你的问题。

Spring脉冲星的使用

在您的示例中,您直接从PulsarClient创建了一个Producer。直接使用该API绝对没有任何错误/缺点。但是,如果您想使用SpringPulsar,在使用Spring Pulsar的Spring Boot应用程序中发送消息的推荐方法是通过自动配置的PulsarTemplate(如果使用Reactive,则为ReactivePulsarTemplate)。它简化了使用,并允许使用配置属性配置模板/生成器。例如,与构建并使用Producer.send()不同,您将注入脉冲星模板并按如下方式使用它:

pulsarTemplate.newMessage(foo)
    .withTopic("Json_Test")
    .withSchema(Schema.JSON(Foo.class))
    .withProducerCustomizer((producerBuilder) -> {
      producerBuilder
          .batchingMaxPublishDelay(60, TimeUnit.SECONDS)
          .batchingMaxMessages(2)
          .enableBatching(true)
          .compressionType(CompressionType.LZ4);
    })
    .send();

此外,您可以使用配置属性替换构建器调用,例如:

spring:
  pulsar:
    producer:
      batching-enabled: true
      batching-max-publish-delay: 60s
      batching-max-messages: 2
      compression-type: lz4

然后你的代码变成

pulsarTemplate.newMessage(foo)
    .withTopic("Json_Test")
    .withSchema(Schema.JSON(Foo.class))
    .send();
    • 注意:**为简单起见,我用Foo替换了json数组。

模式

在Pulsar中,Schema知道如何反序列化数据。内置的Pulsar Schema.JSON默认使用Jackson json lib来反序列化数据。这要求数据必须能够被Jackson ObjectMapper.readValue/writeValue方法处理。它非常好地处理POJO,但不能处理您正在使用的JSON impl。
我注意到最新的json-lib2.4,(AFAICT)有9 CVEs反对它,最后一次发布是在2010年。如果我必须为我的数据使用Json级API,我会选择一个更现代和支持/使用良好的库,如Jackson或Gson。
我把你的示例换成了Jackson ArrayNode,效果很好。我确实不得不把数据字符串中的单引号替换成反斜杠双引号,因为默认情况下Jackson不喜欢单引号数据。下面是使用Jackson ArrayNode重新编写的示例应用程序:

@SpringBootApplication
public class HyunginChoSpringPulsarUserApp {

    public static void main(String[] args) {
        SpringApplication.run(HyunginChoSpringPulsarUserApp.class, args);
    }

    @Bean
    ApplicationRunner sendDataOnStartup(PulsarTemplate<ArrayNode> pulsarTemplate) {
        return (args) -> {
            String data2 = "{\"users\": [{\"userId\": 1,\"firstName\": \"AAAAA\"},{\"userId\": 2,\"firstName\": \"BBBB\"},{\"userId\": 3,\"firstName\": \"CCCCC\"},{\"userId\": 4,\"firstName\": \"DDDDD\"},{\"userId\": 5,\"firstName\": \"EEEEE\"}]}";
            ArrayNode jsonArray = (ArrayNode) ObjectMapperFactory.create().readTree(data2).get("users");
            System.out.printf("*** SENDING: %s%n", jsonArray);
            pulsarTemplate.newMessage(jsonArray)
                    .withTopic("Json_Test")
                    .withSchema(Schema.JSON(ArrayNode.class))
                    .send();
        };
    }

    @PulsarListener(topics = "Json_Test", schemaType = SchemaType.JSON, batch = true)
    public void listenForData(List<ArrayNode> user) {
        System.out.printf("***** LISTEN: %s%n".formatted(user));
    }
}

输出如下所示:

*** SENDING: [{"userId":1,"firstName":"AAAAA"},{"userId":2,"firstName":"BBBB"},{"userId":3,"firstName":"CCCCC"},{"userId":4,"firstName":"DDDDD"},{"userId":5,"firstName":"EEEEE"}]

***** LISTEN: [{"userId":1,"firstName":"AAAAA"},{"userId":2,"firstName":"BBBB"},{"userId":3,"firstName":"CCCCC"},{"userId":4,"firstName":"DDDDD"},{"userId":5,"firstName":"EEEEE"}]

数据模型

您的数据是一个用户数组。您是否需要使用Json级别的API,或者您需要处理List<User> POJO?这将简化操作并使其具有更好的使用体验。Java记录是一个很好的选择,例如:

public record(String userId, String firstName) {}

然后你可以传入一个List<User>到你的PulsarTemplate,一切都会很好地工作。

@SpringBootApplication
public class HyunginChoSpringPulsarUserApp {

    public static void main(String[] args) {
        SpringApplication.run(HyunginChoSpringPulsarUserApp.class, args);
    }

    @Bean
    ApplicationRunner sendDataOnStartup(PulsarTemplate<User> pulsarTemplate) {
        return (args) -> {
            String data2 = "{\"users\": [{\"userId\": 1,\"firstName\": \"AAAAA\"},{\"userId\": 2,\"firstName\": \"BBBB\"},{\"userId\": 3,\"firstName\": \"CCCCC\"},{\"userId\": 4,\"firstName\": \"DDDDD\"},{\"userId\": 5,\"firstName\": \"EEEEE\"}]}";
            ObjectMapper objectMapper = ObjectMapperFactory.create();
            JsonNode usersNode = objectMapper.readTree(data2).get("users");
            List<User> users = objectMapper.convertValue(usersNode, new TypeReference<>() {});
            System.out.printf("*** SENDING: %s%n", users);
            for (User user : users) {
                pulsarTemplate.newMessage(user)
                        .withTopic("Json_Test2")
                        .withSchema(Schema.JSON(User.class))
                        .send();
            }
        };
    }

    @PulsarListener(topics = "Json_Test2", schemaType = SchemaType.JSON, batch = true)
    public void listenForData(List<User> users) {
        users.forEach((user) -> System.out.printf("***** LISTEN: %s%n".formatted(user)));
    }

    public record User(String userId, String firstName) {}
}
*** SENDING: [User[userId=1, firstName=AAAAA], User[userId=2, firstName=BBBB], User[userId=3, firstName=CCCCC], User[userId=4, firstName=DDDDD], User[userId=5, firstName=EEEEE]]
...
***** LISTEN: User[userId=1, firstName=AAAAA]
***** LISTEN: User[userId=2, firstName=BBBB]
***** LISTEN: User[userId=3, firstName=CCCCC]
***** LISTEN: User[userId=4, firstName=DDDDD]
***** LISTEN: User[userId=5, firstName=EEEEE]

希望能帮上忙。保重。

相关问题