Implementing producer transactions with Kafka dotnet

isr3a4wc  posted on 2021-06-04  in Kafka

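Is there a way to implement atomic transactions with a Kafka producer? I mean: if all the messages are sent successfully, the batch is committed, and if one or more messages fail, all of them are rolled back. I am currently sending messages with the producer API using this syntax: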

Using producer As IProducer(Of String, String) = New ProducerBuilder(Of String, String)(Producerconfig).Build()
    For Each row As ClsRowObj In slice.enumValues(Of ClsRowObj)()
        sMessage = row.toJsonStringMsg()
        sKey = row.toJsonStringKey()
            Try
                ' Note: Awaiting the asynchronous produce request
                ' below prevents flow of execution from proceeding
                ' until the acknowledgement from the broker is
                ' received (at the expense of low throughput).
                Dim deliveryReport As DeliveryResult(Of String, String) = Await producer.ProduceAsync(sTopic, New Message(Of String, String) With {.Key = sKey, .Value = sMessage})
            Catch ex As ProduceException(Of String, String)
                console.writeline($"KO:{sMessage};{ex.Message} [{ex.[Error].Code}]")
            End Try
    Next
    ' Since we are producing synchronously, at this point there will be no messages
    ' in-flight And no delivery reports waiting to be acknowledged, so there Is no
    ' need to call producer.Flush before disposing the producer.
    ' The Flush() method should be used for asynchronous calls (producer.Produce(sTopic, New Message(Of String, String) With {.Key = oMessage.incas.idmovadd, .Value = sMessage}))
    ' producer.Flush(TimeSpan.FromSeconds(10))
End Using

I need something like the following, but it does not seem to be possible:

dim bCommit as boolean = true
Using producer As IProducer(Of String, String) = New ProducerBuilder(Of String, String)(Producerconfig).Build()
    producer.beginTransaction()
        For Each row As ClsRowObj In slice.enumValues(Of ClsRowObj)()
            sMessage = row.toJsonStringMsg()
            sKey = row.toJsonStringKey()
                Try
                    ' Note: Awaiting the asynchronous produce request
                    ' below prevents flow of execution from proceeding
                    ' until the acknowledgement from the broker is
                    ' received (at the expense of low throughput).
                    Dim deliveryReport As DeliveryResult(Of String, String) = Await producer.ProduceAsync(sTopic, New Message(Of String, String) With {.Key = sKey, .Value = sMessage})
                Catch ex As ProduceException(Of String, String)
                    console.writeline($"KO:{sMessage};{ex.Message} [{ex.[Error].Code}]")
                    producer.rollBackTransaction()
                    bCommit = false
                    exit for
                End Try
        Next
        ' Since we are producing synchronously, at this point there will be no messages
        ' in-flight And no delivery reports waiting to be acknowledged, so there Is no
        ' need to call producer.Flush before disposing the producer.
        ' The Flush() method should be used for asynchronous calls (producer.Produce(sTopic, New Message(Of String, String) With {.Key = oMessage.incas.idmovadd, .Value = sMessage}))
        ' producer.Flush(TimeSpan.FromSeconds(10))
        if bCommit then
            producer.commitTransaction()    
        end if
End Using
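
For comparison, here is a minimal sketch of how the pattern above might look with the transactional producer API that Confluent.Kafka exposes in recent versions (InitTransactions, BeginTransaction, CommitTransaction, AbortTransaction; there is no rollBackTransaction method). It has not been run against the setup in the question. The broker address, the transactional id, the 30-second timeouts, and the names SendAtomicallyAsync and TransactionalProducerSketch are assumptions made for illustration only.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading.Tasks
Imports Confluent.Kafka

Module TransactionalProducerSketch

    ' Hypothetical helper: sends every message inside one Kafka transaction.
    ' The transaction is committed only if all sends succeed; otherwise it is aborted.
    Async Function SendAtomicallyAsync(topic As String,
                                       messages As IEnumerable(Of KeyValuePair(Of String, String))) As Task
        Dim config As New ProducerConfig()
        ' Assumption: a broker reachable on localhost.
        config.BootstrapServers = "localhost:9092"
        ' A transactional producer needs a TransactionalId; this value is a placeholder.
        config.TransactionalId = "example-transactional-producer"

        Using producer As IProducer(Of String, String) =
                New ProducerBuilder(Of String, String)(config).Build()

            ' Called once per producer instance, before the first transaction.
            producer.InitTransactions(TimeSpan.FromSeconds(30))
            producer.BeginTransaction()
            Try
                For Each kv As KeyValuePair(Of String, String) In messages
                    ' Awaiting surfaces a delivery failure as ProduceException.
                    Await producer.ProduceAsync(topic,
                        New Message(Of String, String) With {.Key = kv.Key, .Value = kv.Value})
                Next
                producer.CommitTransaction(TimeSpan.FromSeconds(30))
            Catch ex As Exception
                ' Simplification: any failure aborts the whole transaction.
                ' Real code may want to treat fatal and retriable errors differently.
                Console.WriteLine($"KO: {ex.Message}")
                producer.AbortTransaction(TimeSpan.FromSeconds(30))
                Throw
            End Try
        End Using
    End Function

End Module
```

Note that aborted messages are still written to the topic log; as far as I know they are only hidden from consumers that set IsolationLevel to ReadCommitted, so the "rollback" is only visible to consumers configured that way.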
