我是一个初学者,刚开始用spring Boot 开发pulsar-client。首先,我通过pulsar doc和git学习了一些基础知识,但是我一直在测试pulsar客户端生产者的消息批量传输。特别是,我想批量发送JsonArray数据,但是我一直收到JsonArray.getAsInt错误。请看一下我的代码,告诉我出了什么问题
package com.refactorizando.example.pulsar.producer;
import static java.util.stream.Collectors.toList;
import com.refactorizando.example.pulsar.config.PulsarConfiguration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.shade.com.google.gson.JsonArray;
import org.apache.pulsar.shade.com.google.gson.JsonElement;
import org.apache.pulsar.shade.com.google.gson.JsonObject;
import org.apache.pulsar.shade.com.google.gson.JsonParser;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class PulsarProducer {
private static final String TOPIC_NAME = "Json_Test";
private final PulsarClient client;
@Bean(name = "producer")
public void producer() throws PulsarClientException {
// batching
Producer<JsonArray> producer = client.newProducer(JSONSchema.of(JsonArray.class))
.topic(TOPIC_NAME)
.batchingMaxPublishDelay(60, TimeUnit.SECONDS)
.batchingMaxMessages(2)
.enableBatching(true)
.compressionType(CompressionType.LZ4)
.create();
String data = "{'users': [{'userId': 1,'firstName': 'AAAAA'},{'userId': 2,'firstName': 'BBBB'},{'userId': 3,'firstName': 'CCCCC'},{'userId': 4,'firstName': 'DDDDD'},{'userId': 5,'firstName': 'EEEEE'}]}";
JsonElement element = JsonParser.parseString(data);
JsonObject obj = element.getAsJsonObject();
JsonArray arr = obj.getAsJsonArray("users");
try {
producer.send(arr);
} catch (Exception e) {
log.error("Error sending mesasage");
e.printStackTrace();
}
producer.close();
}
}
我还是一个初学者,所以我在stackOverflow上找不到它,因为我搜索不好。如果你有任何相关的问题,请留下链接,我会删除这个问题。感谢阅读我的问题,祝你有美好的一天!
我尝试了几种方法,比如转换成JsonObject并发送、转换成String并发送等,但都出现了同样的错误。
1条答案
按热度按时间yizd12fk1#
cho,欢迎来到脉冲星和Spring脉冲星!我相信有几件事要涵盖,以充分回答你的问题。
Spring脉冲星的使用
在您的示例中,您直接从PulsarClient创建了一个Producer。直接使用该API绝对没有任何错误/缺点。但是,如果您想使用SpringPulsar,在使用Spring Pulsar的Spring Boot应用程序中发送消息的推荐方法是通过自动配置的
PulsarTemplate
(如果使用Reactive,则为ReactivePulsarTemplate
)。它简化了使用,并允许使用配置属性配置模板/生成器。例如,与构建并使用Producer.send()
不同,您将注入脉冲星模板并按如下方式使用它:此外,您可以使用配置属性替换构建器调用,例如:
然后你的代码变成
模式
在Pulsar中,Schema知道如何反序列化数据。内置的Pulsar
Schema.JSON
默认使用Jackson json lib来反序列化数据。这要求数据必须能够被JacksonObjectMapper.readValue/writeValue
方法处理。它非常好地处理POJO,但不能处理您正在使用的JSON impl。我注意到最新的
json-lib
是2.4
,(AFAICT)有9 CVEs反对它,最后一次发布是在2010年。如果我必须为我的数据使用Json级API,我会选择一个更现代和支持/使用良好的库,如Jackson或Gson。我把你的示例换成了Jackson ArrayNode,效果很好。我确实不得不把数据字符串中的单引号替换成反斜杠双引号,因为默认情况下Jackson不喜欢单引号数据。下面是使用Jackson ArrayNode重新编写的示例应用程序:
输出如下所示:
数据模型
您的数据是一个用户数组。您是否需要使用Json级别的API,或者您需要处理
List<User>
POJO?这将简化操作并使其具有更好的使用体验。Java记录是一个很好的选择,例如:然后你可以传入一个
List<User>
到你的PulsarTemplate,一切都会很好地工作。希望能帮上忙。保重。