Asynchronous HTTP calls in a Storm bolt

y0u0uwnf, posted on 2021-06-21 in Storm

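I need to make an asynchronous HTTP call in my bolt. The bolt only has to make the async HTTP call and, once the response arrives, insert it into Elasticsearch. I followed this link; here is the pseudocode.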

public void prepare(Map arg0, TopologyContext context,
        OutputCollector collector) {
    httpclient = HttpAsyncClients.createDefault();
    httpclient.start();
    _collector = collector;
}

public void execute(Tuple tuple) {
    try{
        if( !isTickTuple(tuple) ) {
            String data = (String) tuple.getValueByField("source");
            httpCall(data,"http://");
        }
    }catch(Exception e){
        logger.warn(e.getMessage());
        if(logger.isDebugEnabled())logger.debug(e.getMessage(),e);
    }finally{
        _collector.ack(tuple);
    }
}

public void httpCall(String data,String url) {
    HttpPost postRequest = new HttpPost(url);
    HttpEntity httpEntity = null;

    //CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
    postRequest.addHeader("content-type", "application/json");
    postRequest.setEntity(httpEntity);

    HttpAsyncRequestProducer producer = HttpAsyncMethods.create(postRequest);
    AsyncCharConsumer<HttpResponse> consumer = new AsyncCharConsumer<HttpResponse>() {

        HttpResponse response;

        @Override
        protected void onResponseReceived(final HttpResponse response) {
            this.response = response;
        }

        @Override
        protected void onCharReceived(CharBuffer buf, IOControl ioctrl) throws IOException {
            // Do something useful
        }

        @Override
        protected void releaseResources() {
        }

        @Override
        protected HttpResponse buildResult(HttpContext context) {
            return this.response;
        }
    };

    httpclient.execute(producer, consumer, new FutureCallback<HttpResponse>() {
        @Override
        public void completed(HttpResponse response) {
            int responseCode = response.getStatusLine().getStatusCode();
            if (logger.isDebugEnabled()) logger.debug("Response code::" + responseCode);
            if (responseCode == HttpServletResponse.SC_OK) {
                HttpEntity entity = response.getEntity();
                try {
                    String data = EntityUtils.toString(entity, "UTF-8");
                    if (logger.isDebugEnabled()) logger.debug("Response string");
                    updatedata(data);
                } catch (ParseException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (logger.isDebugEnabled()) logger.debug("completed Method:::: " + response.toString());
        }

        @Override
        public void failed(Exception ex) {
            if (logger.isDebugEnabled()) logger.debug("!!! Async http request failed!", ex);
        }

        @Override
        public void cancelled() {
            if (logger.isDebugEnabled()) logger.debug("Async http request canceled!");
        }
    });
}

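Here the response code (200) is printed in the log, but the logger call after this line

String data = EntityUtils.toString(entity, "UTF-8");

prints nothing. Can anyone point me to a better approach, with a link or some code?
Update: I added a logger call in the onCharReceived method, and it now prints the response chunk by chunk. Can anyone tell me how to get the complete response, or whether I should use a different library?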
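
Since onCharReceived delivers the body one chunk at a time, one way to end up with the complete response is to append every chunk to a buffer and hand the joined text back at the end. Below is a minimal sketch of that accumulation step using only JDK classes. ResponseAccumulator is a hypothetical helper name, not part of HttpAsyncClient. The assumption is that its append method would be called from onCharReceived and its result method from buildResult, with the consumer declared as AsyncCharConsumer<String> instead of AsyncCharConsumer<HttpResponse>.

```java
import java.nio.CharBuffer;

// Hypothetical helper (not an HttpAsyncClient class): joins streamed
// character chunks into a single string.
final class ResponseAccumulator {

    private final StringBuilder body = new StringBuilder();

    // Intended to be called once per chunk, e.g. from onCharReceived(buf, ioctrl).
    void append(CharBuffer buf) {
        // A CharBuffer is a CharSequence over its remaining characters,
        // so this copies exactly the unread part of the chunk.
        body.append(buf);
        // Mark the chunk as fully consumed.
        buf.position(buf.limit());
    }

    // Intended to be called once the response is complete, e.g. from buildResult(context).
    String result() {
        return body.toString();
    }

    public static void main(String[] args) {
        ResponseAccumulator acc = new ResponseAccumulator();
        // Two chunks standing in for what onCharReceived would deliver.
        acc.append(CharBuffer.wrap("{\"status\":"));
        acc.append(CharBuffer.wrap("\"ok\"}"));
        System.out.println(acc.result()); // prints {"status":"ok"}
    }
}
```

If streaming is not actually needed, another option may be the execute(HttpUriRequest, FutureCallback<HttpResponse>) overload of the async client, which buffers the body in memory, so that calling EntityUtils.toString inside completed should then have content to read.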
