使用Debezium采集Postgresql数据

1. 背景

为了应对搜索的场景,需要将postgresql中的数据同步到es中,涉及到全量数据与增量数据同步。数据采集工具有很多,经讨论,初步确定为Debezium。

2.Debezium

Kafka Connect 是一个可靠的,可拓展的数据流转工具,可作用于kafka与各种外部系统的数据流转,比如Mysql,Postgresql,ElasticSearch等。

Kafka Connect 提供了通用的概念和API,遵循其规范开发的插件可直接运行在Kafka Connect之上,或作为Source Connector采集数据,或作为Sink Connector存储数据。

Debezium 针对不同的数据库提供了对应的Kafka Connect插件,用于Capture Data Changes,包括Mysql,Postgresql,Mongo等。

image-20210629114934960

Debezium-Connector-Postgresql插件是基于Postgresql的逻辑复制特性实现CDC的。

这个机制允许从WAL日志中提取数据变化并通过Publication发布,使用解码器(pg10默认pgoutput)解码成可读的事件,订阅者可订阅该Publication,通过复制槽(Replication slot)获取事件。Debezium-Connector-Postgresql插件就是订阅Postgresql的Publication,获取数据变化,发送到对应的kafka中。

3. 环境搭建

kafka 2.1,Postgresql 10已经事先安装好。

3.1 Postgresql中创建逻辑复制发布

  • 开启逻辑复制

    postgresql.conf调整wal_level属性为logical,该属性需要重启才能生效

    wal_level = logical
    

    查看wal_level

    show wal_level;
    
  • 创建具有复制权限的用户,并赋予相关schema和table的的权限

    -- 创建debezium_user
    CREATE ROLE debezium_user
    	REPLICATION
    	LOGIN
    	ENCRYPTED PASSWORD '123456';
      
    -- 把schema debezium的使用权限赋给debezium_user
    GRANT USAGE ON SCHEMA debezium TO debezium_user;
      
    -- 把cities和tourists表的SELECT权限赋给debezium_user
    GRANT SELECT ON debezium.cities, debezium.tourists TO debezium_user;
    

    初始阶段的快照同步时需要访问这些相关schema和table的数据,所以需要schema和table的权限。

  • 创建Publication

    -- 创建Publication
    CREATE PUBLICATION dbz_demo FOR TABLE debezium.cities, debezium.tourists;
      
    -- 查看Publication
    SELECT A.*, B.schemaname, B.tablename FROM pg_publication A 
    INNER JOIN pg_publication_tables B ON A.pubname = B.pubname;
    

    image-20210629211747223

3.2 安装debezium插件,启动kafka connect

  • 下载debezium-connector-postgresql插件,解压到指定路径

    > wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.5.3.Final/debezium-connector-postgres-1.5.3.Final-plugin.tar.gz
      
    > tar -zxvf debezium-connector-postgres-1.5.3.Final-plugin.tar.gz /data/kafka_connect/plugins
    
  • 修改kafka connect的配置文件connect-distributed.properties

    # kafka connect可横向拓展,同一个集群使用一个group id,不能与consumer的group ids重复
    group.id=connect-cluster
    # kafka server
    bootstrap.servers=127.0.0.1:9092
    # 定义插件路径
    plugin.path=/data/kafka_connect/plugins
    
  • 启动kafka connect

    > cd /data/kafka_dev/bin
    > bin/connect-distributed.sh config/connect-distributed.properties
    

    可在/data/kafka_dev/logs/connect.log 查看相关日志

3.3 创建connector

curl -XPOST "http://127.0.0.1:8083/connectors/" -H 'Content-Type: application/json' -d '
{
  "name": "pg_232_demo_connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "database.hostname": "127.0.0.1", 
    "database.port": "5432", 
    "database.user": "debezium_user", 
    "database.password": "123456", 
    "database.dbname" : "dev", 
    "database.server.name": "pg_232_demo",
    "slot.name": "dbz_demo_slot",
    "table.include.list": "debezium.cities, debezium.tourists",  
    "publication.name": "dbz_demo",
    "publication.autocreate.mode": "disabled",
    "plugin.name": "pgoutput",
    "snapshot.mode": "exported"
  }
}
'
属性含义
nameconnector 名称
connector.classconnector 实现类
database.dbname需要捕获数据变更的数据库
database.server.namepg数据库服务的名称,这个名称下可以包含1个或者多个postgresql 实例,由于一个connector下的数据变更是发送到以server.name为前缀的topic中,所以对于集群下不同实例的同一张表的数据变化,可以发送到同一个topic

因此不同的connector应当对应不同的server.name
slot.name复制槽的名称
table.include.list需要捕获数据变更的表
publication.name订阅的Publication名称
publication.autocreate.modePublication创建模式,由于我们是自行创建,所以这里是disabled
其他包含
all_tables - 如果pg中不存在该Publication,则自动创建一个包含所有表的Publicatin
filtered - 与all_talbes不同的是自动创建只包含table.include.list的Publication
plugin.name使用了Postgresql 10默认解码插件 pgoutput
snapshot.mode由于WAL日志不会保留全部历史,所以创建snapshot先同步数据库中已经存在的数据。
exported - 在复制槽创建时使用无锁的方式创建snapshot,也就是一个slot对应一份snapshot。debezium 1.5强烈建议使用,其他模式可能造成事件丢失。

initial - 当逻辑数据库(对应db.server.name) 没有对应的offset记录时开始创建snapshot
其他参见文档

snapshot.mode=exported模式下:

当删除connector后,postgresql的复制槽依然存在。

如果要复用复制槽并从上次的offse开始读取,connector.name和database.server.name 必须和之前一样。

如果要重新读取数据,建议新建connector,且connector.name和database.server.name使用不同的名称。如果要复用复制槽的话,可以修改connector.name和database.server.name的任意一个,都会重新从snapshot开始读取,不过没改database.server.name的话,会继续发到原来的topic中。

connector创建完成后,就会订阅dbz_232_dev,我们可以在postgresql查看到对应的复制槽的信息

-- 查看slot信息	
select * from pg_replication_slots;

image-20210629211828767

4. 测试

5.1 快照数据

创建connector之后,如果订阅的表中已经存在数据,将会对已存在的数据做快照,发送到对应的kafka topic(默认是由database.server.name,schema和table组成)。比如debezium.cities的数据就会发送到pg_232_demo.debezium.cities topic中。

运行kafka-console-consumer.sh 监听该topic,分析其数据。

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:8092 --topic pg_232_demo.debezium.cities --from-beginning
{
    "schema": {
		......
    },
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "name": "b",
            "character": "{\"view\": \"沙尘暴\", \"alias\": \"首都\"}"
        },
        "source": {
            "version": "1.5.2.Final",
            "connector": "postgresql",
            "name": "pg_232_demo",
            "ts_ms": 1624952136318,
            "snapshot": "true",
            "db": "dev",
            "sequence": "[null,\"5673398848\"]",
            "schema": "debezium",
            "table": "cities",
            "txId": 3409971,
            "lsn": 5673398848,
            "xmin": null
        },
        "op": "r",
        "ts_ms": 1624952136318,
        "transaction": null
    }
}
字段描述
schema描述了payload中的组织和结构
payloadjson格式的事件内容
payload.before事件发生前的行记录
payload.after事件发生后的行记录
payload.source记录事件来源的相关信息
payload.source.versiondebezium版本
payload.source.connectorconnector类型
payload.source.nameconnector的逻辑名称,同配置中的db.server.name
payload.source.ts_ms数据库中该事件发生的时刻
payload.source.snapshot是否是snapshot的一部分
payload.source.db数据库名称
payload.source.sequence第一个值是最后提交的lsn,第二个是当前的lsn
payload.source.schemaschema名称
payload.source.tabletable名称
payload.source.txId事务id
payload.source.lsnLog Sequence Numbers 代表wal的位置
payload.source.xmin仍然活动的最早的事务 ID
payload.op操作类型 c=create, u=update, d=delete, r=read
read 特指snapshot同步阶段的操作
payload.ts_msconnector处理该事件的时刻,结合payload.source.ts_ms
可以知道延迟大小
payload.transaction事务元数据信息,需要额外开启

4.2 变更数据

  • create事件

    插入新记录后推送的数据如下,与快照的数据基本相同,这里的snapshot为false,op为c(create)。

    {
        "schema": {
          ......
        },
        "payload": {
            "before": null,
            "after": {
                "id": 10,
                "name": "guangzhou",
                "character": "{\"alias\": \"the City of Rams\"}"
            },
            "source": {
                "version": "1.5.2.Final",
                "connector": "postgresql",
                "name": "pg_232_demo",
                "ts_ms": 1624958765094,
                "snapshot": "false",
                "db": "dev",
                "sequence": "[\"5673645496\",\"5673645496\"]",
                "schema": "debezium",
                "table": "cities",
                "txId": 3411599,
                "lsn": 5674933152,
                "xmin": null
            },
            "op": "c",
            "ts_ms": 1624958765228,
            "transaction": null
        }
    }
    
  • update事件

    更新已有记录推送的数据如下,before中记录了之前的内容,after记录了之后的内容,op为u(update)。

    {
        "schema": {
          ......
        },
        "payload": {
            "before": {
                "id": 10,
                "name": "guangzhou",
                "character": "{\"alias\": \"the City of Rams\"}"
            },
            "after": {
                "id": 10,
                "name": "guangzhou",
                "character": "{\"alias\": \"the City of Flowers\"}"
            },
            "source": {
                "version": "1.5.2.Final",
                "connector": "postgresql",
                "name": "pg_232_demo",
                "ts_ms": 1624959006739,
                "snapshot": "false",
                "db": "dev",
                "sequence": "[\"5674934168\",\"5674934168\"]",
                "schema": "debezium",
                "table": "cities",
                "txId": 3411658,
                "lsn": 5674995568,
                "xmin": null
            },
            "op": "u",
            "ts_ms": 1624959006982,
            "transaction": null
        }
    }
    

    默认情况下,表的复制标识(Replica identity)为default,即订阅者根据行记录的主键来定位到对应行的记录,这种情况下,before字段中不会有行记录的全部内容。

    如果需要返回行记录的全部内容,可修改表的复制标识为full

    -- 修改Replica identity
    ALTER TABLE debetium.cities REPLICA IDENTITY FULL;
    -- 查看表的replica_identity
    SELECT CASE relreplident
              WHEN 'd' THEN 'default'
              WHEN 'n' THEN 'nothing'
              WHEN 'f' THEN 'full'
              WHEN 'i' THEN 'index'
            END AS replica_identity
     FROM pg_class
     WHERE oid = 'debezium.tourists'::regclass;
    
  • delete事件

    删除记录推送的数据如下,before是删除前的数据,after为null,op为d(delete);

    {
        "schema": {
          ......
        },
        "payload": {
            "before": {
                "id": 10,
                "name": "guangzhou",
                "character": "{\"alias\": \"the City of Flowers\"}"
            },
            "after": null,
            "source": {
                "version": "1.5.2.Final",
                "connector": "postgresql",
                "name": "pg_232_demo",
                "ts_ms": 1624959858803,
                "snapshot": "false",
                "db": "dev",
                "sequence": "[\"5674995800\",\"5674995800\"]",
                "schema": "debezium",
                "table": "cities",
                "txId": 3411864,
                "lsn": 5675157120,
                "xmin": null
            },
            "op": "d",
            "ts_ms": 1624959859169,
            "transaction": null
        }
    }
    

5. 可视化管理

5.1 debezium-ui

  • docker方式安装debezium-ui

    docker run -it --rm --name debetium-ui -e KAFKA_CONNECT_URI=http://127.0.0.1:8083 -e UI_BASE_URI=http://127.0.0.1:8080/api -e JAVA_OPTIONS="-Xmx256m"  -p 8080:8080 debezium/debezium-ui:1.5
    
  • 使用debezium-ui http://localhost:8080

    • 查看connector

    image-20210630095906946

    • 创建connector

    image-20210630095757040

wufc
wufc
Programmer & Architect