基于Binlog的实时同步功能——debezium、canel、databus技术选型

大厂这段时间学到的最有价值的两个技术除了大数据,另一个就是基于CDC和消息队列的实时同步技术。

去年的一篇文章大致地讲了我对MQ的一些认识,事实上Kafka在内的现代MQ,功能远不止这些。后面整理好自己的思路,肯定会再写一篇文章来讲讲。这篇文章的主角就是与MQ息息相关的CDC技术。

#1. CDC技术

CDC全称叫:change data capture,是一种基于数据库数据变更的事件型软件设计模式。

比如有一张订单表trade,订单每一次变更录入到一张trade_change的队列表。然后另外一个调度线程可以消费trade_change这张队列表来做一些数据统计,如每日的付款用户统计、每日的下单用户统计等。

这就是我毕业入职的第一家公司的报表统计逻辑。这个设计在订单量小的时候是看不出问题的,而一旦某一时刻订单量增多。基于MySQL的队列表由于B+树的写入吞吐量不够,导致MySQL CPU经常飙升。比如双十一,618这样的大促,程序员就得在颤颤巍巍中度过。

其次,从MySQL同步到ElaticSearch是根据last_modify_time时间扫索引增量同步的,这就要求表上必须创建last_modify_time索引,Scheduler一多也会无形地增加MySQL的读取负担。

B+的写入性能肯定是不如直接顺序写文件的,B+树的本质就是牺牲写性能,换取磁盘上的随机读的查找结构,所以大部分数据库都会设计Buffer Pool来管理B+树脏页,以避免频繁的随机IO。
同时为了防止Buffer数据丢失同时为了保证事务的ACID,所以就有了Redo-log来进行崩溃恢复,Undo-log来做未提交事务的撤销。这些日志都是顺序写入,远比B+树的随机写性能高。

database architecture

#2. 基于Binlog的CDC

Binlog是MySQL 3.23.14引进的,它包含所有的描述数据库修改的事件——DML(增删改)、DDL(表结构定义与修改)操作。

MySQL architecture

与InnoDB中的redo-logundo-log不同,binlog和slow_query_log一样是server层的日志,所以InnoDB和MyISAM等各种存储引擎的数据修改都会记录到这个日志中。

MySQL拥有分层架构,支持可插拔的存储引擎,所以服务层的binlog与InnoDB引擎的redo-log是不同的两个事物,这也是为什么MySQL支持以STATEMENT格式直接将sql语句存入binlog。而像PostgreSQL这样的数据库,WAL日志除了作为redo-log用于保证事务的持久性外,WAL日志在Replica过程中也扮演着与MySQL的binlog相同的角色, 但是需要用Logical Decoding将WAL日志解析成数据流或SQL语句。

CDC architecture

对于CDC的架构设计,在大数据量的分布式场景下,我们都是使用binlog来做事件源。

一方面,将binlog复制到Kafka,再由Kafka下游的消费者处理这些事件不影响数据库的核心业务,可以降低系统的耦合度;

另一方面,binlog和Kafka都是基于日志的顺序写入,Kafka的吞吐量远比B+树高,系统的整体性能也能得到改善。

目前基于binlog的CDC技术已经很成熟了,在github上也有很多实现,通过Change Data Capturereplicationbinlog等关键词可以搜索到相关项目。在此列举一下:

ProjectLanguageDescription
alibaba/CanalJava阿里巴巴 MySQL binlog 增量订阅&消费组件
debezium/debeziumJavaDebezium is an open source distributed platform for change data capture. Replicates from MySQL to Kafka. Uses mysql-binlog-connector-java. Kafka Connector. A funded project supported by Redhat with employees working on it full time.
linkedin/databusJavaPrecursor to Kafka. Reads from MySQL and Oracle, and replicates to its own log structure. In production use at LinkedIn. No Kafka integration. Uses Open Replicator.
zendesk/MaxwellJavaReads MySQL event stream, output events as JSON. Parses ALTER/CREATE TABLE/etc statements to keep schema in sync. Written in java. Well maintained.
noplay/python-mysql-replicationPythonPure python library that parses MySQL binary logs and lets you process the replication events. Basically, the python equivalent of mysql-binlog-connector-java
shyiko/mysql-binlog-connector-javaJavaLibrary that parses MySQL binary logs and calls your code to process them. Fork/rewrite of Open Replicator. Has tests.
confluentinc/bottledwater-pgCChange data capture from PostgreSQL into Kafka
uber/storagetapperGoStorageTapper is a scalable realtime MySQL change data streaming, logical backup and logical replication service
moiot/gravityGoA Data Replication Center
whitesock/open-replicatorJavaOpen Replicator is a high performance MySQL binlog parser written in Java. It unfolds the possibilities that you can parse, filter and broadcast the binlog events in a real time manner.
mardambey/mypipeScalaReads MySQL event stream, and emits events corresponding to INSERTs, DELETEs, UPDATEs. Written in Scala. Emits Avro to Kafka.
Yelp/mysql_streamerPythonMySQLStreamer is a database change data capture and publish system. It’s responsible for capturing each individual database change, enveloping them into messages and publishing to Kafka.
actiontech/dtleGoDistributed Data Transfer Service for MySQL
krowinski/php-mysql-replicationPHPPure PHP Implementation of MySQL replication protocol. This allow you to receive event like insert, update, delete with their data and raw SQL queries.
dianping/pumaJava本系统还会实现数据库同步(同构和异构),以满足数据库冗余备份,数据迁移的需求。
JarvusInnovations/LapidusJavascriptStreams data from MySQL, PostgreSQL and MongoDB as newline delimited JSON. Can be run as a daemon or included as a Node.js module.

这里只讨论Java语言的几个实现。首先whitesock/open-replicatorshyiko/mysql-binlog-connector-java是专门用来解析MySQL binlog的库,后者也是在前者的基础上重构的。debezium/debeziumlinkedin/databuszendesk/Maxwell三个中间件binlog解析都是基于这两个库。

#3. Canal vs. Debezium vs. databus vs. MaxWell

1、alibaba/Canal

优点:

  • 阿里开源,有大厂实践背书
  • 资料大都是中文的,方便学习

缺点:

  • 定位于MySQL binlog解析,所以只能支持MySQL数据库的CDC
  • Github上项目活跃度很一般,issue堆积了太多,13、14年的问题都还没解决。

2、debezium/debezium

优点:

  • Rethat开源,专干开源的国际大厂背书
  • 支持MySQL、PostgreSQL、Oracle、SqlServer、MongoDB主流数据库
  • 文档详细,资料齐全
  • 社区完善,在Gitter上有专门的问题讨论区。
  • 与Kafka很好集成,可作为Kafka Connector插件使用,embed模式支持嵌入自己的程序方便控制,也支持Server模式单独运行。
  • 支持SMT消息体转换,OpenTracing分布式链路追踪等集成功能

缺点:

  • 文档大多数是英文的,得多花点耐心

有意思的是阿里开源的Flink流处理系统也是使用Debezium来做CDC,当然它还支持Canel、Maxwell

Kafka创始人创办的confluentinc刚开始开源了bottledwater-pg,最后也投入了debezium的怀抱,有官方的认可。

3、linkedin/databus

优点:

  • 国际大厂领英开源
  • 支持MySQL和Oracle

缺点:

  • 项目已经很久没有人维护了
  • 文档也很一般
  • 暂时不支持Kafka集成,只能用Databus Client消费binlog。

Kafka最早是Jay Kreps在领英创建并开源的,可能是Jay Kreps觉得Kafka在大数据领域大有可图,所以就带着Linkedin的几个工程师一起创立了Confluent​专注于Kafka生态的开发与维护。

在Kafka文档Log-Compact一节可以看到这段话:

This functionality is inspired by one of LinkedIn’s oldest and most successful pieces of infrastructure—a database changelog caching service called Databus. Unlike most log-structured storage systems Kafka is built for subscription and organizes data for fast linear reads and writes. Unlike Databus, Kafka acts as a source-of-truth store so it is useful even in situations where the upstream data source would not otherwise be replayable.

可以看出Databus是Linkedin非常老的一个基础服务,Kafka的Log Compact的一些设计也源自于Databus。

4、zendesk/maxwell

优点:

  • 相当简单,下载下来,简单进行配置就能运行
  • 文档相对来说,还算齐全
  • 支持Kafka、RabbitMQ、Redis等队列

缺点:

  • 文档是英文的,不过好在maxwell相对简单。

  • 没啥明显缺点。非要说个缺点,就是和前三者比身份不够显赫,zendesk这家美国公司没怎么听过。

综合下来,Debezium是最佳选择。

#4. Debezimu-MySQL的配置

要使用debezium需要预先对mysql服务进行配置

#4.1. MySQL配置

1)创建单独的用户,并授予debezium需要的权限

1
2
3
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;

MySQL提供的权限:https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html

debezium需要几个权限的作用:

KeywordDescription
SELECTSELECT查询权限。只被用在初始化阶段。
RELOAD执行 FLUSH 语句清除重新加载内部缓存。只被用在初始化阶段。
SHOW DATABASES执行 SHOW DATABASE 语句。只被用在初始化阶段。
REPLICATION SLAVE读取MySQL binlog。
REPLICATION CLIENT执行SHOW MASTER STATUSSHOW SLAVE STATUSSHOW BINARY LOGS等语句。

2)开启MySQL服务的binlog功能

1
2
3
4
5
server-id         = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10

各项配置的作用:

PropertyDescription
server-id在MySQL集群中每个server和replication的 server-id 必须是唯一的。Debezium是作为MySQL的replication,启动后也会分配一个server-id给debezium-connector。
log_binbinlog文件的前缀
binlog_formatbinlog-format 必须设置成ROW模式。
binlog_row_imagebinlog_row_image 必须设置成FULLROW模式下binlog需要记录所有的列。
expire_logs_daysbinlog的过期时间。默认位 0, 意味着不会自动删除。这个值可根据自己的环境需求进行设置。

mysql的binlog有三种模式
STATEMENT模式只记录SQL语句,从节点通过执行同步过来的sql在从库中再执行一遍。STATEMENT模式的问题是有些语句(比如update t set num=num+1 limit 1)可能会产生不一致性,而且STATEMENT模式下sql发给异构系统将会无法使用。
ROW模式会直接复制修改的数据行,但是有可能会导致日志量过大,比如执行一条update t set num=num+1,修改了一万行就会有一万行日志,肯定没有STATEMENT模式来的快。
MIXED模式,则将两者结合,默认情况下使用statement,某些情况会切换为基于行的复制。
具体可以参考这个回答

还有几项可选配置项:

#4.2. 准备Kafka环境,在Kafka-connect中安装Debezium

Kafka需要依赖Zookeeper管理集群,所以还需要准备zookeeper环境。

1)下载Debezium:https://debezium.io/releases/

2)配置Kafka-connect插件路径,并将Debezimu插件解压到该目录

1
plugin.path=/kafka/connect

3)启动Kafka-connect进程:

Kafka-connect可以用单机版(standalone)和分布式版(distributed)两种启动方式:

  • standalone模式下,启动时直接提供properties文件来创建Connector任务。
  • distributed模式下,提供REST接口对Connector任务进行增删改查。

#4.3. Debezium的基础配置

distributed模式下可以,调用POST /connectors接口创建Debezium的Connector任务,任务的基本配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.99.100",
"database.port": "3306",
"database.user": "debezium-user",
"database.password": "debezium-user-pw",
"database.server.id": "184054",
"database.server.name": "fullfillment",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.fullfillment",
"include.schema.changes": "true"
}
}

这个配置主要是数据库的用户名密码,需要同步的数据库和相关数据表,以及kafka地址和数据库schema变更存储的topic。

Debezium-Connector的所有配置:https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-connector-properties

#5. binlog解析的难点与Debezium工作原理

binlog的ROW模式下类似于csv是没有shema的,我们将row_image设置成full模式,不管update操作只涉及几列,都会把完整的行数据写入到binlog。

#5.1. 表结构随时都会修改,需要解析ddl并维护一份schema用于事件的生成

数据库客户端查询数据库的时候,客户端拿到的都是数据库当前的schema。因为schema随时可以改变,这意味着主从备份的时候,debezium不能只使用当前的schema,因为debezium可能正在处理较旧的事件。

比如,有一张trade_info表,在某个时间点T添加了payment字段,在T之前的binlog是没有payment字段的,T之后的binlog才有payment。那Debezium生成事件也应该是在T之前有payment字段,T之后没有payment字段。

MySQL在binlog中不仅包含行级修改,还包括了数据库的DDL语句。当Debezium的Connector读取binlog并遇到这些DDL语句时,它会解析这些DDL并更新内存中每个表shema。Debezium使用这个shema就能标识每次增删改操作的结构从而生成事件。

#5.2. 内存里的schema维护存在问题

崩溃或正常重启后,怎么还原schema,如果使用数据库当前的schema会怎样呢:

  1. 假如在T0~T1的时间内,表结构A发生过增加列的DDL操作,那在处理T0时间段A表的binlog时,拿到的表结构为T1的schema,就会出现列不匹配的情况. 比如之前的异常: column size is not match for table: xx , 12 vs 13
  2. 假如在T0~T1发生了增加 C1列、删除了C2列,此时拿到的列的总数还是和T0时保持一致,但是对应的列会错位
  3. 假如在T0~T1发生了drop table的DDL,此时拿表结构时会出现无法找到表的异常,一直阻塞整个binlog处理,比如not found [xx] in db

很明显,不能直接查数据库当前的schema来为之前的binlog生成事件。Debezium和Canal都有自己的解决方案:

Debezium会把所有DDL语句以及DDL在binlog的位置单独存在一个history的topic中,这个topic可以用database.history.kafka.topic进行配置。
当Debezium的Connector崩溃或正常停止重启后,Connector重新从原来的位置读取binlog。但是存在内存里的schema已经没有了,所以它会重新解析history中的DDL语句重建表结构。

alibaba/canal提供了TableMetaTSDB的功能可以存储表结构的时序数据。

#5.3. Kafka无法保证多个partition的消费顺序

因为Debezium会重新解析history topic的DDL语句,我们希望DDL语句能按正常顺序解析,但是Kafka无法保证多个partition的消费顺序,所以history的topic的partition个数必须设置成1。

#5.4. 消费DDL

Debezium不希望用户直接使用history topic。因为里面包含了binlog中的所有ddl语句。

如果用户想要消费自己关心的表的DDL语句,Debezium提供了schema change topic,这个topic名字被命名为serverName,这个serverName通过database.server.name配置。

#6. Debezium踩坑记录

debezium配置起来还是比较简单的,但是这么复杂的项目,坑还是比较多的。

#6.1. 关闭快照初始化

Debezium的Connector第一次启动时,会给你的数据库执行一次快照初始化

因为对于老项目,早期的binlog肯定已经被删掉了,这个时候Debezium会帮你把数据库的所有数据都写到Kafka里,这次快照之后的增删改操作通过解析binlog写入kafka。这也是为什么Debezium需要获取数据库SELECT权限的原因。

但是快照读有这么几个问题:

  • 在执行快照初始化过程中,Connector重启或者Kafka-connect Rebalance,重启后Debezium会重新初始化快照。因为Debezium的快照是通过SELECT * FROM table扫描全表实现的,没有记录进度,非常粗暴。
  • 为了防止快照初始化过程中表的schema会变更,快照初始化前会获取全局读锁。

可以通过snapshot.locking.mode属性配置是否获取全局读锁,snapshot.locking.mode=none即可关闭。

snapshot只适合在备份从库上执行,否则可能会影响正常用户的使用,通过snapshot.mode可以对初始化进行配置,这个选项支持以下几个配置值:

initial (default)- 只有当binlog的offset没有记录的时候才会执行一次快照初始化。

when_needed - 有需要时就会执行,比如第一次offset没有记录,或者Connector停了很久早期的binlog被删掉了,当前的offset已经不可用了,或者GTID对不上的时候。

never - 从不执行初始化。第一次启动Connector时就从binlog头部开始读取。需要注意,这种配置需要binlog包含所有的历史记录。

schema_only - Connector初始化时只读取表的schame而不读取数据。如果你只需要Connector启动后的数据库变更,那这个配置很有用。

schema_only_recovery - 用于恢复重启后丢失的schema,但是这个只能用在自上次提交binlog-offset后,schema没有发生任何变更。

initial_only - 这个配置在文档里没有,代码里可以看到,这个是只用来执行快照的。

用一句话总结一下:initial先全量后增量同步,schema_onlynever是只增量同步,initial_only是只全量同步。

#6.2. 修改topic

Debezium默认的行为是将一张表上的INSERTUPDATEDELETE操作记录到一个topic。Topic命名规则是<serverName>.<databaseName>.<tableName>

如果进行分库了,比如server0上有db01db02两个逻辑库,server1上有db11db12两个逻辑库,这四个逻辑库上都有一张order表。那此时就会有4个topic。

如果我们想把它们路由到同一个topic上,就需要用到Kafka-Connect提供的SMT功能了:

1
2
3
4
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3

Kafka-Connect提供了一个RegexRouterTimestampRouterMessageTimestampRouter几个SMT让我们修改数据存入的topic。这里的RegexRouter,允许我们用正则表达式来对Debezium默认的topic进行修改。

#6.3. Decimal数据的处理

对于MySQL中的decimal类型的数据,Java里会转成BigDecimal,但是以json格式存入kafka的时候就会丢失精度。

毕竟json出自JS,JS中只支持number数值类型,对应到Java就是double类型。

Debezium支持decimal.handling.mode选项可以将decimal配置成string类型。

#6.4. 时间类型数据的处理

Debezium底层的binlog解析用的是shyiko/mysql-binlog-connector-java。这中间做了很多转换:

mysql(Asia/Shanghai)binlog-connectordebeziumdebezium schema
date (2021-01-28)LocalDate (2021-01-28)Integer (18655)io.debezium.time.Date
time (17:29:04)Duration (PT17H29M4S)Long (62944000000)io.debezium.time.MicroTime
timestamp (2021-01-28 17:29:04)ZonedDateTime (2021-01-28T09:29:04Z)String (2021-01-28T09:29:04Z)io.debezium.time.ZonedTimestamp
datetime (2021-01-28 17:29:04)LocalDateTime (2021-01-28T17:29:04)Long (1611854944000)io.debezium.time.Timestamp

date类型,最后在Debezium中会调用LocalDate.toEpochDay转成了基于1970年的天数。

time类型,在binlog解析库中,被转成了Duration,在Debezium中最后被转成了毫秒值。

timestamp类型,最后在Debezium中被转成了一个ISO格式的字符串,但是时区默认是UTC时区。

datetime类型,最后在Debezium中被转成了一个long类型,时区是写死的UTC时区。

文档里有MySQL时间类型与存入Kafka类型的映射表

总之,Debezium时间的处理混乱不堪。所以我为Debezium写了一个datetime-converter的补丁可以将这四种类型转成字符串。配置如下:

1
2
3
4
5
6
7
converters=datetime
datetime.type=com.darcytech.debezium.converter.MySqlDateTimeConverter
datetime.format.date=yyyy-MM-dd
datetime.format.time=HH:mm:ss
datetime.format.datetime=yyyy-MM-dd HH:mm:ss
datetime.format.timestamp=yyyy-MM-dd HH:mm:ss
datetime.format.timestamp.zone=UTC+8

#6.5. 墓碑事件

Debezium会生成5种事件:

  • create events:对应MySQL种的INSERT语句。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    { 
    "op": "c",
    "ts_ms": 1465491411815,
    "before": null,
    "after": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
    },
    "source": {
    "version": "1.4.2.Final",
    "connector": "mysql",
    "name": "mysql-server-1",
    "ts_ms": 0,
    "snapshot": false,
    "db": "inventory",
    "table": "customers",
    "server_id": 0,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "thread": 7,
    "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
    }

    此时payload种的before字段为null,after字段为新增的记录值。

  • update events:对应MySQL种的UPDATE语句。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    {
    "before": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
    },
    "after": {
    "id": 1004,
    "first_name": "Anne Marie",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
    },
    "source": {
    "version": "1.4.2.Final",
    "name": "mysql-server-1",
    "connector": "mysql",
    "name": "mysql-server-1",
    "ts_ms": 1465581029100,
    "snapshot": false,
    "db": "inventory",
    "table": "customers",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 484,
    "row": 0,
    "thread": 7,
    "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u",
    "ts_ms": 1465581029523
    }

    此时payload中,before为更新前的数据,after为更新后的数据。

  • Primary key updates:修改主键的操作,会生成一个DELETE事件和CREATE事件:

    • DELETE 事件会有 __debezium.newkey 的消息头。这个值是更新后的新主键。
    • CREATE 事件会有 __debezium.oldkey 的消息头。这个值是更新前的老主键。
  • delete events:对应MySQL的DELTE语句。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    {
    "schema": { ... },
    "payload": {
    "before": {
    "id": 1004,
    "first_name": "Anne Marie",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
    },
    "after": null,
    "source": {
    "version": "1.5.0.Beta2",
    "connector": "mysql",
    "name": "mysql-server-1",
    "ts_ms": 1465581902300,
    "snapshot": false,
    "db": "inventory",
    "table": "customers",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 805,
    "row": 0,
    "thread": 7,
    "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d",
    "ts_ms": 1465581902461
    }
    }

    此时payload中,before为删除前的数据,after为null。

  • Tombstone events:Debezium会为删除操作生成一条key与DELETE事件相同、value为null的空消息(墓碑事件)。

    墓碑事件主要用于Kafka的compact——Kafka会删除具有相同key的早期事件。但是要让Kafka删除所有具有相同key的消息,需要将消息指设置成null。

需要特别注意,墓碑事件的消息value为null,需要为这个事件做特殊处理。

#6.6. 禁用Kafka-Connect的Schema配置

Kafka-Connect为了保证每条消息是可以自我描述的,所以都会带schema。如果我们使用了JsonConverter进行序列化,默认情况下,kafka中的消息格式是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"schema": { /* ... */ },
"payload": {
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
}

这里面的schema会包含下面payload里每个字段的类型解释,会导致Kafka中存储的消息非常臃肿。可以在Kafka-Connect中将Key和Value的schema禁用掉:

1
2
3
4
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

更好的解决方案是使用中心化的Schema Registry。Debezium也推荐使用这种方式。

schema registry

在github搜索schema registry关键词查找相关项目。Debezium在文档中推荐Apicurio API and Schema RegistryConfluent Schema Registry这两种SchemaRegistry。

#6.7. 对Debezium生成的消息进行处理

没有shema的时候,Debezium默认生成的数据格式是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}

消息体中before表示变更前的数据,after表示变更后的数据,source表示来源于哪个数据库、哪张表、哪个事务(GTID)。

为了方便与其他Connector集成,比如让kafka-connect-jdbc把消息都写到另一个数据库中。那这个时候我们只想要after里面的数据了。

Debezium提供了一个Event-Flat的SMT,我们只需要和上面的RegexRouter一样配置一下就可以了:

1
2
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

那如果是删除操作呢,删除操作会生成两个事件,一个delete事件有before没有after,还有一个和delete事件key相同的墓碑事件消息体为null。ExtractNewRecordState可以配置怎么处理delete记录:

1
2
3
4
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=drop

delete.handling.mode指定delete记录的处理模式,默认为drop也就是delete记录将会被ExtractNewRecordState丢弃。drop.tombstones指定要不要丢弃墓碑事件。

更多配置可以参考官方文档

#6.8. kafka-connect的坑

kafka broker本身有个配置auto.create.topics.enable默认为true——当发送消息到一个不存在的topic时,kafka会自动创建这个topic,这些自动创建的topic会使用num.partitionsdefault.replication.factor指定的partition数和replicas数创建topic。生产环境一般是不建议使用kafka broker中的自动创建主题的,因为这可能会带来很大的维护成本,我们希望不同情况使用不同的主题配置。

另外,kafka-connect启动时默认会创建三个connect内部使用的topic,这三个topic名字由config.storage.topicoffset.storage.topicstatus.storage.topic三个配置指定,它们分别存储connector的配置和offset以及当前的状态。

如果想要对这三个自动创建的topic进行一些配置,可以参考connect的文档

如果你是手动创建需要注意:

config的partition必须为1

offset和kafka内建的__consumer_offsets类似,如果要支持更大的kafka-connect集群,可以把partition设大一点。

这三个topic的cleanup.policy都必须设置成compacted模式。

如果是source connector内部要自动创建topic,可以使用connector的一些配置,具体可以参考:

Configuring Auto Topic Creation for Source Connectors

Customization of Kafka Connect automatic topic creation

Refs:

^ Debezium Document: https://debezium.io/documentation/reference/1.4/

^ Debezium FAQ: https://debezium.io/documentation/faq/

^ Confluent Document: https://docs.confluent.io/platform/current/overview.html

^ Aliyun DTS服务原理: https://www.alibabacloud.com/help/zh/doc-detail/176085.htm

^ Aliyun DTS应用场景: https://www.alibabacloud.com/help/zh/doc-detail/176086.htm

本作品采用 知识共享署名 4.0 国际许可协议 进行许可。

转载时请注明原文链接:https://blog.hufeifei.cn/2021/03/DB/mysql-binlog-parser/

鼓励一下
支付宝微信