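Our company needs to synchronize data between the business team and the IM team. To keep the two sides decoupled without changing our own code, and for performance reasons, we decided to handle this with canal.
All of the steps below follow the official canal documentation.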
1. Install canal-admin

A single canal-admin can manage multiple canal clusters, so I installed it in the base namespace.
Once the canal-manager database is set up, install canal-admin into that namespace:

    kubens recircle-industry-base-system
    helm install canal-admin -f ./admin-values.yaml ./canal-admin
We currently run one canal cluster per namespace; dev, test, and pre each correspond to one of our three environments.
2. Install the zookeeper cluster

    kubens recircle-industry-platform-preview
    helm install canal-zookeeper oci://registry-1.docker.io/bitnamicharts/zookeeper
After the installation finishes, enter the zookeeper k8s service address in canal-admin.
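The address entered in canal-admin is just the in-cluster DNS name of the zookeeper Service plus its client port. As a small sketch of how that name is put together (the service_address helper is hypothetical, and cluster.local is only the usual default cluster domain, so adjust it if your cluster uses another one):

```python
def service_address(service, namespace, port, fqdn=False, cluster_domain="cluster.local"):
    """Build the DNS address of a Kubernetes Service as seen from inside the cluster."""
    # Short form: <service>.<namespace>, resolvable from any namespace in the cluster.
    host = f"{service}.{namespace}"
    if fqdn:
        # Fully qualified form: <service>.<namespace>.svc.<cluster-domain>
        host = f"{host}.svc.{cluster_domain}"
    return f"{host}:{port}"


if __name__ == "__main__":
    # Service name, namespace, and port taken from the canal.zkServers setting in this post.
    print(service_address("canal-zookeeper", "recircle-industry-platform-preview", 2181))
```

The short form is what this post uses for canal.zkServers; the fully qualified form can help when a short name does not resolve as expected.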
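Next, add a main configuration for this cluster. It serves as the default configuration for the instances of every canal-server in the cluster. This is where you set the canal-admin address, the address of the tsdb that records table schema changes, and the message queue address: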
    canal.ip =
    canal.register.ip =
    canal.port = 11111
    canal.metrics.pull.port = 11112
    canal.user = admin
    canal.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
    canal.admin.manager = canal-admin.recircle-industry-base-system:8089
    canal.admin.port = 11110
    canal.admin.user = admin
    canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
    canal.zkServers = canal-zookeeper.recircle-industry-platform-preview:2181
    canal.zookeeper.flush.period = 1000
    canal.withoutNetty = false
    canal.serverMode = rocketMQ
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    canal.instance.memory.buffer.size = 16384
    canal.instance.memory.buffer.memunit = 1024
    canal.instance.memory.batch.mode = MEMSIZE
    canal.instance.memory.rawEntry = true
    canal.instance.detecting.enable = false
    canal.instance.detecting.sql = select 1
    canal.instance.detecting.interval.time = 3
    canal.instance.detecting.retry.threshold = 3
    canal.instance.detecting.heartbeatHaEnable = false
    canal.instance.transaction.size = 1024
    canal.instance.fallbackIntervalInSeconds = 60
    canal.instance.network.receiveBufferSize = 16384
    canal.instance.network.sendBufferSize = 16384
    canal.instance.network.soTimeout = 30
    canal.instance.filter.druid.ddl = true
    canal.instance.filter.query.dcl = false
    canal.instance.filter.query.dml = false
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false
    canal.instance.filter.rows = false
    canal.instance.filter.transaction.entry = false
    canal.instance.filter.dml.insert = false
    canal.instance.filter.dml.update = false
    canal.instance.filter.dml.delete = false
    canal.instance.binlog.format = ROW,STATEMENT,MIXED
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    canal.instance.get.ddl.isolation = false
    canal.instance.parser.parallel = true
    canal.instance.parser.parallelBufferSize = 256
    canal.instance.tsdb.enable = true
    canal.instance.tsdb.url = jdbc:mysql://db.irecircle.com:3306/canal_tsdb?useUnicode=true&characterEncoding=UTF-8&useSSL=false
    canal.instance.tsdb.dbUsername = root
    canal.instance.tsdb.dbPassword = 123456
    canal.instance.tsdb.snapshot.interval = 24
    canal.instance.tsdb.snapshot.expire = 360
    canal.destinations =
    canal.conf.dir = ../conf
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    canal.auto.reset.latest.pos.mode = false
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    canal.instance.global.mode = manager
    canal.instance.global.lazy = false
    canal.instance.global.manager.address = ${canal.admin.manager}
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    canal.aliyun.accessKey =
    canal.aliyun.secretKey =
    canal.aliyun.uid =
    canal.mq.flatMessage = true
    canal.mq.canalBatchSize = 50
    canal.mq.canalGetTimeout = 100
    canal.mq.accessChannel = local
    canal.mq.database.hash = true
    canal.mq.send.thread.size = 30
    canal.mq.build.thread.size = 8
    kafka.bootstrap.servers = 127.0.0.1:6667
    kafka.acks = all
    kafka.compression.type = none
    kafka.batch.size = 16384
    kafka.linger.ms = 1
    kafka.max.request.size = 1048576
    kafka.buffer.memory = 33554432
    kafka.max.in.flight.requests.per.connection = 1
    kafka.retries = 0
    kafka.kerberos.enable = false
    kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
    kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf
    rocketmq.producer.group = preview-canal
    rocketmq.enable.message.trace = false
    rocketmq.customized.trace.topic =
    rocketmq.namespace =
    rocketmq.namesrv.addr = 192.168.110.46:9876
    rocketmq.retry.times.when.send.failed = 0
    rocketmq.vip.channel.enabled = false
    rocketmq.tag =
    rabbitmq.host =
    rabbitmq.virtual.host =
    rabbitmq.exchange =
    rabbitmq.username =
    rabbitmq.password =
    rabbitmq.deliveryMode =
    pulsarmq.serverUrl =
    pulsarmq.roleToken =
    pulsarmq.topicTenantPrefix =
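With canal.serverMode = rocketMQ and canal.mq.flatMessage = true, the messages reach the queue as JSON text. To give a rough idea of what a consumer on the receiving side might do with one, here is a minimal Python sketch. The sample payload, the database and table names, and the summarize helper are all made up for illustration. The field names (data, old, pkNames, type, isDdl, and so on) follow canal's flat message layout as I understand it, so check them against a real message from your own topic.

```python
import json

# Hypothetical sample payload; column values arrive as strings in flat messages.
SAMPLE = """{
  "data": [{"id": "1", "name": "alice"}],
  "database": "im_db",
  "es": 1700000000000,
  "id": 1,
  "isDdl": false,
  "mysqlType": {"id": "bigint", "name": "varchar(64)"},
  "old": [{"name": "bob"}],
  "pkNames": ["id"],
  "sql": "",
  "table": "user",
  "ts": 1700000000123,
  "type": "UPDATE"
}"""


def summarize(raw):
    """Turn one flat message into a list of per-row change summaries."""
    msg = json.loads(raw)
    if msg.get("isDdl"):
        # Schema changes carry no row data; this sketch simply skips them.
        return []
    rows = msg.get("data") or []
    # "old" holds the previous values of changed columns (UPDATE only).
    olds = msg.get("old") or [None] * len(rows)
    changes = []
    for row, old in zip(rows, olds):
        changes.append({
            "table": f'{msg["database"]}.{msg["table"]}',
            "op": msg["type"],
            "key": {k: row[k] for k in (msg.get("pkNames") or [])},
            # For UPDATE list only the changed columns, otherwise all columns.
            "changed": sorted(old) if old else sorted(row),
        })
    return changes


if __name__ == "__main__":
    for change in summarize(SAMPLE):
        print(change)
```

Wiring this into an actual RocketMQ consumer is left out here, since that depends on the client library the consuming team uses.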
3. Deploy canal-server

First prepare the values file that is passed to helm (server-values.yaml in the install command):

    server:
      config: |
        canal.port = 11111
        canal.metrics.pull.port = 11112
        canal.register.ip =
        canal.admin.manager = canal-admin.recircle-industry-base-system:8089
        canal.admin.port = 11110
        canal.admin.user = admin
        canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
        canal.admin.register.auto = true
        # name of the cluster to register with
        canal.admin.register.cluster = pre
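Note that canal.admin.passwd is a hash, not the plaintext password. My understanding is that canal uses the same scheme as MySQL's legacy PASSWORD() function: SHA-1 applied twice, written as uppercase hex without the leading `*`. Under that assumption, the sketch below computes the value for a given plaintext; mysql_password_hash is a hypothetical helper name, not part of canal.

```python
import hashlib


def mysql_password_hash(plaintext):
    """Return SHA1(SHA1(plaintext)) as uppercase hex (MySQL PASSWORD() without the leading '*')."""
    inner = hashlib.sha1(plaintext.encode("utf-8")).digest()
    return hashlib.sha1(inner).hexdigest().upper()


if __name__ == "__main__":
    # Compare the printed value with canal.admin.passwd in your configuration.
    print(mysql_password_hash("admin"))
```

If you change the admin password on the canal-admin side, regenerate this value and update it in both the cluster main configuration and the values file.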
Then deploy canal-server:

    helm install canal-server -f ./server-values.yaml ./canal-server
canal-server can be deployed with multiple replicas.
Creating an instance

    canal.instance.master.address = 127.0.0.1:3306
    canal.instance.dbUsername = ****
    canal.instance.dbPassword = ****
    canal.instance.filter.regex = .*\\..*
    canal.mq.topic = your Routing Key
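The canal.instance.filter.regex value decides which tables are captured. In a properties file `\\` is an escaped backslash, so `.*\\..*` is read as the regex `.*\..*`, which matches every schema.table name. The sketch below mimics that matching in Python to make it easier to try out narrower filters. It assumes the filter is a comma-separated list of patterns, each matched against the whole `schema.table` name; matches_filter and the im_db names are hypothetical, and canal itself evaluates the patterns on the Java side, so treat this only as an approximation for simple patterns.

```python
import re


def matches_filter(filter_regex, schema, table):
    """Return True if schema.table fully matches any comma-separated pattern."""
    full_name = f"{schema}.{table}"
    patterns = [p.strip() for p in filter_regex.split(",") if p.strip()]
    return any(re.fullmatch(p, full_name) for p in patterns)


if __name__ == "__main__":
    # The catch-all filter from the instance config above.
    print(matches_filter(r".*\..*", "im_db", "user"))
    # A narrower, hypothetical filter: one table plus a table-name prefix.
    print(matches_filter(r"im_db\.user,im_db\.msg_.*", "im_db", "msg_2025"))
    print(matches_filter(r"im_db\.user,im_db\.msg_.*", "other_db", "user"))
```

Narrowing the filter to the tables the downstream team actually needs keeps unrelated changes out of the topic.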
This work is licensed under a Creative Commons Attribution 4.0 International License.
When reposting, please cite the original link: https://blog.hufeifei.cn/2025/10/DB/canal-in-k8s/