I am trying to build a PoC application in Java to understand how transaction management works in Spring Cloud Stream when publishing messages to Kafka. The use case I am trying to simulate is a processor that receives a message, does some processing, and produces two new messages destined for two different topics. I want to publish both messages as a single transaction, so that if publishing the second message fails, the first message is rolled back (not committed). Does Spring Cloud Stream support such a use case?

I have set the @Transactional annotation and I can see that a global transaction starts before the message is handed to the consumer. However, when I try to publish a message via the MessageChannel.send() method, I can see that a new local transaction is started in the KafkaProducerMessageHandler class's handleRequestMessage() method. This means that sending the message does not participate in the global transaction. So if an exception is thrown after the first message is published, that message will not be rolled back. The global transaction is rolled back, but that effectively does nothing, because the first message has already been committed.
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: txn.
            producer: # these apply to all producers that participate in the transaction
              partition-key-extractor-name: partitionKeyExtractorStrategy
              partition-selector-name: partitionSelectorStrategy
              partition-count: 3
              configuration:
                acks: all
                enable:
                  idempotence: true
                retries: 10
        bindings:
          input-customer-data-change-topic:
            consumer:
              configuration:
                isolation:
                  level: read_committed
              enable-dlq: true
      bindings:
        input-customer-data-change-topic:
          content-type: application/json
          destination: com.fis.customer
          group: com.fis.ec
          consumer:
            partitioned: true
            max-attempts: 1
        output-name-change-topic:
          content-type: application/json
          destination: com.fis.customer.name
        output-email-change-topic:
          content-type: application/json
          destination: com.fis.customer.email
@SpringBootApplication
@EnableBinding(CustomerDataChangeStreams.class)
public class KafkaCloudStreamCustomerDemoApplication
{
public static void main(final String[] args)
{
SpringApplication.run(KafkaCloudStreamCustomerDemoApplication.class, args);
}
}
public interface CustomerDataChangeStreams
{
@Input("input-customer-data-change-topic")
SubscribableChannel inputCustomerDataChange();
@Output("output-email-change-topic")
MessageChannel outputEmailDataChange();
@Output("output-name-change-topic")
MessageChannel outputNameDataChange();
}
@Component
public class CustomerDataChangeListener
{
@Autowired
private CustomerDataChangeProcessor mService;
@StreamListener("input-customer-data-change-topic")
  public void handleCustomerDataChangeMessages(
      @Payload final ImmutableCustomerDetails customerDetails)
  {
    mService.processMessage(customerDetails);
  }
}
}
@Component
public class CustomerDataChangeProcessor
{
  private static final Logger LOGGER =
      LoggerFactory.getLogger(CustomerDataChangeProcessor.class);
  private final CustomerDataChangeStreams mStreams;
@Value("${spring.cloud.stream.bindings.output-email-change-topic.destination}")
private String mEmailChangeTopic;
@Value("${spring.cloud.stream.bindings.output-name-change-topic.destination}")
private String mNameChangeTopic;
public CustomerDataChangeProcessor(final CustomerDataChangeStreams streams)
{
mStreams = streams;
}
public void processMessage(final CustomerDetails customerDetails)
{
try
{
sendNameMessage(customerDetails);
sendEmailMessage(customerDetails);
}
catch (final JSONException ex)
{
LOGGER.error("Failed to send messages.", ex);
}
}
public void sendNameMessage(final CustomerDetails customerDetails)
throws JSONException
{
final JSONObject nameChangeDetails = new JSONObject();
nameChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
nameChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
nameChangeDetails.put(KafkaConst.FIRST_NAME_KEY, customerDetails.firstName());
nameChangeDetails.put(KafkaConst.LAST_NAME_KEY, customerDetails.lastName());
final String action = customerDetails.action();
nameChangeDetails.put(KafkaConst.ACTION_KEY, action);
final MessageChannel nameChangeMessageChannel = mStreams.outputNameDataChange();
    nameChangeMessageChannel.send(MessageBuilder.withPayload(nameChangeDetails.toString())
        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
        .setHeader(KafkaHeaders.TOPIC, mNameChangeTopic).build());
if ("fail_name_illegal".equalsIgnoreCase(action))
{
throw new IllegalArgumentException("Customer name failure!");
}
}
public void sendEmailMessage(final CustomerDetails customerDetails) throws JSONException
{
final JSONObject emailChangeDetails = new JSONObject();
emailChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
emailChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
emailChangeDetails.put(KafkaConst.EMAIL_ADDRESS_KEY, customerDetails.email());
final String action = customerDetails.action();
emailChangeDetails.put(KafkaConst.ACTION_KEY, action);
final MessageChannel emailChangeMessageChannel = mStreams.outputEmailDataChange();
emailChangeMessageChannel.send(MessageBuilder.withPayload(emailChangeDetails.toString())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader(KafkaHeaders.TOPIC, mEmailChangeTopic).build());
if ("fail_email_illegal".equalsIgnoreCase(action))
{
throw new IllegalArgumentException("E-mail address failure!");
}
}
}
EDIT

We are getting closer. The local transaction is no longer created. However, the global transaction is still committed even when an exception occurs. As far as I can tell, the exception does not propagate to the TransactionTemplate.execute() method, so the transaction is committed. It looks like the MessageProducerSupport class's sendMessage() method "swallows" the exception in its catch clause: if an error channel is defined, a message is published to it and the exception is not rethrown. I tried turning the error channel off (spring.cloud.stream.kafka.binder.transaction.producer.error-channel-enabled=false), but that does not turn it off. So, as a test, I simply set the error channel to null in the debugger to force the exception to be rethrown. That appears to work. However, the original message keeps being redelivered to the initial consumer, even though I have max-attempts set to 1 for that consumer.
1 Answer
See the documentation:

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties. Default null (no transactions).

spring.cloud.stream.kafka.binder.transaction.producer.*
Global producer properties for producers in a transactional binder. See spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix, the Kafka producer properties, and the general producer properties supported by all binders. Default: see individual producer properties.

You must configure the shared global producer.

Do not add @Transactional — the container will start the transaction and send the offsets to the transaction before committing it. If the listener throws an exception, the transaction is rolled back and the DefaultAfterRollbackProcessor will re-seek the topic/partitions so that the record is redelivered.

EDIT
There is a bug in the configuration of the binder's transaction manager that causes the output binding to start a new local transaction.

To work around it, reconfigure the transaction manager with the following container customizer bean...
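The customizer bean itself was not reproduced in this extract. The following is only a sketch of what such a bean might look like, assuming the Kafka binder exposes its transactional producer factory via KafkaMessageChannelBinder.getTransactionalProducerFactory() (check this against your binder version before relying on it):

```java
// Sketch only: builds a KafkaTransactionManager from the binder's
// transactional producer factory and installs it on the listener
// container, so that sends on the output bindings join the
// container-started transaction instead of opening a local one.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
    final BinderFactory binders)
{
  return (container, destination, group) -> {
    final KafkaMessageChannelBinder kafkaBinder =
        (KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class);
    final KafkaTransactionManager<byte[], byte[]> transactionManager =
        new KafkaTransactionManager<>(kafkaBinder.getTransactionalProducerFactory());
    container.getContainerProperties().setTransactionManager(transactionManager);
  };
}
```

With this in place, the KafkaProducerMessageHandler should find an existing transaction bound to the thread and participate in it rather than starting its own.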
EDIT 2

You cannot use the binder's DLQ support, because from the container's perspective the delivery was successful. We need the exception to propagate to the container to force a rollback. So you need to move the dead-lettering to the AfterRollbackProcessor instead. Here is my complete test class:
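The complete test class referenced above is not included in this extract. As a hedged sketch of the dead-lettering idea only (bean wiring and types are assumptions, not the original code), a DefaultAfterRollbackProcessor configured with a DeadLetterPublishingRecoverer might look like this:

```java
// Sketch only: after the transaction rolls back, publish the failed
// record to a dead-letter topic instead of using the binder's DLQ
// support (which never fires, since the container saw the delivery
// as successful until the rollback).
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> dlqCustomizer(
    final KafkaTemplate<Object, Object> template)
{
  return (container, destination, group) -> {
    container.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
        new DeadLetterPublishingRecoverer(template),
        // No retries: dead-letter immediately after the first rollback,
        // matching the consumer's max-attempts: 1 setting.
        new FixedBackOff(0L, 0L)));
  };
}
```

DeadLetterPublishingRecoverer publishes to <original-topic>.DLT by default; pass a destination resolver to its constructor if a different topic naming scheme is needed.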