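Hi, I want to write a RESTful service with Kafka. I am writing code so that, every time a GET or POST request is made, the messages the consumer receives from Kafka are stored in an OpenSearch index, but I keep getting this error:
Result: {"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}
This is my consumer: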
public class Main {
    static Client client = new Client();

    public static void main(String[] args) throws IOException, NoSuchAlgorithmException, KeyManagementException {
        String servers = "localhost:9092";
        String groupId = "id";
        String topic = "mytopic";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList(topic));
        Logger logger = LoggerFactory.getLogger(Main.class.getName());

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key:" + record.key() + ", Value:" + record.value());
                logger.info("Partition:" + record.partition() + ", Offset:" + record.offset());
                String result = client.run("https://localhost:9200/index3/_create/" + record.offset(), "admin", "admin", record);
                System.out.println("Result: " + result);
            }
        }
    }
}
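Is there any way to avoid this?
My message is "request": "valid". I tried to write it as a key-value JSON, but I got it wrong.
Edit: I tried setting the Content-Type to "application/json", but it throws the same error!
This is my client: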
    public String run(String url, String username, String password, ConsumerRecord<String, String> record) throws IOException, NoSuchAlgorithmException, KeyManagementException {
        // TrustManager that accepts all certificates
        TrustManager[] trustAllCerts = new TrustManager[]{new Client()};
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustAllCerts, new SecureRandom());

        // Client
        OkHttpClient client = new OkHttpClient.Builder()
                .connectionSpecs(Arrays.asList(ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS))
                .sslSocketFactory(sslContext.getSocketFactory(), (X509TrustManager) trustAllCerts[0])
                .hostnameVerifier((hostname, session) -> true)
                .build();

        // Authentication
        String credentials = username + ":" + password;
        String encode = Base64.getEncoder().encodeToString(credentials.getBytes());
        String authHeader = "Basic " + encode;

        RequestBody requestBody = new FormBody.Builder()
                .add("Message:", "n°" + record.offset())
                .build();

        Request request = new Request.Builder()
                .url(url)
                .header("Authorization", authHeader)
                .header("Content-Type", "application/json")
                .post(requestBody) // POST body
                .build();

        try (Response response = client.newCall(request).execute()) {
            return response.body().string();
        }
    }
}
1 answer
I solved it by using the Gson dependency to build the request body as a JsonObject.
This is the code:
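What follows is a minimal sketch of the idea, not the answerer's own code. The likely cause of the error is that OkHttp takes the Content-Type from the request body whenever the body declares one, and a FormBody always declares application/x-www-form-urlencoded, so the header set by hand gets overwritten. Sending a JSON body removes the problem. To stay dependency-free, the sketch swaps in the JDK 11+ java.net.http API for OkHttp and a hand-written string builder for Gson. The class name JsonIndexRequest, the helper names, and the JSON field names offset and message are all assumptions made for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: builds a JSON POST request for one Kafka record.
final class JsonIndexRequest {

    // Escapes the characters that may not appear raw inside a JSON string.
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become numeric escapes.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Turns a record's offset and value into a JSON object
    // (field names are assumptions, not taken from the thread).
    static String toJson(long offset, String value) {
        return "{\"offset\":" + offset + ",\"message\":\"" + escapeJson(value) + "\"}";
    }

    // Builds the POST request; nothing is sent until an HttpClient executes it.
    static HttpRequest buildRequest(String url, String username, String password, String json) {
        String credentials = username + ":" + password;
        String authHeader = "Basic "
                + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", authHeader)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args) {
        String json = toJson(42L, "\"request\": \"valid\"");
        HttpRequest request = buildRequest(
                "https://localhost:9200/index3/_create/42", "admin", "admin", json);
        System.out.println(json);
        System.out.println(request.headers().firstValue("Content-Type").orElse("(none)"));
    }
}
```

With the OkHttp and Gson setup from the question, the equivalent change would be to fill a JsonObject with addProperty and pass its toString() to RequestBody.create together with MediaType.parse("application/json"), in place of the FormBody.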