实时数据同步实战:从 MySQL 到 PostgreSQL
返回博客列表

实时数据同步实战:从 MySQL 到 PostgreSQL

手把手教你使用 Wali 数据中台实现跨数据库的实时数据同步,包括配置、监控和故障处理。

赵工
9 分钟阅读

实时数据同步是数据中台的核心能力之一。本文将通过一个实际案例,演示如何使用 Wali 实现 MySQL 到 PostgreSQL 的实时数据同步。

场景介绍

业务背景

某电商公司有以下需求:

  • 源系统:订单系统(MySQL 5.7)
  • 目标系统:数据分析平台(PostgreSQL 13)
  • 同步要求
    • 实时同步订单数据
    • 延迟不超过 5 秒
    • 保证数据一致性
    • 支持断点续传

数据表结构

MySQL 源表

CREATE TABLE orders (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  order_no VARCHAR(32) NOT NULL UNIQUE,
  customer_id BIGINT NOT NULL,
  total_amount DECIMAL(10,2) NOT NULL,
  status VARCHAR(20) NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  INDEX idx_customer (customer_id),
  INDEX idx_created (created_at)
);

PostgreSQL 目标表

CREATE TABLE orders (
  id BIGSERIAL PRIMARY KEY,
  order_no VARCHAR(32) NOT NULL UNIQUE,
  customer_id BIGINT NOT NULL,
  total_amount NUMERIC(10,2) NOT NULL,
  status VARCHAR(20) NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_customer ON orders(customer_id);
CREATE INDEX idx_created ON orders(created_at);

实施步骤

步骤 1:配置数据源

创建 config/datasources.yml

datasources:
  # MySQL 源数据库
  - name: mysql-orders
    type: mysql
    host: 192.168.1.10
    port: 3306
    database: ecommerce
    username: wali_user
    password: ${MYSQL_PASSWORD}
    options:
      charset: utf8mb4
      timezone: '+08:00'
      
  # PostgreSQL 目标数据库
  - name: postgres-analytics
    type: postgresql
    host: 192.168.1.20
    port: 5432
    database: analytics
    username: wali_user
    password: ${POSTGRES_PASSWORD}
    options:
      schema: public
      ssl: true

步骤 2:创建同步任务

使用 Wali CLI 创建同步任务:

# 创建同步任务配置
wali sync create \
  --name order-sync \
  --source mysql-orders \
  --target postgres-analytics \
  --config sync-config.yml

sync-config.yml 内容:

sync_task:
  name: order-sync
  description: 订单数据实时同步
  
  source:
    datasource: mysql-orders
    table: orders
    tracking_column: updated_at
    
  target:
    datasource: postgres-analytics
    table: orders
    write_mode: upsert
    conflict_resolution: source_wins
    
  mode: realtime
  
  settings:
    batch_size: 1000
    flush_interval: 5s
    max_delay: 10s
    parallelism: 4
    
  transformation:
    - type: add_column
      column: synced_at
      value: NOW()
      
  filters:
    - column: status
      operator: not_in
      values: ['deleted', 'cancelled']

步骤 3:启动同步任务

# 启动任务
wali sync start order-sync

# 查看任务状态
wali sync status order-sync

# 输出示例
Task: order-sync
Status: running
Mode: realtime
Records synced: 125,430
Current lag: 2.3s
Last sync: 2024-02-05 14:30:25
Errors: 0

步骤 4:监控同步状态

使用 Web 界面监控

访问 http://localhost:8080/sync/order-sync,可以看到:

  • 📊 实时同步速率图表
  • ⏱️ 同步延迟趋势
  • ✅ 成功/失败记录统计
  • 🔍 最近同步的数据样本

使用 API 监控

// 获取同步任务指标
const response = await fetch('http://localhost:8080/api/sync/order-sync/metrics');
const metrics = await response.json();

console.log(metrics);
// 输出:
{
  "task_name": "order-sync",
  "status": "running",
  "metrics": {
    "records_synced": 125430,
    "records_per_second": 850,
    "current_lag_ms": 2300,
    "avg_lag_ms": 3100,
    "error_rate": 0.001,
    "last_sync_timestamp": "2024-02-05T14:30:25Z"
  }
}

高级配置

数据转换

在同步过程中进行数据转换:

transformation:
  # 字段映射
  - type: rename
    from: customer_id
    to: user_id
    
  # 数据类型转换
  - type: cast
    column: total_amount
    target_type: numeric(12,2)
    
  # 计算字段
  - type: compute
    column: order_year
    expression: YEAR(created_at)
    
  # 数据脱敏
  - type: mask
    column: customer_phone
    method: phone
    
  # 条件转换
  - type: case
    column: status_code
    cases:
      - when: status = 'pending'
        then: 1
      - when: status = 'paid'
        then: 2
      - when: status = 'shipped'
        then: 3
      - else: 0

错误处理

配置错误处理策略:

error_handling:
  # 重试策略
  retry:
    max_attempts: 3
    backoff: exponential
    initial_delay: 1s
    max_delay: 60s
    
  # 失败处理
  on_failure:
    action: log_and_skip
    dead_letter_queue: true
    
  # 告警配置
  alerts:
    - condition: error_rate > 0.01
      action: notify
      channels: [email, webhook]
      
    - condition: lag > 30s
      action: notify
      channels: [email]

性能优化

performance:
  # 批量处理
  batch:
    size: 5000
    timeout: 30s
    
  # 并行处理
  parallelism:
    readers: 4
    writers: 4
    
  # 缓冲区配置
  buffer:
    size: 100MB
    flush_interval: 5s
    
  # 压缩传输
  compression:
    enabled: true
    algorithm: snappy

故障处理

常见问题

问题 1:同步延迟过高

症状

Current lag: 45.2s  # 超过预期的 10s

排查步骤

# 1. 检查源数据库负载
wali datasource check mysql-orders

# 2. 检查网络延迟
wali network test mysql-orders postgres-analytics

# 3. 查看任务详细日志
wali sync logs order-sync --tail 100

解决方案

# 增加并行度
settings:
  parallelism: 8  # 从 4 增加到 8
  batch_size: 2000  # 增加批次大小

问题 2:数据不一致

症状

Source count: 125,430
Target count: 125,428
Difference: 2 records

排查步骤

# 对比数据
wali sync compare order-sync \
  --source-query "SELECT * FROM orders WHERE id IN (1001, 1002)" \
  --target-query "SELECT * FROM orders WHERE id IN (1001, 1002)"

解决方案

# 重新同步缺失数据
wali sync repair order-sync \
  --mode incremental \
  --start-id 1001 \
  --end-id 1002

问题 3:任务异常停止

症状

Status: stopped
Last error: Connection timeout

解决方案

# 检查连接
wali datasource test mysql-orders
wali datasource test postgres-analytics

# 重启任务
wali sync restart order-sync

# 启用自动重启
wali sync update order-sync \
  --auto-restart true \
  --restart-delay 30s

监控告警

Prometheus 指标

Wali 暴露 Prometheus 格式的指标:

# 同步速率
wali_sync_records_per_second{task="order-sync"} 850

# 同步延迟
wali_sync_lag_seconds{task="order-sync"} 2.3

# 错误率
wali_sync_error_rate{task="order-sync"} 0.001

# 活跃任务数
wali_sync_active_tasks 5

Grafana 仪表板

导入 Wali 官方 Grafana 仪表板:

# 下载仪表板配置
wget https://wali.run/grafana/sync-dashboard.json

# 导入到 Grafana
curl -X POST http://grafana:3000/api/dashboards/db \
  -H "Content-Type: application/json" \
  -d @sync-dashboard.json

告警规则

配置 Prometheus 告警规则:

groups:
  - name: wali_sync_alerts
    rules:
      - alert: HighSyncLag
        expr: wali_sync_lag_seconds > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "同步延迟过高"
          description: "任务 {{ $labels.task }} 延迟 {{ $value }}s"
          
      - alert: SyncTaskDown
        expr: up{job="wali-sync"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "同步任务停止"
          description: "任务 {{ $labels.task }} 已停止"

性能测试结果

测试环境

硬件配置:
  CPU: 8核
  内存: 16GB
  磁盘: SSD
  网络: 千兆

数据规模:
  初始数据: 100万条
  增量数据: 1000条/秒

测试结果

指标 结果
初始同步速度 15,000 条/秒
实时同步速度 2,000 条/秒
平均延迟 2.5 秒
P95 延迟 4.8 秒
P99 延迟 8.2 秒
CPU 使用率 45%
内存使用 2.5GB
网络带宽 50 Mbps

最佳实践

1. 选择合适的跟踪字段

-- 推荐:使用自增 ID + 更新时间
CREATE TABLE orders (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  INDEX idx_updated (updated_at)
);

-- 不推荐:只使用更新时间(可能遗漏数据)

2. 合理设置批次大小

# 根据数据大小调整
settings:
  batch_size: 1000   # 小记录(< 1KB)
  batch_size: 500    # 中等记录(1-10KB)
  batch_size: 100    # 大记录(> 10KB)

3. 定期数据校验

# 每天凌晨进行全量校验
0 2 * * * wali sync verify order-sync --full

4. 保留同步日志

logging:
  retention: 30d
  level: info
  format: json
  output: /var/log/wali/sync/

总结

实时数据同步需要注意:

  1. 配置优化:根据数据特点调整参数
  2. 监控告警:及时发现和处理问题
  3. 错误处理:设计合理的重试和恢复机制
  4. 性能测试:验证系统能力和瓶颈
  5. 定期校验:确保数据一致性

Wali 提供了完整的实时同步解决方案,帮助企业轻松实现跨数据库的数据同步。


遇到同步问题?查看完整文档:https://docs.wali.run/sync