SpringBoot - Kafka的集成与使用详解5(生产者3:使用事务)
作者:hangge | 2020-06-18 09:10
Kafka 同数据库一样支持事务,当发生异常或者出现特定逻辑判断的时候可以进行回滚,确保消息监听器不会接收到一些错误的或者不需要的消息。Kafka 使用事务有两种方式,下面分别进行介绍。
五、生产者3:使用事务
1,使用 executeInTransaction 方法
(1)通常情况下,如果不声明事务,即使发送消息后面报错了,前面消息也已经发送成功:
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
kafkaTemplate.send("topic1", "test executeInTransaction");
throw new RuntimeException("fail");
}
}
(2)我们可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务。这种方式开启事务是不需要配置事务管理器的,也可以称为本地事务。
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
// 声明事务:后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operations -> {
operations.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
});
}
}
2,使用 @Transactional 注解方式
(1)如果要使用注解方式开启事务,首先我们需要配置 KafkaTransactionManager,这个类就是 Kafka 提供给我们的事务管理类,我们需要使用生产者工厂来创建这个事务管理类。
注意:我们需要在 producerFactory 中开启事务功能,并设置 TransactionIdPrefix,TransactionIdPrefix 是用来生成 Transactional.id 的前缀。
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Value("${spring.kafka.producer.retries}")
private int retries;
@Value("${spring.kafka.producer.acks}")
private String acks;
@Value("${spring.kafka.producer.batch-size}")
private int batchSize;
@Value("${spring.kafka.producer.properties.linger.ms}")
private int linger;
@Value("${spring.kafka.producer.buffer-memory}")
private int bufferMemory;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, Object> producerFactory() {
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());
factory.transactionCapable();
factory.setTransactionIdPrefix("tran-");
return factory;
}
@Bean
public KafkaTransactionManager transactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
return manager;
}
}
(2)之后如果一个方法需要使用事务,我们只需要在该方法上添加 @Transactional 注解即可:
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
@Transactional
public void test() {
kafkaTemplate.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
}
}
全部评论(0)