我想通过http协议将数据流流的计算结果发送到其他服务。我看到了两种可能的实现方法:
在sink中使用同步apache httpclient
public class SyncHttpSink extends RichSinkFunction<SessionItem> {
private static final String URL = "http://httpbin.org/post";
private CloseableHttpClient httpClient;
private Histogram httpStatusesAccumulator;
@Override
public void open(Configuration parameters) throws Exception {
httpClient = HttpClients.custom()
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
}
@Override
public void close() throws Exception {
httpClient.close();
httpStatusesAccumulator.resetLocal();
}
@Override
public void invoke(SessionItem value) throws Exception {
List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);
HttpPost httpPost = new HttpPost(URL);
httpPost.setEntity(entity);
try(CloseableHttpResponse response = httpClient.execute(httpPost)) {
int httpStatusCode = response.getStatusLine().getStatusCode();
httpStatusesAccumulator.add(httpStatusCode);
}
}
}
在接收器中使用异步apache httpasyncclient
public class AsyncHttpSink extends RichSinkFunction<SessionItem> {
private static final String URL = "http://httpbin.org/post";
private CloseableHttpAsyncClient httpClient;
private Histogram httpStatusesAccumulator;
@Override
public void open(Configuration parameters) throws Exception {
httpClient = HttpAsyncClients.custom()
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
httpClient.start();
httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
}
@Override
public void close() throws Exception {
httpClient.close();
httpStatusesAccumulator.resetLocal();
}
@Override
public void invoke(SessionItem value) throws Exception {
List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);
HttpPost httpPost = new HttpPost(URL);
httpPost.setEntity(entity);
httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
int httpStatusCode = response.getStatusLine().getStatusCode();
httpStatusesAccumulator.add(httpStatusCode);
}
@Override
public void failed(Exception ex) {
httpStatusesAccumulator.add(-1); // -1 - failed
}
@Override
public void cancelled() {
httpStatusesAccumulator.add(-2); // -2 - cancelled
}
});
}
}
问题:
我应该在接收器中使用同步或异步http客户端吗?
万一我将使用同步客户端,它将阻止Flume和通过背压Flink将阻止源。正确的?
如果我使用异步客户机,它不会阻塞接收器。正确的?
蓄能器不是螺纹安全的?i、 我可以在异步回调中使用它吗?
runtimecontext不是线程安全的?i、 我可以在异步回调中使用它吗?
1条答案
按热度按时间z9ju0rcb1#
1. 我应该在接收器中使用同步或异步http客户端吗?
为了避免由于阻塞http调用而产生的反压力,我建议使用异步http客户机。
2. 万一我将使用同步客户端,它将阻止Flume和通过背压Flink将阻止源。正确的?
是的,没错。背压将通过拓扑传播到源。
3. 如果我使用异步客户机,它不会阻塞接收器。正确的?
这是正确的。
4. 蓄能器不是螺纹安全的?i、 我可以在异步回调中使用它吗?
蓄能器不是线程安全的,因此必须同步访问它们。
5. runtimecontext不是线程安全的?i、 我可以在异步回调中使用它吗?
这个
RuntimeContext
不是线程安全的,因此必须同步对它的访问。