SpringBoot - Kafka的集成与使用详解3(生产者1:指定topic、partition、key等)
作者:hangge | 2020-06-16 08:10
三、生产者1:指定 topic、partition、key 等
1,send() 方法
(1)在之前的文章中我们都是通过 KafkaTemplate 的 send() 方法指定一个 topic 发送消息,其实 send() 方法还支持其他参数,具体如下:
参数说明:
- topic:这里填写的是 Topic 的名字
- partition:这里填写的是分区的 id,其实也是就第几个分区,id 从 0 开始。表示指定发送到该分区中
- timestamp:时间戳,一般默认当前时间戳
- key:消息的键
- data:消息的数据
- ProducerRecord:消息对应的封装类,包含上述字段
- Message<?>:Spring 自带的 Message 封装类,包含消息及消息头
ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message);
(2)下面是一些简单的使用样例:
//发送带有时间戳的消息
kafkaTemplate.send("topic.hangge.demo", 0, System.currentTimeMillis(), "key1", "message");
//使用ProducerRecord发送消息
ProducerRecord record = new ProducerRecord("topic.hangge.demo", "message");
kafkaTemplate.send(record);
//使用Message发送消息
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, "topic.hangge.demo");
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, 0);
GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
kafkaTemplate.send(message);
2,sendDefault() 方法
(1)sendDefault() 方法和 send() 方法类似,只不过它不需要传入 topic(直接使用默认 topic),该方法支持如下几种形式:
参数说明:
- partition:这里填写的是分区的 id,其实也是就第几个分区,id 从 0 开始。表示指定发送到该分区中
- timestamp:时间戳,一般默认当前时间戳
- key:消息的键
- data:消息的数据
ListenableFuture<SendResult<K, V>> sendDefault(V data); ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
(2)要使用 sendDefault 发送消息,首先我们需要创建一个配置类并编写一个带有默认 Topic 参数(高亮部分)的 KafkaTemplate:
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Value("${spring.kafka.producer.retries}")
private int retries;
@Value("${spring.kafka.producer.acks}")
private String acks;
@Value("${spring.kafka.producer.batch-size}")
private int batchSize;
@Value("${spring.kafka.producer.properties.linger.ms}")
private int linger;
@Value("${spring.kafka.producer.buffer-memory}")
private int bufferMemory;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<String, Object>(producerFactory());
template.setDefaultTopic("topic.hangge.default"); // 设置默认的 topic
return template;
}
}
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
kafkaTemplate.sendDefault("send default test");
}
}
全部评论(0)