我试图在mongodb.com.mongodb.reactivestreams.client中插入一个文档
根据官方指南“。。。只有在订阅了发布服务器并请求了数据后,操作才会发生。。。一旦文档被插入,onnext方法将被调用“并提供了一个我在下面尝试但没有成功的示例。通过调试,我看到onsubscribe被调用,但onnext bellow从未被调用,并且我签入了mongodb,文档也没有被插入。
这是我试过的
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.util.Arrays;
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class App {
public static void main(String[] args) throws Throwable {
MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("mydb");
MongoCollection<Document> collection = database.getCollection("mycollection");
Document doc = new Document("name", "MongoDB")
.append("type", "database")
.append("count", 1)
.append("versions", Arrays.asList("v3.2", "v3.0", "v2.6"))
.append("info", new Document("x", 203).append("y", 102));
Publisher<InsertOneResult> publisher = collection.insertOne(doc);
publisher.subscribe(new Subscriber<InsertOneResult>() {
@Override
public void onSubscribe(final Subscription s) {
s.request(1); // <--- Data requested and the insertion will now occur
}
@Override
public void onNext(final InsertOneResult result) {
System.out.println("Inserted: " + result);
}
@Override
public void onError(final Throwable t) {
System.out.println("Failed");
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
}
}
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>java-mongoreactive-mvn</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
<version>4.1.1</version>
</dependency>
</dependencies>
</project>
我订阅的时候是不是漏掉了什么?
***根据普拉萨德的建议编辑
这是最后的解决办法。奇怪的是,所有阅读/查询官方指南的例子都驱使我使用6年前提供的例子。
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import static com.mongodb.client.model.Filters.eq;
import java.util.Arrays;
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class App {
public static void main(String[] args) throws Throwable {
//MongoClient mongoClient = MongoClients.create();
MongoClient mongoClient = MongoClients.create("mongodb://root:q@localhost:27017");
MongoDatabase database = mongoClient.getDatabase("mydb");
MongoCollection<Document> collection = database.getCollection("mycollection");
Document doc = new Document("name", "MongoDB")
.append("type", "database")
.append("count", 1)
.append("versions", Arrays.asList("v3.2", "v3.0", "v2.6"))
.append("info", new Document("x", 203).append("y", 102));
Publisher<InsertOneResult> publisher = collection.insertOne(doc);
publisher.subscribe(new Subscriber<InsertOneResult>() {
@Override
public void onSubscribe(final Subscription s) {
s.request(1); // <--- Data requested and the insertion will now occur
}
@Override
public void onNext(final InsertOneResult result) {
System.out.println("Inserted: " + result);
}
@Override
public void onError(final Throwable t) {
System.out.println("Failed");
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
//I am wondering if I am in right direction because I have just copied SubscriberHelpers to my project in order to countDocuments and find
collection.countDocuments()
.subscribe(new SubscriberHelpers.PrintSubscriber<Long>("total # of documents after inserting "
+ " is: %s"));
collection.find(eq("name", "MongoDB")).first().subscribe(new SubscriberHelpers.PrintDocumentSubscriber());
Thread.sleep(1000);
}
}
为了计算文档数量,我还添加了(只是从6年前的一个示例复制过来的)
import com.mongodb.MongoTimeoutException;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
/**
* Subscriber helper implementations for the Quick Tour.
*/
public final class SubscriberHelpers {
/**
* A Subscriber that stores the publishers results and provides a latch so can block on completion.
*
* @param <T> The publishers result type
*/
public static class ObservableSubscriber<T> implements Subscriber<T> {
private final List<T> received;
private final List<Throwable> errors;
private final CountDownLatch latch;
private volatile Subscription subscription;
private volatile boolean completed;
ObservableSubscriber() {
this.received = new ArrayList<T>();
this.errors = new ArrayList<Throwable>();
this.latch = new CountDownLatch(1);
}
@Override
public void onSubscribe(final Subscription s) {
subscription = s;
}
@Override
public void onNext(final T t) {
received.add(t);
}
@Override
public void onError(final Throwable t) {
errors.add(t);
onComplete();
}
@Override
public void onComplete() {
completed = true;
latch.countDown();
}
public Subscription getSubscription() {
return subscription;
}
public List<T> getReceived() {
return received;
}
public Throwable getError() {
if (errors.size() > 0) {
return errors.get(0);
}
return null;
}
public boolean isCompleted() {
return completed;
}
public List<T> get(final long timeout, final TimeUnit unit) throws Throwable {
return await(timeout, unit).getReceived();
}
public ObservableSubscriber<T> await() throws Throwable {
return await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public ObservableSubscriber<T> await(final long timeout, final TimeUnit unit) throws Throwable {
subscription.request(Integer.MAX_VALUE);
if (!latch.await(timeout, unit)) {
throw new MongoTimeoutException("Publisher onComplete timed out");
}
if (!errors.isEmpty()) {
throw errors.get(0);
}
return this;
}
}
/**
* A Subscriber that immediately requests Integer.MAX_VALUE onSubscribe
*
* @param <T> The publishers result type
*/
public static class OperationSubscriber<T> extends ObservableSubscriber<T> {
@Override
public void onSubscribe(final Subscription s) {
super.onSubscribe(s);
s.request(Integer.MAX_VALUE);
}
}
/**
* A Subscriber that prints a message including the received items on completion
*
* @param <T> The publishers result type
*/
public static class PrintSubscriber<T> extends OperationSubscriber<T> {
private final String message;
/**
* A Subscriber that outputs a message onComplete.
*
* @param message the message to output onComplete
*/
public PrintSubscriber(final String message) {
this.message = message;
}
@Override
public void onComplete() {
System.out.println(format(message, getReceived()));
super.onComplete();
}
}
/**
* A Subscriber that prints the json version of each document
*/
public static class PrintDocumentSubscriber extends OperationSubscriber<Document> {
@Override
public void onNext(final Document document) {
super.onNext(document);
System.out.println(document.toJson());
}
}
private SubscriberHelpers() {
}
}
暂无答案!
目前还没有任何答案,快来回答吧!