我需要在我的bolt中进行异步http调用。在这个bolt中,只需要进行异步http调用,在得到响应后,它应该插入到ElasticSearch中。我跟随这个链接这里是伪代码。
public void prepare(Map arg0, TopologyContext context,
OutputCollector collector) {
httpclient = HttpAsyncClients.createDefault();
httpclient.start();
_collector = collector;
}
public void execute(Tuple tuple) {
try{
if( !isTickTuple(tuple) ) {
data = (data)tuple.getValueByField("source");
httpCall(data,"http://");
}
}catch(Exception e){
logger.warn(e.getMessage());
if(logger.isDebugEnabled())logger.debug(e.getMessage(),e);
}finally{
_collector.ack(tuple);
}
}
public void httpCall(String data,String url) {
HttpPost postRequest = new HttpPost(url);
HttpEntity httpEntity = null;
//CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
postRequest.addHeader("content-type", "application/json");
postRequest.setEntity(httpEntity);
HttpAsyncRequestProducer producer = HttpAsyncMethods.create(postRequest);
AsyncCharConsumer<HttpResponse> consumer = new AsyncCharConsumer<HttpResponse>() {
HttpResponse response;
@Override
protected void onResponseReceived(final HttpResponse response) {
this.response = response;
}
@Override
protected void onCharReceived( CharBuffer buf, IOControl ioctrl) throws IOException {
// Do something useful
}
@Override
protected void releaseResources() {
}
@Override
protected HttpResponse buildResult( HttpContext context) {
return this.response;
}
};
httpclient.execute(producer, consumer, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
int responseCode = response.getStatusLine().getStatusCode();
if(logger.isDebugEnabled())logger.debug("Response code::"+responseCode );
if (responseCode == HttpServletResponse.SC_OK) {
HttpEntity entity = response.getEntity();
try {
String data = EntityUtils.toString(entity, "UTF-8");
if(logger.isDebugEnabled())logger.debug("Response string");
updatedata(data);
} catch (ParseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
if(logger.isDebugEnabled())logger.debug("completed Method:::: "+response.toString());
}
@Override
public void failed(Exception ex) {
if(logger.isDebugEnabled())logger.debug("!!! Async http request failed!", ex);
}
@Override
public void cancelled() {
if(logger.isDebugEnabled())logger.debug("Async http request canceled!");
}
});
}
在这里,响应代码(200)正在日志中打印,但日志程序在这一行之后
String data = EntityUtils.toString(entity, "UTF-8");
没有打印。任何机构都可以提供更好的方法链接或代码。
更新:我在oncharreceived方法中添加了logger,它现在逐块打印响应。有谁能告诉我如何得到完整的回答,或者应该使用其他库。
暂无答案!
目前还没有任何答案,快来回答吧!