How to convert a curl command to RestTemplate

kxe2p93d  posted on 2021-06-04  in Kafka

I want to convert a curl command that publishes a message to the Kafka REST Proxy. The curl command is:

curl -u username:password --request POST --url https://kafka-rest-proxy-*****.com/topics/test-topic --header 'accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json' --header 'content-type: application/vnd.kafka.avro.v2+json' --data '{"value_schema_id": 5, "records": [{"value": {"event_envelope":{"data":{"test":"using curl","testEventId":23}}}}]}'

I want to send this request from Spring using RestTemplate. How do I convert the curl command to RestTemplate? I have written some code, but I am running into a problem.

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    String kafkarwsrproxyURL = String.format("%s/topics/%s", "https://kafka-rest-proxy-**********",
            "test-topic");
    String schemaurl = String.format("%s/subjects/%s/versions/latest", "https://schema-registry-*********",
            "test-topic");

    @Autowired
    private RestTemplate restTemplate;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {

        ObjectMapper obj = new ObjectMapper();
        JSONObject event = new JSONObject();
        JSONObject record = new JSONObject();
        JSONObject eventenvolpe = new JSONObject();
        JSONObject jsondata = new JSONObject();
        JSONArray jsonarray = new JSONArray();
        JSONObject recordvalue = new JSONObject();

        // Connect to the schema registry and fetch the latest schema
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.valueOf("application/vnd.schemaregistry.v1+json"));
        headers.setBasicAuth("username", "password");
        HttpEntity<String> SchemaEntity = new HttpEntity<String>("parameters", headers);

        ResponseEntity<String> result = restTemplate.exchange(schemaurl, HttpMethod.GET, SchemaEntity,
                String.class);
        if (result.getStatusCodeValue() == 200) {

            JsonNode rootNode = obj.readTree(result.getBody());
            JsonNode schema_id = rootNode.path("id"); // fetch the schema id from the schema

            event = new JSONObject();
            record = new JSONObject();
            eventenvolpe = new JSONObject();
            jsondata = new JSONObject();
            // Parse the serialized Data back into a JSONObject so "data" is a nested object, not a quoted string
            jsondata.put("data", new JSONObject(obj.writeValueAsString(new Data("test1", 1))));
            eventenvolpe.put("event_envelope", jsondata);
            recordvalue.put("value", eventenvolpe);
            jsonarray.put(recordvalue);
            // asInt() stores the id as a JSON number rather than a Jackson node
            event.put("value_schema_id", schema_id.asInt());
            // "records" must be a JSON array, as in the curl payload
            event.put("records", jsonarray); // set up the event object to send to Kafka
            System.out.println(event);

            // Send the message to Kafka

            HttpHeaders messageheaders = new HttpHeaders();
            messageheaders.setContentType(MediaType.valueOf("application/vnd.kafka.avro.v2+json"));
            messageheaders.setBasicAuth("username", "password");
            HttpEntity<JSONObject> message = new HttpEntity<JSONObject>(event, messageheaders);

            ResponseEntity<String> result1 = restTemplate.exchange(kafkarwsrproxyURL, HttpMethod.POST,
                    message, String.class);

            if (result1.getStatusCodeValue() == 200) {
                System.out.println("Message is pushed to Kafka");
            }
        }
    }
}

The error message I get:

at com.example.demo.DemoApplication.main(DemoApplication.java:44) [classes/:na]
Caused by: org.springframework.web.client.RestClientException: No HttpMessageConverter for org.json.JSONObject and content type "application/vnd.kafka.avro.v2+json"
    at org.springframework.web.client.RestTemplate$HttpEntityRequestCallback.doWithRequest(RestTemplate.java:961) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:674) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:583) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at com.example.demo.DemoApplication.run(DemoApplication.java:91) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    ... 5 common frames omitted

nwnhqdif1#

Jackson's ObjectMapper does not recognize JSONObject, so you need to convert it to a String:

HttpEntity<String> message = new HttpEntity<>(event.toString(), messageheaders);

ResponseEntity<String> result1 = restTemplate.exchange(kafkarwsrproxyURL, HttpMethod.POST,
            message, String.class);
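
Since the body is sent as a plain String, it can help to check that the string matches the `--data` payload of the curl command before handing it to RestTemplate. The following is a minimal JDK-only sketch, not the poster's code: the class `KafkaRestPayloadSketch` and the helpers `buildPayload`, `escape`, and `basicAuthHeader` are hypothetical names introduced for illustration. It builds the same JSON as the curl command, with `records` as an array, and shows the `Authorization` value that `-u username:password` corresponds to.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper class, for illustration only (not part of the original post).
public class KafkaRestPayloadSketch {

    // Builds the JSON body used by the curl command as a plain String.
    // "records" is a JSON array and value_schema_id is a JSON number.
    static String buildPayload(int schemaId, String test, int testEventId) {
        String data = String.format("{\"test\":\"%s\",\"testEventId\":%d}",
                escape(test), testEventId);
        return String.format(
                "{\"value_schema_id\":%d,\"records\":[{\"value\":{\"event_envelope\":{\"data\":%s}}}]}",
                schemaId, data);
    }

    // Minimal escaping of backslashes and double quotes; assumes no control characters.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Value of the Authorization header that curl's -u option produces (HTTP Basic auth).
    static String basicAuthHeader(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(buildPayload(5, "using curl", 23));
        System.out.println(basicAuthHeader("username", "password"));
    }
}
```

The string returned by `buildPayload` could then be passed as the body of `new HttpEntity<>(payload, messageheaders)`, in the same way as `event.toString()` above. For ASCII credentials, `messageheaders.setBasicAuth("username", "password")` should produce the same header value as `basicAuthHeader`.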
