基于 StarRocks 官方流批一体同步文档,适配 Flink 1.18.1 版本特性,结合 mysql-cdc-3.0.1starrocks-connector-1.2.12_flink-1.18 连接器,实现从 MySQL 到 StarRocks 的全量+增量实时同步。

一、环境准备

1.1 版本兼容性说明

组件 版本 说明
Flink 1.18.1(Scala 2.12) 需匹配 StarRocks 连接器的 Flink 版本
MySQL 5.7+/8.0+ 开启 binlog 且格式为 ROW
StarRocks 2.5+ 支持 Stream Load(8030 端口)和 JDBC 连接(9030 端口)
MySQL-CDC 连接器 flink-sql-connector-mysql-cdc-3.0.1.jar 适配 Flink 1.18+,支持增量快照
StarRocks 连接器 flink-connector-starrocks-1.2.12_flink-1.18.jar 官方适配包,支持流批一体写入

1.2 前置依赖部署

核心配置保持不变,补充生产级必要配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 1. 状态后端配置(生产必配,避免状态丢失)
state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/d/tools/flink-1.18.1/data/rocksdb # 本地存储目录
state.checkpoints.dir: file:///mnt/d/tools/flink-1.18.1/data/checkpoints # 检查点存储目录(生产建议用 HDFS/S3)
state.checkpoints.interval: 30000 # 检查点间隔 30s
execution.checkpointing.mode: EXACTLY_ONCE # 精确一次语义

# 2. 网络缓冲优化(避免 CDC 数据流阻塞)
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 256mb

# 3. 其他保持原有配置(如 JobManager/TaskManager 内存、端口等)
jobmanager.rpc.address: 172.21.63.113
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
rest.address: 172.21.63.113
rest.bind-address: 0.0.0.0

1.2.2 MySQL 配置(CDC 依赖)

必须开启 binlog 并配置正确格式,执行以下 SQL 检查并修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 1. 检查 binlog 状态(需为 ON)
show variables like 'log_bin';
-- 2. 检查 binlog 格式(需为 ROW)
show variables like 'binlog_format';
-- 3. 检查 binlog 行模式(需为 FULL)
show variables like 'binlog_row_image';

-- 若未满足,修改 my.cnf(或 my.ini)并重启 MySQL
[mysqld]
log_bin=mysql-bin # 开启 binlog
server-id=1 # 唯一 ID(集群环境需不同)
binlog_format=ROW # 行级 binlog
binlog_row_image=FULL # 记录完整行数据
expire_logs_days=7 # binlog 保留 7 天(避免磁盘占满)

-- 4. 授权 CDC 账号权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root'@'%';
FLUSH PRIVILEGES;

1.2.3 StarRocks 配置

  1. 确保 StarRocks FE(9030 端口)和 BE(8030 端口)正常启动,且能被 Flink 集群访问;
  2. 创建目标数据库和表(支持主键更新,适配 CDC 变更数据):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    -- 登录 StarRocks FE
    mysql -h 192.168.0.35 -P 9030 -u root -D demo

    -- 创建目标表(主键表,支持 UPSERT)
    CREATE TABLE IF NOT EXISTS demo.orders (
    product_id INT NOT NULL,
    product_name VARCHAR(100) NOT NULL,
    sales_cnt BIGINT NOT NULL
    ) ENGINE=OLAP
    PRIMARY KEY(product_id)
    DISTRIBUTED BY HASH(product_id) BUCKETS 1 -- 测试用 1 个分桶,生产根据数据量调整
    PROPERTIES (
    "enable_two_phase_commit" = "false", -- 测试先关闭两阶段提交,生产开启
    "storage_medium" = "HDD"
    );

1.3 依赖包部署

将以下 JAR 包放入 Flink 安装目录的 lib 文件夹(已确认你已添加,此处核对完整性):

  • flink-sql-connector-mysql-cdc-3.0.1.jar
  • flink-connector-starrocks-1.2.12_flink-1.18.jar
  • (可选)flink-statebackend-rocksdb-1.18.1.jar(若启用 RocksDB 状态后端)

Flink CDC 3.0.1 支持 全量快照 + 增量 binlog 一体化同步,无需额外编写全量同步逻辑,通过 scan.incremental.snapshot.enabled=true 自动实现流批一体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
-- 1. 启用 checkpoint(确保精确一次语义,需在创建表前执行)
SET 'execution.checkpointing.interval' = '30s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'state.backend' = 'rocksdb';

-- 2. 创建 MySQL CDC 源表(全量+增量一体化)
CREATE TABLE IF NOT EXISTS orders_src (
`product_id` INT NOT NULL,
`product_name` STRING NOT NULL,
`sales_cnt` BIGINT NOT NULL,
PRIMARY KEY (product_id) NOT ENFORCED -- 声明主键,CDC 基于主键捕获变更
) WITH (
-- 基础连接配置
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.35',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'demo',
'table-name' = 'orders',
'server-time-zone' = 'Asia/Shanghai',

-- 流批一体核心配置(Flink 1.18+ 适配)
'scan.incremental.snapshot.enabled' = 'true', -- 开启增量快照(默认 true,显式声明)
'scan.snapshot.fetch.size' = '10000', -- 全量快照时每次读取行数,避免 OOM

-- 增量 binlog 优化配置
'debezium.heartbeat.interval.ms' = '5000', -- 心跳包间隔,防止连接断开(解决之前 Connection reset 问题)
'debezium.database.server.id' = '5400', -- 唯一 server-id,避免与其他 CDC 任务冲突
'debezium.database.history' = 'org.apache.flink.kafka.shaded.org.apache.kafka.connect.storage.FileOffsetBackingStore',
'debezium.database.history.file.filename' = '/tmp/flink-cdc-mysql-history.dat', -- 存储 binlog 位置历史

-- 快照锁配置(测试用 none,生产用 minimal)
'debezium.snapshot.locking.mode' = 'minimal'
);

-- 3. 创建 StarRocks Sink 表(适配 1.2.12 连接器约束)
CREATE TABLE IF NOT EXISTS orders_sink (
`product_id` INT NOT NULL,
`product_name` STRING NOT NULL,
`sales_cnt` BIGINT NOT NULL,
PRIMARY KEY (product_id) NOT ENFORCED -- 与源表主键一致,支持 UPSERT
) WITH (
-- 基础连接配置
'connector' = 'starrocks',
'load-url' = '192.168.0.35:8030', -- StarRocks BE 的 http_port(默认 8030)
'jdbc-url' = 'jdbc:mysql://192.168.0.35:9030', -- StarRocks FE 的 mysql_port
'database-name' = 'demo',
'table-name' = 'orders',
'username' = 'root',
'password' = '',

-- 写入优化配置(适配连接器约束)
'sink.semantic' = 'at-least-once', -- 测试用至少一次,生产开启两阶段提交后改为 exactly-once
'sink.max-retries' = '3', -- 写入失败重试次数
'sink.buffer-flush.interval-ms' = '1000', -- 1秒强制刷写(测试用,生产可改为 5000ms)
'sink.buffer-flush.max-rows' = '64000', -- 连接器最小限制,不可小于 64000

-- 数据格式配置
'sink.properties.format' = 'csv',
'sink.properties.column_separator' = '\x01',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.strip_outer_array' = 'true'
);

-- 4. 启动同步任务(全量快照 + 增量 binlog 实时同步)
INSERT INTO orders_sink
SELECT * FROM orders_src;

2.2 关键配置说明(解决之前的问题)

配置项 作用 之前的问题解决
scan.incremental.snapshot.enabled=true 开启增量快照,实现全量+增量一体化 避免单独处理全量同步,简化方案
debezium.heartbeat.interval.ms=5000 定期发送心跳包,保持 MySQL 连接活跃 解决之前出现的 Connection reset 错误
debezium.database.server.id=5400 唯一标识 CDC 客户端,避免 binlog 冲突 防止多任务抢占 binlog 连接
sink.buffer-flush.interval-ms=1000 1秒强制刷写,测试场景即时看到结果 解决少量数据积压不写入的问题
state.backend=rocksdb 持久化状态,支持大状态和故障恢复 避免作业重启后丢失 CDC 偏移量

三、任务提交与验证

3.1 提交任务

  1. 启动 Flink 集群:
    1
    2
    # 启动 JobManager 和 TaskManager
    /mnt/d/tools/flink-1.18.1/bin/start-cluster.sh
  2. 进入 Flink SQL Client 执行脚本:
    1
    /mnt/d/tools/flink-1.18.1/bin/sql-client.sh embedded -f sync_mysql_to_starrocks.sql

3.2 验证同步效果

3.2.1 全量同步验证

查看 StarRocks 目标表是否已同步 MySQL 中的历史数据:

1
2
3
-- 登录 StarRocks
mysql -h 192.168.0.35 -P 9030 -u root -D demo
SELECT COUNT(*) FROM orders; -- 与 MySQL 原表历史数据量一致

3.2.2 增量同步验证

在 MySQL 中插入/更新/删除数据,观察 StarRocks 表是否实时同步:

1
2
3
4
5
6
7
-- MySQL 中执行 DML 操作
INSERT INTO demo.orders (product_id, product_name, sales_cnt) VALUES (1004, 'Flink 1.18 实战', 50);
UPDATE demo.orders SET sales_cnt = 60 WHERE product_id = 1004;
DELETE FROM demo.orders WHERE product_id = 1004;

-- StarRocks 中查询结果(1秒内生效)
SELECT * FROM demo.orders WHERE product_id = 1004;

3.2.3 任务状态监控

通过 Flink WebUI(http://172.21.63.113:8081)查看:

  • 作业状态:RUNNING(无失败)
  • 数据指标:numRecordsIn(源表输入数)和 numRecordsOut(Sink 输出数)同步增长
  • 检查点:定期完成 checkpoint(无失败)

四、生产环境优化(可选)

4.1 开启 StarRocks 两阶段提交(精确一次语义)

1
2
3
4
5
6
-- 1. 修改 StarRocks 表属性
ALTER TABLE demo.orders SET ("enable_two_phase_commit" = "true");

-- 2. 修改 Flink Sink 配置(替换原有 sink.semantic)
'sink.semantic' = 'exactly-once',
'sink.two-phase-commit.enable' = 'true'

4.2 并行度优化

根据数据量调整并行度,提升同步速度:

1
2
3
4
5
6
-- 提交任务时指定并行度(例如 4 个并行)
INSERT INTO orders_sink
SELECT * FROM orders_src PARALLEL 4;

-- 或全局设置并行度
SET 'parallelism.default' = '4';

4.3 大表全量同步优化

若 MySQL 表数据量较大(千万级+),调整 CDC 快照参数:

1
2
3
4
5
6
7
-- 增大快照读取批次
'debezium.snapshot.fetch.size' = '50000',
-- 分表快照(若表分库分表)
'database-name' = 'demo',
'table-name' = 'orders_[0-9]', -- 正则匹配分表
-- 忽略历史数据,只同步增量(可选)
'scan.startup.mode' = 'latest-offset'

4.4 异常处理

  • 连接断开:通过 debezium.heartbeat.interval.msdebezium.connection.timeout.ms 保持连接;
  • 数据延迟:增大 TaskManager 内存和网络缓冲,调整并行度;
  • 状态过大:启用 RocksDB 增量检查点:SET 'state.backend.incremental' = 'true';

五、常见问题排查

问题现象 排查方向 解决方案
作业失败,报错 Connection reset MySQL 连接超时,无心跳包 添加 debezium.heartbeat.interval.ms=5000
源表有数据但 Sink 无输出 连接器参数错误,或缓冲未触发 确认 sink.buffer-flush.interval-ms=1000
StarRocks 表无数据写入 BE 8030 端口不可达,或权限不足 检查防火墙,确认 root 账号无密码登录正常
全量同步缓慢 并行度太低,或快照批次太小 增大并行度和 debezium.snapshot.fetch.size

六、总结

本方案基于 Flink 1.18.1 + MySQL-CDC 3.0.1 + StarRocks 连接器 1.2.12,实现了:

  1. 流批一体:自动完成全量快照 + 增量 binlog 同步,无需额外脚本;
  2. 适配性:严格匹配 Flink 1.18.1 版本特性和连接器约束;
  3. 稳定性:解决了之前的连接断开、数据积压等问题;
  4. 可扩展性:支持生产环境优化(精确一次语义、并行度调整、大表适配)。

按照上述步骤配置后,即可实现 MySQL 到 StarRocks 的实时、可靠同步。