Flink-cdc实时读postgresql
由于公司业务需要,需要实时同步pgsql数据,我们选择使用flink-cdc方式进行
架构图:
前提步骤:
1,更改配置文件postgresql.conf
# 更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical
# 更改slots最大数量(默认值为10),flink-cdc默认一张表占用一个slot
max_replication_slots = 20 # max number of replication slots
# 更改wal发送最大进程数(默认值为10),这个值和上面的slots设置一样
max_wal_senders = 20 # max number of walsender processes
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable
更改配置文件postgresql.conf完成,需要重启pg服务生效,所以一般是在业务低峰期更改
2,新建用户并且给用户复制流权限
-- pg新建用户
-- 注意:user 是 PostgreSQL 保留字,实际环境中建议改用其他用户名(如 cdc_user)
CREATE USER user WITH PASSWORD 'pwd';
-- 给用户复制流权限
ALTER ROLE user REPLICATION;
-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test TO user;
-- 把当前库public下所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
3,发布表
-- 把所有表进行发布(FOR ALL TABLES 创建时会自动把 puballtables 置为 true)
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 设置发布为true
-- 注意:必须先创建发布再执行 UPDATE,发布不存在时该 UPDATE 不会影响任何行
update pg_publication set puballtables = true where pubname is not null;
-- 查询哪些表已经发布
select * from pg_publication_tables;
4,更改表的复制标识包含更新和删除的值
-- Make UPDATE/DELETE WAL records carry the full old row,
-- so downstream CDC consumers receive the before-image of each change.
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- Verify the replica identity: 'f' means FULL was applied successfully.
SELECT relreplident FROM pg_class WHERE relname = 'test0425';
OK,到这一步,设置已经完全可以啦,上面步骤都是必须的
5,下面开始上代码:
maven依赖
<!-- Flink Scala API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- PostgreSQL CDC source connector -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>1.1.0</version>
</dependency>
java代码
package flinkTest.connect; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class PgsqlToMysqlTest { public static void main(String[] args) { //设置flink表环境变量 EnvironmentSettings fsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); //获取flink流环境变量 StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); exeEnv.setParallelism(1); //表执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings); //拼接souceDLL String sourceDDL = "CREATE TABLE pgsql_source (\n" + " id int,\n" + " name STRING,\n" + " py_code STRING,\n" + " seq_no int,\n" + " description STRING\n" + ") WITH (\n" + " 'connector' = 'postgres-cdc',\n" + " 'hostname' = '***',\n" + " 'port' = '5432',\n" + " 'username' = '***',\n" + " 'password' = '***',\n" + " 'database-name' = '***',\n" + " 'schema-name' = 'public',\n" + " 'decoding.plugin.name' = 'pgoutput',\n" + " 'debezium.slot.name' = '***',\n" + " 'table-name' = '***'\n" + ")"; String sinkDDL = "CREATE TABLE mysql_sink (\n" + " id int,\n" + " name STRING,\n" + " py_code STRING,\n" + " seq_no int,\n" + " description STRING,\n" + " PRIMARY KEY (id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://ip:3306/DB?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8',\n" + " 'username' = '***',\n" + " 'password' = '***',\n" + " 'table-name' = '***'\n" + ")"; String transformSQL = "INSERT INTO mysql_sink " + "SELECT id,name,py_code,seq_no,description " + "FROM pgsql_source"; //执行source表ddl tableEnv.executeSql(sourceDDL); //执行sink表ddl tableEnv.executeSql(sinkDDL); //执行逻辑sql语句 TableResult tableResult = tableEnv.executeSql(transformSQL); } }
表结构奉上:
-- PostgreSQL source table structure
CREATE TABLE "public"."test" (
    "id" int4 NOT NULL,
    "name" varchar(50) COLLATE "pg_catalog"."default" NOT NULL,
    "py_code" varchar(50) COLLATE "pg_catalog"."default",
    "seq_no" int4 NOT NULL,
    "description" varchar(200) COLLATE "pg_catalog"."default",
    CONSTRAINT "pk_zd_business_type" PRIMARY KEY ("id")
);

-- MySQL sink table structure
CREATE TABLE `test` (
    `id` int(11) NOT NULL DEFAULT '0' COMMENT 'ID',
    `name` varchar(50) DEFAULT NULL COMMENT '名称',
    `py_code` varchar(50) DEFAULT NULL COMMENT '助记码',
    `seq_no` int(11) DEFAULT NULL COMMENT '排序',
    `description` varchar(200) DEFAULT NULL COMMENT '备注',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
6,下面就可以进行操作原表,然后增删改操作