手把手教你使用 Wali 数据中台实现跨数据库的实时数据同步,包括配置、监控和故障处理。
赵工
9 分钟阅读
实时数据同步是数据中台的核心能力之一。本文将通过一个实际案例,演示如何使用 Wali 实现 MySQL 到 PostgreSQL 的实时数据同步。
场景介绍
业务背景
某电商公司有以下需求:
- 源系统:订单系统(MySQL 5.7)
- 目标系统:数据分析平台(PostgreSQL 13)
- 同步要求:
- 实时同步订单数据
- 延迟不超过 5 秒
- 保证数据一致性
- 支持断点续传
数据表结构
MySQL 源表:
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_no VARCHAR(32) NOT NULL UNIQUE,
customer_id BIGINT NOT NULL,
total_amount DECIMAL(10,2) NOT NULL,
status VARCHAR(20) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_customer (customer_id),
INDEX idx_created (created_at)
);
PostgreSQL 目标表:
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
order_no VARCHAR(32) NOT NULL UNIQUE,
customer_id BIGINT NOT NULL,
total_amount NUMERIC(10,2) NOT NULL,
status VARCHAR(20) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_customer ON orders(customer_id);
CREATE INDEX idx_created ON orders(created_at);
实施步骤
步骤 1:配置数据源
创建 config/datasources.yml:
datasources:
# MySQL 源数据库
- name: mysql-orders
type: mysql
host: 192.168.1.10
port: 3306
database: ecommerce
username: wali_user
password: ${MYSQL_PASSWORD}
options:
charset: utf8mb4
timezone: '+08:00'
# PostgreSQL 目标数据库
- name: postgres-analytics
type: postgresql
host: 192.168.1.20
port: 5432
database: analytics
username: wali_user
password: ${POSTGRES_PASSWORD}
options:
schema: public
ssl: true
步骤 2:创建同步任务
使用 Wali CLI 创建同步任务:
# 创建同步任务配置
wali sync create \
--name order-sync \
--source mysql-orders \
--target postgres-analytics \
--config sync-config.yml
sync-config.yml 内容:
sync_task:
name: order-sync
description: 订单数据实时同步
source:
datasource: mysql-orders
table: orders
tracking_column: updated_at
target:
datasource: postgres-analytics
table: orders
write_mode: upsert
conflict_resolution: source_wins
mode: realtime
settings:
batch_size: 1000
flush_interval: 5s
max_delay: 10s
parallelism: 4
transformation:
- type: add_column
column: synced_at
value: NOW()
filters:
- column: status
operator: not_in
values: ['deleted', 'cancelled']
步骤 3:启动同步任务
# 启动任务
wali sync start order-sync
# 查看任务状态
wali sync status order-sync
# 输出示例
Task: order-sync
Status: running
Mode: realtime
Records synced: 125,430
Current lag: 2.3s
Last sync: 2024-02-05 14:30:25
Errors: 0
步骤 4:监控同步状态
使用 Web 界面监控
访问 http://localhost:8080/sync/order-sync,可以看到:
- 📊 实时同步速率图表
- ⏱️ 同步延迟趋势
- ✅ 成功/失败记录统计
- 🔍 最近同步的数据样本
使用 API 监控
// 获取同步任务指标
const response = await fetch('http://localhost:8080/api/sync/order-sync/metrics');
const metrics = await response.json();
console.log(metrics);
// 输出:
{
"task_name": "order-sync",
"status": "running",
"metrics": {
"records_synced": 125430,
"records_per_second": 850,
"current_lag_ms": 2300,
"avg_lag_ms": 3100,
"error_rate": 0.001,
"last_sync_timestamp": "2024-02-05T14:30:25Z"
}
}
高级配置
数据转换
在同步过程中进行数据转换:
transformation:
# 字段映射
- type: rename
from: customer_id
to: user_id
# 数据类型转换
- type: cast
column: total_amount
target_type: numeric(12,2)
# 计算字段
- type: compute
column: order_year
expression: YEAR(created_at)
# 数据脱敏
- type: mask
column: customer_phone
method: phone
# 条件转换
- type: case
column: status_code
cases:
- when: status = 'pending'
then: 1
- when: status = 'paid'
then: 2
- when: status = 'shipped'
then: 3
- else: 0
错误处理
配置错误处理策略:
error_handling:
# 重试策略
retry:
max_attempts: 3
backoff: exponential
initial_delay: 1s
max_delay: 60s
# 失败处理
on_failure:
action: log_and_skip
dead_letter_queue: true
# 告警配置
alerts:
- condition: error_rate > 0.01
action: notify
channels: [email, webhook]
- condition: lag > 30s
action: notify
channels: [email]
性能优化
performance:
# 批量处理
batch:
size: 5000
timeout: 30s
# 并行处理
parallelism:
readers: 4
writers: 4
# 缓冲区配置
buffer:
size: 100MB
flush_interval: 5s
# 压缩传输
compression:
enabled: true
algorithm: snappy
故障处理
常见问题
问题 1:同步延迟过高
症状:
Current lag: 45.2s # 超过预期的 10s
排查步骤:
# 1. 检查源数据库负载
wali datasource check mysql-orders
# 2. 检查网络延迟
wali network test mysql-orders postgres-analytics
# 3. 查看任务详细日志
wali sync logs order-sync --tail 100
解决方案:
# 增加并行度
settings:
parallelism: 8 # 从 4 增加到 8
batch_size: 2000 # 增加批次大小
问题 2:数据不一致
症状:
Source count: 125,430
Target count: 125,428
Difference: 2 records
排查步骤:
# 对比数据
wali sync compare order-sync \
--source-query "SELECT * FROM orders WHERE id IN (1001, 1002)" \
--target-query "SELECT * FROM orders WHERE id IN (1001, 1002)"
解决方案:
# 重新同步缺失数据
wali sync repair order-sync \
--mode incremental \
--start-id 1001 \
--end-id 1002
问题 3:任务异常停止
症状:
Status: stopped
Last error: Connection timeout
解决方案:
# 检查连接
wali datasource test mysql-orders
wali datasource test postgres-analytics
# 重启任务
wali sync restart order-sync
# 启用自动重启
wali sync update order-sync \
--auto-restart true \
--restart-delay 30s
监控告警
Prometheus 指标
Wali 暴露 Prometheus 格式的指标:
# 同步速率
wali_sync_records_per_second{task="order-sync"} 850
# 同步延迟
wali_sync_lag_seconds{task="order-sync"} 2.3
# 错误率
wali_sync_error_rate{task="order-sync"} 0.001
# 活跃任务数
wali_sync_active_tasks 5
Grafana 仪表板
导入 Wali 官方 Grafana 仪表板:
# 下载仪表板配置
wget https://wali.run/grafana/sync-dashboard.json
# 导入到 Grafana
curl -X POST http://grafana:3000/api/dashboards/db \
-H "Content-Type: application/json" \
-d @sync-dashboard.json
告警规则
配置 Prometheus 告警规则:
groups:
- name: wali_sync_alerts
rules:
- alert: HighSyncLag
expr: wali_sync_lag_seconds > 30
for: 5m
labels:
severity: warning
annotations:
summary: "同步延迟过高"
description: "任务 {{ $labels.task }} 延迟 {{ $value }}s"
- alert: SyncTaskDown
expr: up{job="wali-sync"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "同步任务停止"
description: "任务 {{ $labels.task }} 已停止"
性能测试结果
测试环境
硬件配置:
CPU: 8核
内存: 16GB
磁盘: SSD
网络: 千兆
数据规模:
初始数据: 100万条
增量数据: 1000条/秒
测试结果
| 指标 | 结果 |
|---|---|
| 初始同步速度 | 15,000 条/秒 |
| 实时同步速度 | 2,000 条/秒 |
| 平均延迟 | 2.5 秒 |
| P95 延迟 | 4.8 秒 |
| P99 延迟 | 8.2 秒 |
| CPU 使用率 | 45% |
| 内存使用 | 2.5GB |
| 网络带宽 | 50 Mbps |
最佳实践
1. 选择合适的跟踪字段
-- 推荐:使用自增 ID + 更新时间
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_updated (updated_at)
);
-- 不推荐:只使用更新时间(可能遗漏数据)
2. 合理设置批次大小
# 根据数据大小调整
settings:
batch_size: 1000 # 小记录(< 1KB)
batch_size: 500 # 中等记录(1-10KB)
batch_size: 100 # 大记录(> 10KB)
3. 定期数据校验
# 每天凌晨进行全量校验
0 2 * * * wali sync verify order-sync --full
4. 保留同步日志
logging:
retention: 30d
level: info
format: json
output: /var/log/wali/sync/
总结
实时数据同步需要注意:
- 配置优化:根据数据特点调整参数
- 监控告警:及时发现和处理问题
- 错误处理:设计合理的重试和恢复机制
- 性能测试:验证系统能力和瓶颈
- 定期校验:确保数据一致性
Wali 提供了完整的实时同步解决方案,帮助企业轻松实现跨数据库的数据同步。
遇到同步问题?查看完整文档:https://docs.wali.run/sync
