我读了很多关于这个主题的问题和答案,但我不能解决我的问题。
我用kafka和spring数据jdbc初始化了一个springboot项目。我想做的是
配置kafka jdbc连接器,以便将postgresql数据库中的记录更改推送到kafka主题中
设置一个kafka使用者,以便通过将其插入另一个postgresql db来使用推送到主题中的记录。
对于第1点,一切正常。对于第二点,我遇到了一些问题。
这就是项目的组织方式
com.migration
- MigrationApplication.java
com.migration.config
- KafkaConsumerConfig.java
com.migration.db
- JDBCConfig.java
- RecordRepository.java
com.migration.listener
- MessageListener.java
com.migration.model
- Record.java
- AbstractRecord.java
- PostgresRecord.java
这就是 MessageListener
班
@EnableJdbcRepositories("com.migration.db")
@Transactional
@Configuration
public class MessageListener {
@Autowired
private RecordRepository repository;
@KafkaListener(topics={"author"}, groupId = "migrator", containerFactory = "migratorKafkaListenerContainerFactory")
public void listenGroupMigrator(Record record) {
repository.insert(message);
throw new RuntimeException();
}
我认为很清楚,它设置了一个kafka消费者,以便收听“author”主题,并通过将其插入db来消费记录。
如你所见,里面 listenGroupMigrator()
方法在插入到记录的db中时执行,然后抛出 RuntimeException
因为我在检查 @Transactional
工作,如果执行回滚。
但如果不是,则不会执行回滚,即使用 @Transactional
.
为了完整性,这些是其他类 RecordRepository
班
@Repository
public class RecordRepository {
public RecordRepository() {}
public void insert(Record record) {
JDBCConfig jdbcConfig = new JDBCConfig();
SimpleJdbcInsert messageInsert = new SimpleJdbcInsert(jdbcConfig.postgresDataSource());
messageInsert.withTableName(record.tableName()).execute(record.content());
}
}
``` `JDBCConfig` 班
@Configuration
public class JDBCConfig {
@Bean
public DataSource postgresDataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.postgresql.Driver");
dataSource.setUrl("jdbc:postgresql://localhost:5432/db");
dataSource.setUsername("postgres");
dataSource.setPassword("root");
return dataSource;
}
}
``` KafkaConsumerConfig
班级:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrap-server}")
private String bootstrapServer;
private <T extends Record> ConsumerFactory<String, T> consumerFactory(String groupId, Class<T> clazz) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(clazz));
}
private <T extends Record> ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory(String groupId, Class<T> clazz) {
ConcurrentKafkaListenerContainerFactory<String, T> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId, clazz));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PostgresRecord> migratorKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("migrator", PostgresRecord.class);
}
}
``` `MigrationApplication` 班
@SpringBootApplication
public class MigrationApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(MigrationApplication.class, args);
MessageListener listener = context.getBean(MessageListener.class);
}
}
我怎么才能做这个 `listenGroupMigrator` 事务性方法?
暂无答案!
目前还没有任何答案,快来回答吧!