SpringBoot - 整合Canal客户端监听MySQL数据库的变更教程(附样例)
作者:hangge | 2024-04-28 08:40
Canal 是阿里巴巴开源的一款基于 MySQL 数据库的数据变更捕获与同步工具。通过整合 Canal 客户端,我们可以实现对 MySQL 数据库变更的实时监听,以便及时获取插入、更新和删除等操作的变更信息,从而进行相应的业务处理。下面我将通过样例进行演示。
1,准备工作
(1)首先我们需要安装好 Canal 的服务端,并对 MySQL 进行相关配置,具体参考我之前写的文章:
(2)由于本样例是要通过 Canal 客户端连接 Canal 服务端获取变更信息,因此服务端的 conf/canal.properties 配置文件中的 serverMode 配置需要设置为 tcp:
2,添加依赖
在 Spring Boot 项目的 pom.xml 文件中添加 Canal 客户端的依赖:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.7</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.7</version> </dependency>
3,配置 Canal 连接信息
(1)在 application.properties 文件中配置 Canal 服务端的连接信息:
canal.server.host=192.168.60.9 canal.server.port=11111 canal.server.destination=example canal.server.username= canal.server.password=
4,创建 Canal监听器
在 SpingBoot 项目中创建一个 Canal 监听器,代码如下。该监听器在应用启动时,通过 @PostConstruct 注解的 init 方法进行初始化,创建 Canal 连接器并开始监听 MySQL 数据库的数据变更。
import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class CanalListener { @Value("${canal.server.host}") private String canalHost; @Value("${canal.server.port}") private int canalPort; @Value("${canal.server.destination}") private String canalDestination; @Value("${canal.server.username}") private String canalUsername; @Value("${canal.server.password}") private String canalPassword; @PostConstruct public void init() { // 创建 Canal 连接器 CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(canalHost, canalPort), canalDestination, canalUsername, canalPassword ); int batchSize = 1000; int emptyCount = 0; try { // 连接到 Canal 服务端 connector.connect(); // 订阅所有数据库和表 connector.subscribe(".*\\..*"); // 回滚确认,撤销对所有数据的确认,以重新获取数据 connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { // 获取指定数量的数据 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { // 空数据处理 emptyCount++; System.out.println("empty count : " + emptyCount); try { // 休眠1秒,避免过于频繁的空数据查询 Thread.sleep(1000); } catch (InterruptedException e) { // 异常处理 } } else { // 重置空数据计数 emptyCount = 0; // 处理获取的数据 printEntry(message.getEntries()); } // 提交确认,表示已经处理了这批数据 connector.ack(batchId); // 处理失败时,可以调用 rollback 方法回滚数据 // connector.rollback(batchId); } System.out.println("empty too many times, exit"); } finally { // 断开连接 connector.disconnect(); } } // 处理数据变更事件 private void printEntry(List<Entry> entries) { for (Entry entry : entries) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { // 事务相关事件不处理 continue; } RowChange rowChange = null; try { // 解析获取的数据 rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { // 异常处理 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChange.getEventType(); System.out.println( String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChange.getRowDatasList()) { if (eventType == EventType.DELETE) { // 处理删除事件 printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { // 处理插入事件 printColumn(rowData.getAfterColumnsList()); } else { // 处理更新事件 System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } // 打印列信息 private void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
5,运行测试
(1)启动程序后,可以看到控制台会持续打印出如下信息,代表当前数据库无变更数据。
(3)此时我们从控制台中看到如下信息,说明监听并捕获到了数据库的数据变更:
(2)接着我们对 user 表数据进行操作,首先插入两条数据、接着更新一条数据、最后删除一条数据:
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (1, 'hangge', '1234567890'); INSERT INTO `user` (`id`, `name`, `phone`) VALUES (2, 'baidu', '13362623365'); UPDATE `user` SET `name` = '航歌' WHERE `id` = 1; DELETE FROM `user` WHERE `id` = 2;
(3)此时我们从控制台中看到如下信息,说明监听并捕获到了数据库的数据变更:
全部评论(0)