canal簡單安裝使用
1、數據庫配置
首先使用canal需要修改數據庫配置
[mysqld]
log-bin=mysql-bin # 開啟
binlog binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
創建canal數據庫用戶
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2、安裝canal
下載:https://github.com/alibaba/canal/releases
解壓(修改版本號):tar zxvf canal.deployer-1.1.4.tar.gz -C ./canal
配置開放服務器端口:11110、11111、11112
修改canal配置文件(這里設置了兩個instance,即兩個數據庫):
vi canal/conf/canal.properties
canal.destinations = example1,example2
配置instance:
cp -R canal/conf/example conf/example1
mv conf/example conf/example2
第一個數據庫配置
vi canal/conf/example1/instance.properties
canal.instance.master.address=32.1.2.140:3306
第二個數據庫配置
vi canal/conf/example2/instance.properties
canal.instance.master.address=32.1.2.140:3307
#如果需要新增一個instance,只需要修改canal.properties文件,并新增一個instance配置即可,無需重啟canal。
運行:
sh canal/bin/startup.sh # 查看日志
cat canal/logs/canal/canal
3、Java使用樣例
引入pom依賴,需要與安裝的canal版本一致
<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> </dependencies>
示例代碼(異步打印兩個數據庫的修改內容):
package cn.spicybar.dblog; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) { new Thread(() -> initConnector("example1")).start(); new Thread(() -> initConnector("example2")).start(); } private static void initConnector(String destination) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("32.1.0.237", 11111), destination, "", ""); try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(1000); if (message.getId() != -1 && message.getEntries().size() > 0) { printEntry(message.getEntries()); } connector.ack(message.getId()); } } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entries) { for (Entry entry : entries) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); System.out.println(rowChange.getSql()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser error, data:" + entry.toString(), e); } } }