有没有办法实现Kafka生产者原子事务?我的意思是,如果所有发送的消息都正常,那么将提交,或者如果一个或多个消息失败,那么所有消息都将回滚?我用这个sintax用producerapi发送消息
Using producer As IProducer(Of String, String) = New ProducerBuilder(Of String, String)(Producerconfig).Build()
For Each row As ClsRowObj In slice.enumValues(Of ClsRowObj)()
sMessage = row.toJsonStringMsg()
sKey = row.toJsonStringKey()
Try
' Note: Awaiting the asynchronous produce request
' below prevents flow of execution from proceeding
' until the acknowledgement from the broker is
' received (at the expense of low throughput).
Dim deliveryReport As DeliveryResult(Of String, String) = Await producer.ProduceAsync(sTopic, New Message(Of String, String) With {.Key = sKey, .Value = sMessage})
Catch ex As ProduceException(Of String, String)
console.writeline($"KO:{sMessage};{ex.Message} [{ex.[Error].Code}]")
End Try
Next
' Since we are producing synchronously, at this point there will be no messages
' in-flight And no delivery reports waiting to be acknowledged, so there Is no
' need to call producer.Flush before disposing the producer.
' il metodo flush() va usato in caso di chiamate asincrone (producer.Produce(sTopic, New Message(Of String, String) With {.Key = oMessage.incas.idmovadd, .Value = sMessage}))
' producer.Flush(TimeSpan.FromSeconds(10))
End Using
我需要这样的东西,但似乎不可能!?!?!?
dim bCommit as boolean = true
Using producer As IProducer(Of String, String) = New ProducerBuilder(Of String, String)(Producerconfig).Build()
producer.beginTransaction()
For Each row As ClsRowObj In slice.enumValues(Of ClsRowObj)()
sMessage = row.toJsonStringMsg()
sKey = row.toJsonStringKey()
Try
' Note: Awaiting the asynchronous produce request
' below prevents flow of execution from proceeding
' until the acknowledgement from the broker is
' received (at the expense of low throughput).
Dim deliveryReport As DeliveryResult(Of String, String) = Await producer.ProduceAsync(sTopic, New Message(Of String, String) With {.Key = sKey, .Value = sMessage})
Catch ex As ProduceException(Of String, String)
console.writeline($"KO:{sMessage};{ex.Message} [{ex.[Error].Code}]")
producer.rollBackTransaction()
bCommit = false
exit for
End Try
Next
' Since we are producing synchronously, at this point there will be no messages
' in-flight And no delivery reports waiting to be acknowledged, so there Is no
' need to call producer.Flush before disposing the producer.
' il metodo flush() va usato in caso di chiamate asincrone (producer.Produce(sTopic, New Message(Of String, String) With {.Key = oMessage.incas.idmovadd, .Value = sMessage}))
' producer.Flush(TimeSpan.FromSeconds(10))
if bCommit then
producer.commitTransaction()
end if
End Using
暂无答案!
目前还没有任何答案,快来回答吧!