SpringBoot - 整合Canal客户端监听MySQL数据库的变更教程(附样例)
作者:hangge | 2024-04-28 08:40
Canal 是阿里巴巴开源的一款基于 MySQL 数据库的数据变更捕获与同步工具。通过整合 Canal 客户端,我们可以实现对 MySQL 数据库变更的实时监听,以便及时获取插入、更新和删除等操作的变更信息,从而进行相应的业务处理。下面我将通过样例进行演示。
1,准备工作
(1)首先我们需要安装好 Canal 的服务端,并对 MySQL 进行相关配置,具体参考我之前写的文章:
(2)由于本样例是要通过 Canal 客户端连接 Canal 服务端获取变更信息,因此服务端的 conf/canal.properties 配置文件中的 serverMode 配置需要设置为 tcp:

2,添加依赖
在 Spring Boot 项目的 pom.xml 文件中添加 Canal 客户端的依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.7</version>
</dependency>
3,配置 Canal 连接信息
(1)在 application.properties 文件中配置 Canal 服务端的连接信息:
canal.server.host=192.168.60.9 canal.server.port=11111 canal.server.destination=example canal.server.username= canal.server.password=

4,创建 Canal监听器
在 SpingBoot 项目中创建一个 Canal 监听器,代码如下。该监听器在应用启动时,通过 @PostConstruct 注解的 init 方法进行初始化,创建 Canal 连接器并开始监听 MySQL 数据库的数据变更。
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class CanalListener {
@Value("${canal.server.host}")
private String canalHost;
@Value("${canal.server.port}")
private int canalPort;
@Value("${canal.server.destination}")
private String canalDestination;
@Value("${canal.server.username}")
private String canalUsername;
@Value("${canal.server.password}")
private String canalPassword;
@PostConstruct
public void init() {
// 创建 Canal 连接器
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(canalHost, canalPort),
canalDestination,
canalUsername,
canalPassword
);
int batchSize = 1000;
int emptyCount = 0;
try {
// 连接到 Canal 服务端
connector.connect();
// 订阅所有数据库和表
connector.subscribe(".*\\..*");
// 回滚确认,撤销对所有数据的确认,以重新获取数据
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// 空数据处理
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
// 休眠1秒,避免过于频繁的空数据查询
Thread.sleep(1000);
} catch (InterruptedException e) {
// 异常处理
}
} else {
// 重置空数据计数
emptyCount = 0;
// 处理获取的数据
printEntry(message.getEntries());
}
// 提交确认,表示已经处理了这批数据
connector.ack(batchId);
// 处理失败时,可以调用 rollback 方法回滚数据
// connector.rollback(batchId);
}
System.out.println("empty too many times, exit");
} finally {
// 断开连接
connector.disconnect();
}
}
// 处理数据变更事件
private void printEntry(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
// 事务相关事件不处理
continue;
}
RowChange rowChange = null;
try {
// 解析获取的数据
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
// 异常处理
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+ entry.toString(), e);
}
EventType eventType = rowChange.getEventType();
System.out.println(
String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
// 处理删除事件
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
// 处理插入事件
printColumn(rowData.getAfterColumnsList());
} else {
// 处理更新事件
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
// 打印列信息
private void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue()
+ " update=" + column.getUpdated());
}
}
}
5,运行测试
(1)启动程序后,可以看到控制台会持续打印出如下信息,代表当前数据库无变更数据。
(3)此时我们从控制台中看到如下信息,说明监听并捕获到了数据库的数据变更:
(2)接着我们对 user 表数据进行操作,首先插入两条数据、接着更新一条数据、最后删除一条数据:
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (1, 'hangge', '1234567890'); INSERT INTO `user` (`id`, `name`, `phone`) VALUES (2, 'baidu', '13362623365'); UPDATE `user` SET `name` = '航歌' WHERE `id` = 1; DELETE FROM `user` WHERE `id` = 2;
(3)此时我们从控制台中看到如下信息,说明监听并捕获到了数据库的数据变更:
全部评论(0)