kafka、mongodb和React式编程的增量值

rbl8hiat  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(222)

对不起,标题不好,但我想不出任何描述性的标题。
我有一种设计问题,所以我需要增加文档中某个字段相对于上一个记录的值,我的意思是假设我有下面的模型:

@Document
data class Example(
    @Id
    var id: String?=null,
    val count: Long,
    @Indexed(unique = true)
    val type: Enum(YES,NO)
)

这是我的存储库

interface ExampleRepository : ReactiveSortingRepository<Example, String> {
    fun findOneByType(type: Enum): Mono<Example>
}

这是我的服务

@Service
class ExampleService(private val repository: ExampleRepository) {

    private val logger = LoggerFactory.getLogger(javaClass)

    fun createOrIncrease(type: Enum) = repository
        .findOneByType(type)
        .map { item ->
            logger.info("TEST - Found item $item")
            Example(item.id, item.count+1, item.type)
        }
        .defaultIfEmpty(Example(null, 1, NO))
        .flatMap { item ->
            logger.info("TEST - Saving item $item")
            repository
                .save(item)
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(5)))
        }

}

基本上,我有一个kafka监听器,它从主题获取请求并调用createorincrease方法,然后检查是否存在,如果不存在,则增加计数,它将创建一个默认值。
现在我的问题发生在我用kafka多次发送同一个文档,并从不同的线程读取它时,所以在同一时间,所有人都试图找到他们无法找到的文档,因为这是第一次,然后他们尝试使用mongodb抛出重复错误的默认文档。我正在努力寻找更好的方法来实现相同的功能,但在分布式系统中,这是在微服务体系结构中,多个微服务可以同时从不同的kafka分区获得相同的文档。
你知道我该怎么解决这个问题吗?
我可以想出两种可能的解决办法:
有一个临时收集和插入那里,不管有任何重复,然后有一个调度程序比每10分钟运行一次,聚合数据和清理数据库,但我正在寻找一个非调度方法。
生产者发送类型作为Kafka主题键,所以Kafka将把它放在同一个分区和一个分区是一个消费者只,这意味着它将是连续的,但这意味着,生产者需要知道下游的问题,我正试图避免有更多的解耦设计
使用$inc mongodb作为我的count字段,但是我不知道这种方法的缺点

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题