对不起,标题不好,但我想不出任何描述性的标题。
我有一种设计问题,所以我需要增加文档中某个字段相对于上一个记录的值,我的意思是假设我有下面的模型:
@Document
data class Example(
@Id
var id: String?=null,
val count: Long,
@Indexed(unique = true)
val type: Enum(YES,NO)
)
这是我的存储库
interface ExampleRepository : ReactiveSortingRepository<Example, String> {
fun findOneByType(type: Enum): Mono<Example>
}
这是我的服务
@Service
class ExampleService(private val repository: ExampleRepository) {
private val logger = LoggerFactory.getLogger(javaClass)
fun createOrIncrease(type: Enum) = repository
.findOneByType(type)
.map { item ->
logger.info("TEST - Found item $item")
Example(item.id, item.count+1, item.type)
}
.defaultIfEmpty(Example(null, 1, NO))
.flatMap { item ->
logger.info("TEST - Saving item $item")
repository
.save(item)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(5)))
}
}
基本上,我有一个kafka监听器,它从主题获取请求并调用createorincrease方法,然后检查是否存在,如果不存在,则增加计数,它将创建一个默认值。
现在我的问题发生在我用kafka多次发送同一个文档,并从不同的线程读取它时,所以在同一时间,所有人都试图找到他们无法找到的文档,因为这是第一次,然后他们尝试使用mongodb抛出重复错误的默认文档。我正在努力寻找更好的方法来实现相同的功能,但在分布式系统中,这是在微服务体系结构中,多个微服务可以同时从不同的kafka分区获得相同的文档。
你知道我该怎么解决这个问题吗?
我可以想出两种可能的解决办法:
有一个临时收集和插入那里,不管有任何重复,然后有一个调度程序比每10分钟运行一次,聚合数据和清理数据库,但我正在寻找一个非调度方法。
生产者发送类型作为Kafka主题键,所以Kafka将把它放在同一个分区和一个分区是一个消费者只,这意味着它将是连续的,但这意味着,生产者需要知道下游的问题,我正试图避免有更多的解耦设计
使用$inc mongodb作为我的count字段,但是我不知道这种方法的缺点
暂无答案!
目前还没有任何答案,快来回答吧!