jackson 如何在webflux中异步解析/写入json?ObjectMapper方法被阻塞

0x6upsns  于 2022-11-09  发布在  其他
关注(0)|答案(3)|浏览(191)

我看到Jackson从2.9开始就支持非阻塞,但是如何在webflux中使用它呢?有演示吗?

dojqjjoe

dojqjjoe1#

下面介绍如何使用WebClient向Github的List Repositories API发出GET请求

/**
 * Issues a GET request to {@code /user/repos} with an HTTP Basic {@code Authorization}
 * header built from {@code username:token}, and decodes the response body into a
 * stream of {@link GithubRepo} objects.
 *
 * @param username the account name used as the Basic-auth user
 * @param token    the token used as the Basic-auth password
 * @return a Flux emitting the repositories decoded from the response body
 */
public Flux<GithubRepo> listGithubRepositories(String username, String token) {
    // Basic auth is "Basic " followed by base64("user:password").
    final byte[] rawCredentials = (username + ":" + token).getBytes(UTF_8);
    final String authorization = "Basic " + Base64Utils.encodeToString(rawCredentials);

    return webClient.get()
            .uri("/user/repos")
            .header("Authorization", authorization)
            .retrieve()
            .bodyToFlux(GithubRepo.class);
}

假设我们有一个名为GithubRepo的类,其字段结构与Github的API响应一致,那么上面的函数将返回一个由GithubRepo对象组成的Flux。

5ssjco0h

5ssjco0h2#

如果您必须使用阻塞的ObjectMapper,请用Mono.fromCallable包装该调用,并通过subscribeOn方法把它调度到线程池(如boundedElastic)上执行

hivapdat

hivapdat3#

我找到了显示实现的代码--请看下面我是如何修改它的。
https://github.com/mmimica/async-jackson/blob/master/src/main/java/com/mmimica/ajackson/AsyncJsonParser.java
请求处理程序:

/**
 * Handles a request by parsing its JSON body, calling the service, and replying
 * with an OK response whose content type is {@code application/json}.
 *
 * NOTE(review): this is an illustrative snippet - {@code x}, {@code y} and {@code id}
 * are not declared anywhere in the visible code; confirm what they should be
 * before reusing it.
 *
 * @param request the incoming request whose body is handed to the JSON body parser
 * @return a Mono emitting the server response
 */
public Mono<ServerResponse> myHttpHandler(ServerRequest request) {
    // Parse the request body through the injected HandlerJsonRequestBodyParser.
    return this.handlerJsonRequestBodyParser.parse(request)
    // NOTE(review): the parsed value ("map") is not referenced inside the lambda body.
    .flatMap(map
            -> this.myService.doSomething(x,y)
    )
    // Build the 200 response; "response" from the service is not referenced here either.
    .flatMap(response -> ServerResponse
            .ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(id, String.class)
            .log()
    );

}

类实现:

package lsp.order.handler;

import java.io.IOException;
import java.util.Map;
import lsp.order.exception.BadJsonRequestBodyException;
import lsp.order.util.AsyncJsonParser;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import reactor.core.publisher.Mono;

@Component
public class HandlerJsonRequestBodyParser {

    /**
     * Reads the whole request body as bytes and parses it into a map with
     * {@link AsyncJsonParser}.
     *
     * @param request the request whose body is read
     * @return a Mono emitting the parsed map, or failing with
     *         {@link BadJsonRequestBodyException} when parsing throws an IOException
     */
    public Mono<Map<String, Object>> parse(ServerRequest request) {
        Mono<byte[]> bodyBytes = request.bodyToMono(ByteArrayResource.class)
                .map(resource -> resource.getByteArray());
        return bodyBytes.flatMap(this::parseBytes);
    }

    /** Runs a fresh AsyncJsonParser over the bytes, mapping IOException to an error signal. */
    private Mono<Map<String, Object>> parseBytes(byte[] byteArray) {
        try {
            return new AsyncJsonParser().parse(byteArray);
        } catch (IOException ex) {
            return Mono.error(new BadJsonRequestBodyException());
        }
    }

}

我对原始代码做了一些修改,下面的版本可以直接使用:

package lsp.order.util;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.core.json.async.NonBlockingJsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Map;
import reactor.core.publisher.Mono;

/**
 * @credit https://github.com/mmimica/async-jackson
 */
public class AsyncJsonParser {

private final NonBlockingJsonParser parser;

private final Stack stack = new Stack();

private String fieldName;

public AsyncJsonParser() throws IOException {

    JsonFactory factory = new JsonFactory();
    parser = (NonBlockingJsonParser) factory.createNonBlockingByteArrayParser();

}

public Mono<Map<String, Object>> parse(byte[] bytes) throws IOException {

    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new JavaTimeModule());

    ByteArrayFeeder feeder = parser.getNonBlockingInputFeeder();
    boolean consumed = false;
    while (!consumed) {
        if (feeder.needMoreInput()) {
            feeder.feedInput(bytes, 0, bytes.length);
            consumed = true;
        }

        JsonToken event;
        while ((event = parser.nextToken()) != JsonToken.NOT_AVAILABLE) {
            JsonNode root = buildTree(event);
            if (root != null) {
                Map<String, Object> jsonMap = mapper.treeToValue(root, Map.class);
                return Mono.just(jsonMap);
            }
        }
    }

    return Mono.error(new RuntimeException("Cannot parse JSON"));

}

private static final class Stack {

    private final LinkedList<JsonNode> list = new LinkedList<>();

    JsonNode pop() {
        return list.removeLast();
    }

    JsonNode top() {
        if (list.isEmpty()) {
            return null;
        }
        return list.getLast();
    }

    void push(JsonNode n) {
        list.add(n);
    }

    boolean isEmpty() {
        return list.isEmpty();
    }
}

/**
 * @return The root node when the whole tree is built.
 *
 */
private JsonNode buildTree(JsonToken event) throws IOException {
    switch (event) {
        case FIELD_NAME:
            assert !stack.isEmpty();
            fieldName = parser.getCurrentName();
            return null;

        case START_OBJECT:
            stack.push(createNode(stack.top()));
            return null;

        case START_ARRAY:
            stack.push(createArray(stack.top()));
            return null;

        case END_OBJECT:
        case END_ARRAY:
            assert !stack.isEmpty();
            JsonNode current = stack.pop();
            if (stack.isEmpty()) {
                return current;
            } else {
                return null;
            }

        case VALUE_NUMBER_INT:
            assert !stack.isEmpty();
            addLong(stack.top(), parser.getLongValue());
            return null;

        case VALUE_STRING:
            assert !stack.isEmpty();
            addString(stack.top(), parser.getValueAsString());
            return null;

        case VALUE_NUMBER_FLOAT:
            assert !stack.isEmpty();
            addFloat(stack.top(), parser.getFloatValue());
            return null;

        case VALUE_NULL:
            assert !stack.isEmpty();
            addNull(stack.top());
            return null;

        case VALUE_TRUE:
            assert !stack.isEmpty();
            addBoolean(stack.top(), true);
            return null;

        case VALUE_FALSE:
            assert !stack.isEmpty();
            addBoolean(stack.top(), false);
            return null;

        default:
            throw new RuntimeException("Unknown json event " + event);
    }
}

private JsonNode createNode(JsonNode current) {
    if (ObjectNode.class.isInstance(current)) {
        return ObjectNode.class.cast(current).putObject(fieldName);
    } else if (ArrayNode.class.isInstance(current)) {
        return ArrayNode.class.cast(current).addObject();
    } else {
        return JsonNodeFactory.instance.objectNode();
    }
}

private JsonNode createArray(JsonNode current) {
    if (ObjectNode.class.isInstance(current)) {
        return ObjectNode.class.cast(current).putArray(fieldName);
    } else if (ArrayNode.class.isInstance(current)) {
        return ArrayNode.class.cast(current).addArray();
    } else {
        return JsonNodeFactory.instance.arrayNode();
    }
}

private void addLong(JsonNode current, long v) {
    assert current != null;

    if (ObjectNode.class.isInstance(current)) {
        ObjectNode.class.cast(current).put(fieldName, v);
    } else {
        ArrayNode.class.cast(current).add(v);
    }
}

private void addString(JsonNode current, String s) {
    assert current != null;

    if (ObjectNode.class.isInstance(current)) {
        ObjectNode.class.cast(current).put(fieldName, s);
    } else {
        ArrayNode.class.cast(current).add(s);
    }
}

private void addFloat(JsonNode current, float f) {
    assert current != null;

    if (ObjectNode.class.isInstance(current)) {
        ObjectNode.class.cast(current).put(fieldName, f);
    } else {
        ArrayNode.class.cast(current).add(f);
    }
}

private void addNull(JsonNode current) {
    assert current != null;

    if (ObjectNode.class.isInstance(current)) {
        ObjectNode.class.cast(current).putNull(fieldName);
    } else {
        ArrayNode.class.cast(current).addNull();
    }
}

private void addBoolean(JsonNode current, boolean b) {
    assert current != null;

    if (ObjectNode.class.isInstance(current)) {
        ObjectNode.class.cast(current).put(fieldName, b);
    } else {
        ArrayNode.class.cast(current).add(b);
    }
}

}

相关问题