返回 导航

SpringBoot / Cloud

hangge.com

SpringBoot - 整合Canal客户端监听MySQL数据库的变更教程(附样例)

作者:hangge | 2024-04-28 08:40
   Canal 是阿里巴巴开源的一款基于 MySQL 数据库的数据变更捕获与同步工具。通过整合 Canal 客户端,我们可以实现对 MySQL 数据库变更的实时监听,以便及时获取插入、更新和删除等操作的变更信息,从而进行相应的业务处理。下面我将通过样例进行演示。

1,准备工作

(1)首先我们需要安装好 Canal 的服务端,并对 MySQL 进行相关配置,具体参考我之前写的文章:

(2)由于本样例是要通过 Canal 客户端连接 Canal 服务端获取变更信息,因此服务端的 conf/canal.properties 配置文件中的 serverMode 配置需要设置为 tcp

2,添加依赖

Spring Boot 项目的 pom.xml 文件中添加 Canal 客户端的依赖:
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.7</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.7</version>
</dependency>

3,配置 Canal 连接信息

(1)在 application.properties 文件中配置 Canal 服务端的连接信息:
canal.server.host=192.168.60.9
canal.server.port=11111
canal.server.destination=example
canal.server.username=
canal.server.password=

(2)注意上面服务端的输出默认是 example,需要的话,我们同样可以在服务端的 conf/canal.properties 配置文件中修改。

4,创建 Canal监听器

    在 SpingBoot 项目中创建一个 Canal 监听器,代码如下。该监听器在应用启动时,通过 @PostConstruct 注解的 init 方法进行初始化,创建 Canal 连接器并开始监听 MySQL 数据库的数据变更。
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class CanalListener {

  @Value("${canal.server.host}")
  private String canalHost;

  @Value("${canal.server.port}")
  private int canalPort;

  @Value("${canal.server.destination}")
  private String canalDestination;

  @Value("${canal.server.username}")
  private String canalUsername;

  @Value("${canal.server.password}")
  private String canalPassword;

  @PostConstruct
  public void init() {
    // 创建 Canal 连接器
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(canalHost, canalPort),
            canalDestination,
            canalUsername,
            canalPassword
    );
    int batchSize = 1000;
    int emptyCount = 0;
    try {
      // 连接到 Canal 服务端
      connector.connect();
      // 订阅所有数据库和表
      connector.subscribe(".*\\..*");
      // 回滚确认,撤销对所有数据的确认,以重新获取数据
      connector.rollback();
      int totalEmptyCount = 120;
      while (emptyCount < totalEmptyCount) {
        // 获取指定数量的数据
        Message message = connector.getWithoutAck(batchSize);
        long batchId = message.getId();
        int size = message.getEntries().size();
        if (batchId == -1 || size == 0) {
          // 空数据处理
          emptyCount++;
          System.out.println("empty count : " + emptyCount);
          try {
            // 休眠1秒,避免过于频繁的空数据查询
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            // 异常处理
          }
        } else {
          // 重置空数据计数
          emptyCount = 0;
          // 处理获取的数据
          printEntry(message.getEntries());
        }

        // 提交确认,表示已经处理了这批数据
        connector.ack(batchId);
        // 处理失败时,可以调用 rollback 方法回滚数据
        // connector.rollback(batchId);
      }

      System.out.println("empty too many times, exit");
    } finally {
      // 断开连接
      connector.disconnect();
    }
  }

  // 处理数据变更事件
  private void printEntry(List<Entry> entries) {
    for (Entry entry : entries) {
      if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
              || entry.getEntryType() == EntryType.TRANSACTIONEND) {
        // 事务相关事件不处理
        continue;
      }

      RowChange rowChange = null;
      try {
        // 解析获取的数据
        rowChange = RowChange.parseFrom(entry.getStoreValue());
      } catch (Exception e) {
        // 异常处理
        throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
                + entry.toString(), e);
      }

      EventType eventType = rowChange.getEventType();
      System.out.println(
              String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
              entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
              entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
              eventType));

      for (RowData rowData : rowChange.getRowDatasList()) {
        if (eventType == EventType.DELETE) {
          // 处理删除事件
          printColumn(rowData.getBeforeColumnsList());
        } else if (eventType == EventType.INSERT) {
          // 处理插入事件
          printColumn(rowData.getAfterColumnsList());
        } else {
          // 处理更新事件
          System.out.println("-------> before");
          printColumn(rowData.getBeforeColumnsList());
          System.out.println("-------> after");
          printColumn(rowData.getAfterColumnsList());
        }
      }
    }
  }

  // 打印列信息
  private void printColumn(List<Column> columns) {
    for (Column column : columns) {
      System.out.println(column.getName() + " : " + column.getValue()
              + "    update=" + column.getUpdated());
    }
  }
}

5,运行测试

(1)启动程序后,可以看到控制台会持续打印出如下信息,代表当前数据库无变更数据。

(2)接着我们对 user 表数据进行操作,首先插入两条数据、接着更新一条数据、最后删除一条数据:
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (1, 'hangge', '1234567890');
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (2, 'baidu', '13362623365');
UPDATE `user` SET `name` = '航歌' WHERE `id` = 1;
DELETE FROM `user` WHERE `id` = 2;

(3)此时我们从控制台中看到如下信息,说明监听并捕获到了数据库的数据变更:
评论

全部评论(0)

回到顶部