我看到Jackson从2.9开始就支持非阻塞,但是如何在webflux中使用它呢?有演示吗?
dojqjjoe1#
下面介绍如何使用WebClient向Github的List Repositories API发出GET请求
/**
 * Issues a GET request to {@code /user/repos} using HTTP Basic authentication and
 * decodes the response body into a stream of {@code GithubRepo} objects.
 *
 * @param username the user name placed before the colon in the Basic credentials
 * @param token    the token placed after the colon in the Basic credentials
 * @return a {@code Flux} emitting the decoded {@code GithubRepo} elements
 */
public Flux<GithubRepo> listGithubRepositories(String username, String token) {
    // Basic auth: base64("<username>:<token>") prefixed with the scheme name.
    String credentials = username + ":" + token;
    String encodedCredentials = Base64Utils.encodeToString(credentials.getBytes(UTF_8));
    String authorization = "Basic " + encodedCredentials;

    return webClient.get()
            .uri("/user/repos")
            .header("Authorization", authorization)
            .retrieve()
            .bodyToFlux(GithubRepo.class);
}
假设我们有一个名为GithubRepo的类,它的结构与Github的API响应相对应,那么上面的函数将返回一个由GithubRepo对象组成的Flux。
5ssjco0h2#
如果您要使用阻塞式的ObjectMapper,请用Mono.fromCallable包装它,并通过subscribeOn方法把它调度到线程池(如bounded elastic)上执行
Mono.fromCallable
hivapdat3#
我找到了一段展示实现方式的代码(下面是我修改后的版本):https://github.com/mmimica/async-jackson/blob/master/src/main/java/com/mmimica/ajackson/AsyncJsonParser.java 。请求处理程序:
/**
 * Handles a request by parsing its JSON body, invoking the service, and replying
 * with a 200 response whose content type is {@code application/json}.
 *
 * <p>NOTE(review): {@code x}, {@code y} and {@code id} are not declared anywhere in
 * this snippet; they look like placeholders to be derived from the parsed body and
 * the service result. Confirm against the real handler.
 *
 * @param request the incoming request whose body is handed to the JSON body parser
 * @return the response produced after the service call completes
 */
public Mono<ServerResponse> myHttpHandler(ServerRequest request) {
    return this.handlerJsonRequestBodyParser
            .parse(request)
            // The parsed body is not used directly here; see the note about x and y.
            .flatMap(parsedBody -> this.myService.doSomething(x,y))
            .flatMap(serviceResult ->
                    ServerResponse
                            .ok()
                            .contentType(MediaType.APPLICATION_JSON)
                            .body(id, String.class)
                            // log() is applied to the response Mono built inside this lambda
                            .log()
            );
}
类实现:
package lsp.order.handler;

import java.io.IOException;
import java.util.Map;
import lsp.order.exception.BadJsonRequestBodyException;
import lsp.order.util.AsyncJsonParser;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import reactor.core.publisher.Mono;

/**
 * Reads the body of a {@link ServerRequest} as a byte array and turns it into a
 * {@code Map<String, Object>} by delegating to {@link AsyncJsonParser}.
 */
@Component
public class HandlerJsonRequestBodyParser {

    /**
     * Parses the JSON body of the given request.
     *
     * @param request the request whose body is read as a {@link ByteArrayResource}
     * @return a {@code Mono} with the parsed map, or a {@code Mono} failing with
     *         {@link BadJsonRequestBodyException} when the parser raises an
     *         {@link IOException}
     */
    public Mono<Map<String, Object>> parse(ServerRequest request) {
        Mono<byte[]> rawBody = request
                .bodyToMono(ByteArrayResource.class)
                .map(ByteArrayResource::getByteArray);
        return rawBody.flatMap(this::toJsonMap);
    }

    /** Feeds one complete body into a freshly created parser. */
    private Mono<Map<String, Object>> toJsonMap(byte[] payload) {
        try {
            // A new parser per body: AsyncJsonParser keeps per-document state in its fields.
            return new AsyncJsonParser().parse(payload);
        } catch (IOException ex) {
            return Mono.error(new BadJsonRequestBodyException());
        }
    }
}
我对它做了修改,使其可以直接拿来使用
package lsp.order.util;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.core.json.async.NonBlockingJsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import reactor.core.publisher.Mono;

/**
 * Builds a Jackson tree from the tokens of a {@link NonBlockingJsonParser} and converts
 * the finished tree into a {@code Map<String, Object>}.
 *
 * <p>Instances keep the container stack and the pending field name as fields, so one
 * instance must not be used by several threads at once.
 *
 * @credit https://github.com/mmimica/async-jackson
 */
public class AsyncJsonParser {

    /**
     * Shared mapper. Constructing an ObjectMapper and registering modules on every call
     * is unnecessary work; per the Jackson documentation a fully configured mapper can
     * be shared.
     */
    private static final ObjectMapper MAPPER =
            new ObjectMapper().registerModule(new JavaTimeModule());

    private final NonBlockingJsonParser parser;

    /** Containers (objects/arrays) that are currently open, innermost first. */
    private final Deque<JsonNode> stack = new ArrayDeque<>();

    /** Name of the most recently seen field; used when the next value lands in an object. */
    private String fieldName;

    /**
     * Creates a parser backed by a non-blocking byte-array parser.
     *
     * @throws IOException if the underlying Jackson parser cannot be created
     */
    public AsyncJsonParser() throws IOException {
        JsonFactory factory = new JsonFactory();
        parser = (NonBlockingJsonParser) factory.createNonBlockingByteArrayParser();
    }

    /**
     * Feeds {@code bytes} to the parser and converts the first complete root container
     * into a map.
     *
     * @param bytes the JSON document
     * @return a {@code Mono} emitting the parsed map, or a {@code Mono} failing with a
     *         {@link RuntimeException} ("Cannot parse JSON") when the input ends before
     *         a root container is complete
     * @throws IOException if the input is not valid JSON, if the root value is a scalar,
     *                     or if the root tree cannot be converted to a map
     */
    @SuppressWarnings("unchecked") // treeToValue(..., Map.class) yields a raw Map of JSON-native values
    public Mono<Map<String, Object>> parse(byte[] bytes) throws IOException {
        ByteArrayFeeder feeder = parser.getNonBlockingInputFeeder();
        boolean consumed = false;
        while (!consumed) {
            if (feeder.needMoreInput()) {
                feeder.feedInput(bytes, 0, bytes.length);
                consumed = true;
            }
            JsonToken event;
            while ((event = parser.nextToken()) != JsonToken.NOT_AVAILABLE) {
                JsonNode root = buildTree(event);
                if (root != null) {
                    Map<String, Object> jsonMap = MAPPER.treeToValue(root, Map.class);
                    return Mono.just(jsonMap);
                }
            }
        }
        return Mono.error(new RuntimeException("Cannot parse JSON"));
    }

    /**
     * Applies one token to the tree under construction.
     *
     * @return the root node once the outermost container is closed, otherwise {@code null}
     * @throws IOException if a scalar value appears outside any container, or if the
     *                     parser fails to read the current value
     */
    private JsonNode buildTree(JsonToken event) throws IOException {
        switch (event) {
            case FIELD_NAME:
                fieldName = parser.getCurrentName();
                return null;
            case START_OBJECT:
                stack.push(createNode(stack.peek()));
                return null;
            case START_ARRAY:
                stack.push(createArray(stack.peek()));
                return null;
            case END_OBJECT:
            case END_ARRAY:
                JsonNode closed = stack.pop();
                return stack.isEmpty() ? closed : null;
            case VALUE_NUMBER_INT:
                addLong(currentContainer(), parser.getLongValue());
                return null;
            case VALUE_STRING:
                addString(currentContainer(), parser.getValueAsString());
                return null;
            case VALUE_NUMBER_FLOAT:
                // Read as double: getFloatValue() would drop precision of JSON numbers.
                addDouble(currentContainer(), parser.getDoubleValue());
                return null;
            case VALUE_NULL:
                addNull(currentContainer());
                return null;
            case VALUE_TRUE:
                addBoolean(currentContainer(), true);
                return null;
            case VALUE_FALSE:
                addBoolean(currentContainer(), false);
                return null;
            default:
                throw new RuntimeException("Unknown json event " + event);
        }
    }

    /**
     * Returns the innermost open container.
     *
     * @throws IOException if no container is open, i.e. the document root is a scalar
     */
    private JsonNode currentContainer() throws IOException {
        JsonNode top = stack.peek();
        if (top == null) {
            throw new IOException("JSON root must be an object or an array");
        }
        return top;
    }

    /** Creates an object node, attached to {@code current} when there is a parent. */
    private JsonNode createNode(JsonNode current) {
        if (current instanceof ObjectNode) {
            return ((ObjectNode) current).putObject(fieldName);
        }
        if (current instanceof ArrayNode) {
            return ((ArrayNode) current).addObject();
        }
        return JsonNodeFactory.instance.objectNode();
    }

    /** Creates an array node, attached to {@code current} when there is a parent. */
    private JsonNode createArray(JsonNode current) {
        if (current instanceof ObjectNode) {
            return ((ObjectNode) current).putArray(fieldName);
        }
        if (current instanceof ArrayNode) {
            return ((ArrayNode) current).addArray();
        }
        return JsonNodeFactory.instance.arrayNode();
    }

    private void addLong(JsonNode current, long v) {
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).put(fieldName, v);
        } else {
            ((ArrayNode) current).add(v);
        }
    }

    private void addString(JsonNode current, String s) {
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).put(fieldName, s);
        } else {
            ((ArrayNode) current).add(s);
        }
    }

    private void addDouble(JsonNode current, double d) {
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).put(fieldName, d);
        } else {
            ((ArrayNode) current).add(d);
        }
    }

    private void addNull(JsonNode current) {
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).putNull(fieldName);
        } else {
            ((ArrayNode) current).addNull();
        }
    }

    private void addBoolean(JsonNode current, boolean b) {
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).put(fieldName, b);
        } else {
            ((ArrayNode) current).add(b);
        }
    }
}
3条答案
按热度按时间dojqjjoe1#
下面介绍如何使用WebClient向Github的List Repositories API发出GET请求
假设我们有一个名为GithubRepo的类,它的结构与Github的API响应相对应,那么上面的函数将返回一个由GithubRepo对象组成的Flux。
5ssjco0h2#
如果您正在寻找阻塞ObjectMapper,请尝试在线程池(如bounded elastic)中使用subscribeOn方法,并用
Mono.fromCallable
包装它
hivapdat3#
我找到了显示实现的代码--请看下面我是如何修改它的。
https://github.com/mmimica/async-jackson/blob/master/src/main/java/com/mmimica/ajackson/AsyncJsonParser.java
请求处理程序:
类实现:
我对它做了修改,使其可以直接拿来使用