Java: How can I get a MongoDB ChangeStreamDocument typed as my DTO instead of Document?

Posted by jpfvwuh4 on 2023-01-07 in Java

I have the following class in my MongoDB project:

@Document(collection = "teams")
public class Team {
    private @MongoId(FieldType.OBJECT_ID)
    @Schema(type = "string", example = "60b0c56e4192f01e8745bd75")
    ObjectId id;
    @Schema(example = "56373")
    private Integer orgId;
    private String name;
    private List<Member> players;
    private List<Member> staff;

    public class Member{
        private ObjectId id;
        private String name;
    }
}

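As you can see, this class represents the documents in my teams collection in MongoDB. I am trying to create a ***change stream*** because I want to monitor players and staff members joining and leaving teams, as well as existing teams being deleted from the teams collection.
This is what I have tried: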

@Component
public class MongoChangeStream {

    private final MongoTemplate mongoTemplate;

    public MongoChangeStream(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    @EventListener(ContextRefreshedEvent.class)
    public void changeStream() {

        // Select the collection to query
        MongoCollection<Document> collection = mongoTemplate.getCollection("teams");

        // Create pipeline for operationType filter
        List<Bson> pipeline = Arrays.asList(
                Aggregates.match(
                        Filters.in("operationType",
                                Arrays.asList("insert", "update", "delete"))));

        // Create the Change Stream and watch on the filters in the pipeline
        ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
            .fullDocument(FullDocument.UPDATE_LOOKUP)
            .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);

        // Iterate over the Change Stream
        for (ChangeStreamDocument<Document> changeEvent : changeStream) {
            switch (changeEvent.getOperationType().name()) {
                case "UPDATE":
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("players")) {
                        // Do something
                    }
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("staff")) {
                        // Do something
                    }
                    break;
                case "DELETE":
                        // Do something
                    break;
            }
        }
    }
}

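My question is basically this: how can I get the documents out of the change stream as Team objects instead of Document objects?
I tried replacing every occurrence of Document with Team, but I got the following error: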
Can't find a codec for CodecCacheKey{clazz=class com.test.dto.Team, types=null}.


Answer #1 (toiithl6)

In the end I got it working like this:

@Document(collection = "teams")
public class Team {
    @Id
    @BsonProperty("_id")
    private ObjectId id;
    private Integer orgId;
    private String name;
    private List<Member> players;
    private List<Member> staff;

    // static, so the POJO codec can instantiate Member without an enclosing Team instance
    public static class Member {
        @Id
        private ObjectId id;
        private String name;
    }
}

Then I added a CodecRegistry to the MongoChangeStream class, as follows:

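import static org.bson.codecs.pojo.Conventions.ANNOTATION_CONVENTION;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import java.util.Arrays;
import java.util.List;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.conversions.Bson;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;

@Component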
public class MongoChangeStream {

    private final MongoTemplate mongoTemplate;

    public MongoChangeStream(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    @EventListener(ContextRefreshedEvent.class)
    public void changeStream() {

        // Build a codec registry that can decode BSON into POJOs such as Team
        // (ANNOTATION_CONVENTION is statically imported from org.bson.codecs.pojo.Conventions)
        CodecRegistry pojoCodecRegistry = org.bson.codecs.configuration.CodecRegistries.fromRegistries(
                MongoClientSettings.getDefaultCodecRegistry(),
                org.bson.codecs.configuration.CodecRegistries.fromProviders(
                        PojoCodecProvider.builder()
                                .conventions(List.of(ANNOTATION_CONVENTION))
                                .automatic(true)
                                .build()));

        // Select the collection to watch, decoded as Team instead of Document
        MongoCollection<Team> collection = mongoTemplate.getCollection("teams")
                .withCodecRegistry(pojoCodecRegistry)
                .withDocumentClass(Team.class);

        // Create pipeline for operationType filter
        List<Bson> pipeline = Arrays.asList(
                Aggregates.match(
                        Filters.in("operationType",
                                Arrays.asList("insert", "update", "delete"))));

        // Create the Change Stream and watch on the filters in the pipeline
        ChangeStreamIterable<Team> changeStream = collection.watch(pipeline)
            .fullDocument(FullDocument.UPDATE_LOOKUP)
            .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);

        // Iterate over the Change Stream
        for (ChangeStreamDocument<Team> changeEvent : changeStream) {
            switch (changeEvent.getOperationType().name()) {
                case "UPDATE":
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("players")) {
                        // Do something
                    }
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("staff")) {
                        // Do something
                    }
                    break;
                case "DELETE":
                        // Do something
                    break;
            }
        }
    }
}
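
One caveat about the containsKey("players") and containsKey("staff") checks: as far as I know, the keys in updatedFields use dot notation, so an update that appends or modifies a single array element may be reported under a key such as "players.2" rather than "players". A minimal sketch of a more tolerant check is below. The class UpdatedFieldCheck and the method touchesField are hypothetical names of my own, not part of the MongoDB driver, and the sketch only assumes that the keys are plain strings.

```java
import java.util.Set;

public class UpdatedFieldCheck {

    // Hypothetical helper (not a MongoDB driver API): returns true when any
    // updated-field key is the field itself or a dotted path beneath it,
    // e.g. "players" or "players.2" for field "players".
    public static boolean touchesField(Set<String> updatedKeys, String field) {
        String prefix = field + ".";
        for (String key : updatedKeys) {
            if (key.equals(field) || key.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(touchesField(Set.of("players.2"), "players"));    // true
        System.out.println(touchesField(Set.of("staff"), "staff"));          // true
        System.out.println(touchesField(Set.of("playersCount"), "players")); // false
        System.out.println(touchesField(Set.of("name"), "players"));         // false
    }
}
```

Inside the UPDATE case this could be called as touchesField(changeEvent.getUpdateDescription().getUpdatedFields().keySet(), "players"), since getUpdatedFields() returns a BsonDocument whose keySet() is a Set<String>.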
