Java: I get status code 406 every time I send a message with Kafka

Posted by 6tdlim6h on 2023-06-20 in Java

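Hello, I want to write a RESTful service with Kafka. I am writing code so that, each time a GET or POST request is made, the message the consumer receives from Kafka is stored in an OpenSearch index, but I keep getting this error:
Result: {"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}
Here is my consumer: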

import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {
    static Client client = new Client();

    public static void main(String[] args) throws IOException, NoSuchAlgorithmException, KeyManagementException {

        String servers = "localhost:9092";
        String groupId = "id";
        String topic = "mytopic";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList(topic));

        Logger logger = LoggerFactory.getLogger(Main.class.getName());

        // Poll forever and forward every record to the index
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key:" + record.key() + ", Value:" + record.value());
                logger.info("Partition:" + record.partition() + ", Offset:" + record.offset());

                String result = client.run("https://localhost:9200/index3/_create/" + record.offset(), "admin", "admin", record);
                System.out.println("Result: " + result);
            }
        }
    }
}
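Is there a way to avoid this?
My message is "request": "valid". I tried to write it as a key-value JSON, but I got it wrong.
Edit: I tried setting the content type to "application/json", but it throws the same error!
Here is my client: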

public String run(String url, String username, String password, ConsumerRecord<String, String> record) throws IOException, NoSuchAlgorithmException, KeyManagementException {

    // TrustManager that accepts all certificates
    TrustManager[] trustAllCerts = new TrustManager[]{new Client()};

    SSLContext sslContext = SSLContext.getInstance("TLS");
    sslContext.init(null, trustAllCerts, new SecureRandom());

    //Client
    OkHttpClient client = new OkHttpClient.Builder()
            .connectionSpecs(Arrays.asList(ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS))
            .sslSocketFactory(sslContext.getSocketFactory(), (X509TrustManager) trustAllCerts[0])
            .hostnameVerifier((hostname, session) -> true)
            .build();

    //Authentication
    String credentials = username + ":" + password;
    String encode = Base64.getEncoder().encodeToString(credentials.getBytes());
    String authHeader = "Basic " + encode;

    RequestBody requestBody = new FormBody.Builder()
            .add("Message:", "n°" + record.offset())
            .build();

    Request request = new Request.Builder()
            .url(url)
            .header("Authorization", authHeader)
            .header("Content-Type", "application/json")
            .post(requestBody) // POST body
            .build();

    try (Response response = client.newCall(request).execute()) {
        return response.body().string();
    }
}

}

jhdbpxl9 #1

I solved the problem by using the Gson dependency to turn the request body into a JsonObject:

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>

Here is the code:

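// Build the document as JSON instead of a form-encoded body
JsonObject json = new JsonObject();
json.addProperty("Message n°", record.offset());
// The RequestBody carries its own media type, so no explicit Content-Type header is needed
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), json.toString());

Request request = new Request.Builder()
        .url(url)
        .header("Authorization", authHeader)
        .post(requestBody)
        .build();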
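
A note on why the original client failed even with the "application/json" header: OkHttp takes the Content-Type from the RequestBody itself, and a FormBody always declares application/x-www-form-urlencoded, so the manual header was overridden. For comparison, here is a minimal sketch of the same idea using only the JDK (java.net.http, Java 11+) instead of OkHttp and Gson. This is a swapped-in technique, not the code from the question. The class name JsonRequestSketch, the helper names, and the document shape {"offset": ..., "message": ...} are assumptions made up for illustration; the URL and credentials are the ones from the question.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class JsonRequestSketch {

    // Escapes the characters that must not appear raw inside a JSON string literal.
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Hypothetical document shape: the record offset plus the raw message value.
    static String toJsonDocument(long offset, String value) {
        return "{\"offset\":" + offset + ",\"message\":\"" + escapeJson(value) + "\"}";
    }

    // Builds (but does not send) a POST whose body and Content-Type are both JSON.
    static HttpRequest buildIndexRequest(String url, String username, String password, String json) {
        String credentials = username + ":" + password;
        String authHeader = "Basic "
                + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", authHeader)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        String json = toJsonDocument(42L, "\"request\": \"valid\"");
        HttpRequest request = buildIndexRequest(
                "https://localhost:9200/index3/_create/42", "admin", "admin", json);
        System.out.println(json);
        System.out.println(request.headers().firstValue("Content-Type").orElse("(none)"));
    }
}
```

The sketch only builds the request. Sending it to a cluster with a self-signed certificate would still need an HttpClient configured with a suitable SSLContext, much like the trust-all setup in the question, which is left out here.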
