Flink 1.18.1 实时同步 MySQL 数据到 StarRocks 流批一体方案
基于 StarRocks 官方流批一体同步文档,适配 Flink 1.18.1 版本特性,结合 mysql-cdc-3.0.1 和 starrocks-connector-1.2.12_flink-1.18 连接器,实现从 MySQL 到 StarRocks 的全量+增量实时同步。
一、环境准备
1.1 版本兼容性说明
| 组件 | 版本 | 说明 |
|---|---|---|
| Flink | 1.18.1(Scala 2.12) | 需匹配 StarRocks 连接器的 Flink 版本 |
| MySQL | 5.7+/8.0+ | 开启 binlog 且格式为 ROW |
| StarRocks | 2.5+ | 支持 Stream Load(8030 端口)和 JDBC 连接(9030 端口) |
| MySQL-CDC 连接器 | flink-sql-connector-mysql-cdc-3.0.1.jar | 适配 Flink 1.18+,支持增量快照 |
| StarRocks 连接器 | flink-connector-starrocks-1.2.12_flink-1.18.jar | 官方适配包,支持流批一体写入 |
1.2 前置依赖部署
1.2.1 Flink 集群配置(基于你的 flink-conf.yaml 优化)
核心配置保持不变,补充生产级必要配置:
1 | # 1. 状态后端配置(生产必配,避免状态丢失) |
1.2.2 MySQL 配置(CDC 依赖)
必须开启 binlog 并配置正确格式,执行以下 SQL 检查并修改:
1 | -- 1. 检查 binlog 状态(需为 ON) |
1.2.3 StarRocks 配置
- 确保 StarRocks FE(9030 端口)和 BE(8030 端口)正常启动,且能被 Flink 集群访问;
- 创建目标数据库和表(支持主键更新,适配 CDC 变更数据):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15-- 登录 StarRocks FE
mysql -h 192.168.0.35 -P 9030 -u root -D demo
-- 创建目标表(主键表,支持 UPSERT)
CREATE TABLE IF NOT EXISTS demo.orders (
product_id INT NOT NULL,
product_name VARCHAR(100) NOT NULL,
sales_cnt BIGINT NOT NULL
) ENGINE=OLAP
PRIMARY KEY(product_id)
DISTRIBUTED BY HASH(product_id) BUCKETS 1 -- 测试用 1 个分桶,生产根据数据量调整
PROPERTIES (
"enable_two_phase_commit" = "false", -- 测试先关闭两阶段提交,生产开启
"storage_medium" = "HDD"
);
1.3 依赖包部署
将以下 JAR 包放入 Flink 安装目录的 lib 文件夹(已确认你已添加,此处核对完整性):
- flink-sql-connector-mysql-cdc-3.0.1.jar
- flink-connector-starrocks-1.2.12_flink-1.18.jar
- (可选)flink-statebackend-rocksdb-1.18.1.jar(若启用 RocksDB 状态后端)
二、Flink SQL 同步方案(流批一体)
Flink CDC 3.0.1 支持 全量快照 + 增量 binlog 一体化同步,无需额外编写全量同步逻辑,通过 scan.incremental.snapshot.enabled=true 自动实现流批一体。
2.1 核心 SQL 脚本(适配 Flink 1.18.1)
1 | -- 1. 启用 checkpoint(确保精确一次语义,需在创建表前执行) |
2.2 关键配置说明(解决之前的问题)
| 配置项 | 作用 | 之前的问题解决 |
|---|---|---|
scan.incremental.snapshot.enabled=true |
开启增量快照,实现全量+增量一体化 | 避免单独处理全量同步,简化方案 |
debezium.heartbeat.interval.ms=5000 |
定期发送心跳包,保持 MySQL 连接活跃 | 解决之前出现的 Connection reset 错误 |
debezium.database.server.id=5400 |
唯一标识 CDC 客户端,避免 binlog 冲突 | 防止多任务抢占 binlog 连接 |
sink.buffer-flush.interval-ms=1000 |
1秒强制刷写,测试场景即时看到结果 | 解决少量数据积压不写入的问题 |
state.backend=rocksdb |
持久化状态,支持大状态和故障恢复 | 避免作业重启后丢失 CDC 偏移量 |
三、任务提交与验证
3.1 提交任务
- 启动 Flink 集群:
1
2# 启动 JobManager 和 TaskManager
/mnt/d/tools/flink-1.18.1/bin/start-cluster.sh - 进入 Flink SQL Client 执行脚本:
1
/mnt/d/tools/flink-1.18.1/bin/sql-client.sh embedded -f sync_mysql_to_starrocks.sql
3.2 验证同步效果
3.2.1 全量同步验证
查看 StarRocks 目标表是否已同步 MySQL 中的历史数据:
1 | -- 登录 StarRocks |
3.2.2 增量同步验证
在 MySQL 中插入/更新/删除数据,观察 StarRocks 表是否实时同步:
1 | -- MySQL 中执行 DML 操作 |
3.2.3 任务状态监控
通过 Flink WebUI(http://172.21.63.113:8081)查看:
- 作业状态:RUNNING(无失败)
- 数据指标:
numRecordsIn(源表输入数)和numRecordsOut(Sink 输出数)同步增长 - 检查点:定期完成 checkpoint(无失败)
四、生产环境优化(可选)
4.1 开启 StarRocks 两阶段提交(精确一次语义)
1 | -- 1. 修改 StarRocks 表属性 |
4.2 并行度优化
根据数据量调整并行度,提升同步速度:
1 | -- 提交任务时指定并行度(例如 4 个并行) |
4.3 大表全量同步优化
若 MySQL 表数据量较大(千万级+),调整 CDC 快照参数:
1 | -- 增大快照读取批次 |
4.4 异常处理
- 连接断开:通过
debezium.heartbeat.interval.ms和debezium.connection.timeout.ms保持连接; - 数据延迟:增大 TaskManager 内存和网络缓冲,调整并行度;
- 状态过大:启用 RocksDB 增量检查点:
SET 'state.backend.incremental' = 'true';
五、常见问题排查
| 问题现象 | 排查方向 | 解决方案 |
|---|---|---|
作业失败,报错 Connection reset |
MySQL 连接超时,无心跳包 | 添加 debezium.heartbeat.interval.ms=5000 |
| 源表有数据但 Sink 无输出 | 连接器参数错误,或缓冲未触发 | 确认 sink.buffer-flush.interval-ms=1000 |
| StarRocks 表无数据写入 | BE 8030 端口不可达,或权限不足 | 检查防火墙,确认 root 账号无密码登录正常 |
| 全量同步缓慢 | 并行度太低,或快照批次太小 | 增大并行度和 debezium.snapshot.fetch.size |
六、总结
本方案基于 Flink 1.18.1 + MySQL-CDC 3.0.1 + StarRocks 连接器 1.2.12,实现了:
- 流批一体:自动完成全量快照 + 增量 binlog 同步,无需额外脚本;
- 适配性:严格匹配 Flink 1.18.1 版本特性和连接器约束;
- 稳定性:解决了之前的连接断开、数据积压等问题;
- 可扩展性:支持生产环境优化(精确一次语义、并行度调整、大表适配)。
按照上述步骤配置后,即可实现 MySQL 到 StarRocks 的实时、可靠同步。