Usage and code examples of the reactor.netty.http.client.HttpClient class

Reposted by x33g5p2x on 2022-01-20 in Other

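This article collects code examples for the Java class reactor.netty.http.client.HttpClient and shows how the class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the HttpClient class:
Package path: reactor.netty.http.client.HttpClient
Class name: HttpClient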

About HttpClient

An HttpClient lets you build an HTTP client in a safe, immutable way. The client is materialized and connects only when TcpClient#connect() is ultimately called.

Internally, materialization happens in three phases: first, #tcpConfiguration() is called to retrieve a ready-to-use TcpClient; then TcpClient#configure() retrieves a usable Bootstrap; finally, TcpClient#connect() is called.

Examples:

HttpClient.create()
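
The "safe, immutable" building style described above can be illustrated with a minimal JDK-only sketch. Note that ImmutableClientConfig, its default values, and describe() are hypothetical names invented for this illustration; they are not part of Reactor Netty and do not reflect how the library is implemented. The sketch only shows the general idea: every configuration call returns a new object, and nothing concrete is produced until a terminal call.

```java
// Hypothetical class for illustration only; it is NOT part of Reactor Netty.
final class ImmutableClientConfig {
    private final String host;
    private final int port;
    private final boolean compress;

    private ImmutableClientConfig(String host, int port, boolean compress) {
        this.host = host;
        this.port = port;
        this.compress = compress;
    }

    // Starting point with assumed defaults (localhost:80, no compression).
    static ImmutableClientConfig create() {
        return new ImmutableClientConfig("localhost", 80, false);
    }

    // Each configuration method leaves this instance untouched and returns a modified copy.
    ImmutableClientConfig host(String newHost) {
        return new ImmutableClientConfig(newHost, port, compress);
    }

    ImmutableClientConfig port(int newPort) {
        return new ImmutableClientConfig(host, newPort, compress);
    }

    ImmutableClientConfig compress(boolean enabled) {
        return new ImmutableClientConfig(host, port, enabled);
    }

    // Terminal step: only here is the accumulated configuration turned into a concrete value.
    String describe() {
        return host + ":" + port + (compress ? " (gzip)" : "");
    }

    public static void main(String[] args) {
        ImmutableClientConfig base = create();
        ImmutableClientConfig tuned = base.port(8080).compress(true);
        System.out.println(base.describe());  // prints "localhost:80"
        System.out.println(tuned.describe()); // prints "localhost:8080 (gzip)"
    }
}
```

Because base is never modified, it can be shared between callers and reused as a template for several differently tuned variants, which is what the snippets below rely on when they chain calls such as port(...) and wiretap(...) on HttpClient.create().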

Code examples

Code example source: codecentric/spring-boot-admin

private static WebClient.Builder createDefaultWebClient(Duration connectTimeout, Duration readTimeout) {
  HttpClient httpClient = HttpClient.create()
      .compress(true)
      // Apply the connect timeout at the TCP (Bootstrap) level.
      .tcpConfiguration(tcp -> tcp.bootstrap(bootstrap -> bootstrap.option(
          ChannelOption.CONNECT_TIMEOUT_MILLIS,
          (int) connectTimeout.toMillis()
      )).observe((connection, newState) -> {
        // Install the read timeout handler once the connection is established.
        if (ConnectionObserver.State.CONNECTED.equals(newState)) {
          connection.addHandlerLast(new ReadTimeoutHandler(readTimeout.toMillis(),
              TimeUnit.MILLISECONDS
          ));
        }
      }));
  ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
  return WebClient.builder().clientConnector(connector);
}

Code example source: spring-projects/spring-framework

@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
  return getHttpClient()
      .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
      .websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
      .uri(url.toString())
      .handle((inbound, outbound) -> {
        HttpHeaders responseHeaders = toHttpHeaders(inbound);
        String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
        HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
        NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
        WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
        if (logger.isDebugEnabled()) {
          logger.debug("Started session '" + session.getId() + "' for " + url);
        }
        return handler.handle(session);
      })
      .doOnRequest(n -> {
        if (logger.isDebugEnabled()) {
          logger.debug("Connecting to " + url);
        }
      })
      .next();
}

Code example source: spring-projects/spring-framework

@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
    Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
  if (!uri.isAbsolute()) {
    return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
  }
  return this.httpClient
      .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()))
      .uri(uri.toString())
      .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
      .responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
      .next();
}

Code example source: spring-projects/spring-framework

private static HttpClient initHttpClient(ReactorResourceFactory resourceFactory) {
  ConnectionProvider provider = resourceFactory.getConnectionProvider();
  LoopResources resources = resourceFactory.getLoopResources();
  Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
  Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
  return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources));
}

Code example source: reactor/reactor-netty

private Mono<String> login(int port) {
  return HttpClient.create()
      .port(port)
      .wiretap(true)
      .post()
      .uri("/login")
      // Map the aggregated response to its HTTP status code as a string.
      .responseSingle((res, buf) -> Mono.just(String.valueOf(res.status().code())));
}

Code example source: reactor/reactor-netty

private List<String> getClientDataPromise() {
  HttpClient httpClient =
      HttpClient.create()
           .port(httpServer.address().getPort())
           .wiretap(true);
  Mono<List<String>> content = httpClient.get()
                      .uri("/data")
                      .responseContent()
                      .asString()
                      .collectList()
                      .cache();
  List<String> res = content.block(Duration.ofSeconds(30));
  return res;
}

Code example source: reactor/reactor-netty

@Test
public void simpleTest() {
  httpServer = HttpServer.create()
              .port(0)
              .handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString(Mono.just("test"))))
              .wiretap(true)
              .bindNow();
  String res = Objects.requireNonNull(
      HttpClient.create()
           .port(httpServer.address().getPort())
           .wiretap(true)
           .headers(h -> h.add("Authorization", auth))
           .websocket()
           .uri("/test")
           .handle((i, o) -> i.receive().asString())
           .log("client")
           .collectList()
           .block()).get(0);
  Assert.assertThat(res, is("test"));
}

Code example source: reactor/reactor-netty

@Test
public void testIssue460() {
  DisposableServer server =
      HttpServer.create()
           .port(0)
           .host("::1")
           .wiretap(true)
           .handle((req, res) -> res.sendWebsocket((in, out) -> Mono.never()))
           .bindNow();
  HttpClient httpClient =
      HttpClient.create()
           .addressSupplier(server::address)
           .wiretap(true)
           .headers(h -> h.add(HttpHeaderNames.HOST, "[::1"));
  StepVerifier.create(httpClient.websocket()
                 .connect())
                 .expectError()
                 .verify(Duration.ofSeconds(30));
  server.disposeNow();
}

Code example source: reactor/reactor-netty

@Test
public void proxy_1() {
  StepVerifier.create(
      HttpClient.create()
           .tcpConfiguration(tcpClient -> tcpClient.proxy(ops -> ops.type(ProxyProvider.Proxy.HTTP)
                                        .host("localhost")
                                        .port(hoverflyRule.getProxyPort())))
           .addressSupplier(server::address)
           .wiretap(true)
           .get()
           .uri("/")
           .responseSingle((response, body) -> Mono.zip(body.asString(),
               Mono.just(response.responseHeaders()))))
        .expectNextMatches(t ->
            t.getT2().contains("Hoverfly") &&
              "test".equals(t.getT1()))
        .expectComplete()
        .verify(Duration.ofSeconds(30));
}

Code example source: spring-cloud/spring-cloud-gateway

HttpClient httpClient = HttpClient.create(connectionProvider)
    .tcpConfiguration(tcpClient -> {
      // ... (the rest of this lambda is not included in the excerpt)
if (ssl.getTrustedX509CertificatesForTrustManager().length > 0
    || ssl.isUseInsecureTrustManager()) {
  httpClient = httpClient.secure(sslContextSpec -> {
    // ... (the excerpt ends here)

Code example source: reactor/reactor-netty

@Test
public void releaseInboundChannelOnNonKeepAliveRequest() {
  DisposableServer c = HttpServer.create()
                  .port(0)
                  .handle((req, resp) -> req.receive().then(resp.status(200).send()))
                  .wiretap(true)
                  .bindNow();
  Flux<ByteBuf> src = Flux.range(0, 3)
              .map(n -> Unpooled.wrappedBuffer(Integer.toString(n)
                                  .getBytes(Charset.defaultCharset())));
  Flux.range(0, 100)
    .concatMap(n -> HttpClient.create()
                 .port(c.address().getPort())
                 .tcpConfiguration(TcpClient::noSSL)
                 .wiretap(true)
                 .keepAlive(false)
                 .post()
                 .uri("/return")
                 .send(src)
                 .responseSingle((res, buf) -> Mono.just(res.status().code())))
    .collectList()
    .block();
  c.disposeNow();
}

Code example source: spring-projects/spring-framework

/**
 * Default constructor.
 */
public ReactorNettyWebSocketClient() {
  this(HttpClient.create());
}

Code example source: spring-projects/spring-framework

private ReactorClientHttpConnector initConnector() {
  if (bufferFactory instanceof NettyDataBufferFactory) {
    ByteBufAllocator allocator = ((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
    return new ReactorClientHttpConnector(this.factory, httpClient ->
        httpClient.tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.ALLOCATOR, allocator)));
  }
  else {
    return new ReactorClientHttpConnector();
  }
}

Code example source: reactor/reactor-netty

@Test
public void testMaxFramePayloadLengthFailed() {
  httpServer = HttpServer.create()
      .port(0)
      .handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString(Mono.just("12345678901"))))
      .wiretap(true)
      .bindNow();
  Mono<Void> response = HttpClient.create()
          .port(httpServer.address().getPort())
          .websocket(10)
          .handle((in, out) -> in.receive()
              .asString()
              .map(srv -> srv))
          .log()
          .then();
  StepVerifier.create(response)
      .expectError(CorruptedFrameException.class)
      .verify(Duration.ofSeconds(30));
}

Code example source: com.rabbitmq/http-client

private Mono<HttpResponse> doPost(Object body, String... pathSegments) {
  return client.headersWhen(authorizedHeader())
    .headers(JSON_HEADER)
    .chunkedTransfer(false)
    .post()
    .uri(uri(pathSegments))
    .send(bodyPublisher(body))
    .response()
    .doOnNext(applyResponseCallback())
    .map(ReactorNettyClient::toHttpResponse);
}

Code example source: reactor/reactor-netty

private Mono<Void> proxy(HttpServerRequest request, HttpServerResponse response) {
  return HttpClient.create()
           .wiretap(true)
           .headers(h -> h.add(filterRequestHeaders(request.requestHeaders())))
           .get()
           .uri(URI.create("http://localhost:" + CONTENT_SERVER_PORT +
                   "/" + request.path())
               .toString())
           .response((targetResponse, buf) -> response.headers(filterResponseHeaders(targetResponse.responseHeaders()))
                            .send(buf.retain())
                            .then())
           .then();
}

Code example source: spring-cloud/spring-cloud-gateway

@Before
public void setup() {
  try {
    SslContext sslContext = SslContextBuilder.forClient()
        .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
    HttpClient httpClient = HttpClient.create().secure(ssl -> {
      ssl.sslContext(sslContext);
    });
    ClientHttpConnector httpConnector = new ReactorClientHttpConnector(
        httpClient);
    baseUri = "https://localhost:" + port;
    this.webClient = WebClient.builder().clientConnector(httpConnector)
        .baseUrl(baseUri).build();
    this.testClient = WebTestClient.bindToServer(httpConnector).baseUrl(baseUri).build();
  }
  catch (SSLException e) {
    throw new RuntimeException(e);
  }
}

Code example source: reactor/reactor-netty

@Test
public void nettyNetChannelAcceptsNettyChannelHandlers() throws InterruptedException {
  HttpClient client = HttpClient.create()
                 .wiretap(true);
  final CountDownLatch latch = new CountDownLatch(1);
  System.out.println(client.get()
               .uri("http://www.google.com/?q=test%20d%20dq")
               .responseContent()
               .asString()
               .collectList()
               .doOnSuccess(v -> latch.countDown())
               .block(Duration.ofSeconds(30)));
  assertTrue("Latch didn't time out", latch.await(15, TimeUnit.SECONDS));
}

Code example source: reactor/reactor-netty

/**
 * Specifies whether GZip compression/websocket compression
 * extension is enabled.
 *
 * @param compressionEnabled if true GZip compression/websocket compression extension
 *                              is enabled otherwise disabled (default: false)
 * @return a new {@link HttpClient}
 */
public final HttpClient compress(boolean compressionEnabled) {
  if (compressionEnabled) {
    return tcpConfiguration(COMPRESS_ATTR_CONFIG).headers(COMPRESS_HEADERS);
  }
  else {
    return tcpConfiguration(COMPRESS_ATTR_DISABLE).headers(COMPRESS_HEADERS_DISABLE);
  }
}

Code example source: reactor/reactor-netty

@Test
public void simpleTest404_1() {
  ConnectionProvider pool = ConnectionProvider.fixed("http", 1);
  HttpClient client =
      HttpClient.create(pool)
           .port(80)
           .tcpConfiguration(tcpClient -> tcpClient.host("google.com"))
           .wiretap(true);
  doSimpleTest404(client);
  doSimpleTest404(client);
  pool.dispose();
}
