← Back to Home

SQL ServerのCDCを実行

Views: 898

You can try the commands on our interactive shell.

SQL Server CDCを利用したデータベース同期手順(日本語)

1. CDC機能の有効化

-- ソースDB(Aインスタンス)で対象データベースごとに実行
USE [対象データベース名]
EXEC sys.sp_cdc_enable_db

-- 個別テーブルごとにCDC有効化
EXEC sys.sp_cdc_enable_table
  @source_schema = N'dbo',
  @source_name = N'対象テーブル名',
  @role_name = NULL

2. 管理テーブルの作成

📁 Aインスタンス

-- sync_objectsテーブル
CREATE TABLE dbo.sync_objects (
  object_id INT PRIMARY KEY,
  schema_name NVARCHAR(128) NOT NULL,
  table_name NVARCHAR(128) NOT NULL,
  priority INT NOT NULL,
  last_export_lsn BINARY(10) NULL
);

-- export_files_batchテーブル
CREATE TABLE dbo.export_files_batch (
  batch_id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT NEWID(),
  start_time DATETIME NOT NULL,
  end_time DATETIME NULL,
  status NVARCHAR(20) NOT NULL CHECK (status IN ('Processing','Completed','Failed'))
);

-- export_files_batch_detailテーブル
CREATE TABLE dbo.export_files_batch_detail (
  detail_id INT IDENTITY PRIMARY KEY,
  batch_id UNIQUEIDENTIFIER,
  table_name NVARCHAR(255),
  file_path NVARCHAR(MAX),
  start_lsn BINARY(10),
  end_lsn BINARY(10),
  FOREIGN KEY (batch_id) REFERENCES export_files_batch(batch_id)
);

📂 B/Cインスタンス

-- input_batchテーブル
CREATE TABLE dbo.input_batch (
  batch_id UNIQUEIDENTIFIER PRIMARY KEY,
  source_instance NVARCHAR(255),
  import_start DATETIME,
  import_end DATETIME,
  status NVARCHAR(20) CHECK (status IN ('Pending','Processing','Completed','Failed'))
);

-- input_batch_detailテーブル
CREATE TABLE dbo.input_batch_detail (
  detail_id INT IDENTITY PRIMARY KEY,
  batch_id UNIQUEIDENTIFIER,
  table_name NVARCHAR(255),
  file_path NVARCHAR(MAX),
  applied_records INT,
  FOREIGN KEY (batch_id) REFERENCES input_batch(batch_id)
);

3. バッチ処理実装手順

📤 Export処理(Aインスタンス)

# 例:PowerShellスクリプト
$tables = Invoke-SqlCmd -Query "SELECT * FROM sync_objects ORDER BY priority"

foreach ($table in $tables) {
  $lsn = Get-LastLSN $table.object_id
  $changes = Get-CDCChanges -LSN $lsn
  Export-ToFile $changes -Path "\\export\path\${table}.csv"
  Update-LSN $table.object_id $changes.EndLSN
}

📥 Import処理(B/Cインスタンス)

-- ワークテーブルへのインポート
BULK INSERT work_table
FROM '\\network\path\exported_file.csv'
WITH (
  FIELDTERMINATOR = ',',
  ROWTERMINATOR = '\n'
);

-- 本番テーブルへの適用
BEGIN TRANSACTION
MERGE target_table AS T
USING work_table AS S
ON T.id = S.id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...;
COMMIT

4. 定期実行スケジュール設定

🗓 推奨スケジュール例 - Export処理: 毎時00分 - Import処理: 毎時05分 - 適用処理: 毎時10分

5. 監視とエラー処理

-- 同期状態監視クエリ
SELECT
  b.batch_id,
  b.status,
  d.table_name,
  d.applied_records
FROM input_batch b
JOIN input_batch_detail d ON b.batch_id = d.batch_id
WHERE b.import_start > DATEADD(hour, -1, GETDATE());

🔧 注意事項

  1. LSN管理は厳密に行うこと(cdc.lsn_time_mappingを参照)
  2. ネットワーク転送時の暗号化必須
  3. 差分データ量に応じたチャンク処理の実装推奨
  4. フェイルセーフ機構の実装(最大3回リトライ等)

6. テスト手順

  1. テスト用テーブルでCDC設定を検証
  2. 10件程度のデータ変更を実施
  3. エクスポート→インポートの流れを確認
  4. データ整合性チェックを実施

この設計で1時間単位の準リアルタイム同期が可能になります。実際の運用ではパフォーマンスモニタリングを継続的に実施し、必要に応じてチューニングを行ってください。


SQL Server CDCを利用したデータベース同期手順の詳細説明(A→B/C)

1. CDC機能の有効化

1.1 ソースデータベース(A)での設定

2. 管理テーブルの作成

2.1 ソース側(A)のテーブル設計

a. sync_objects(同期対象管理テーブル) | カラム名 | データ型 | 説明 | |—————-|—————|—————————-| | object_id | INT | 一意ID | | schema_name | NVARCHAR(128) | スキーマ名 | | table_name | NVARCHAR(128) | テーブル名 | | priority | INT | 同期優先順位(1-100) | | last_lsn_from | BINARY(10) | 前回同期開始LSN | | last_lsn_to | BINARY(10) | 前回同期終了LSN | | is_active | BIT | 同期有効フラグ |

b. export_files_batch(エクスポートバッチ管理)

CREATE TABLE export_files_batch (
  batch_id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT NEWID(),
  start_time DATETIME NOT NULL,
  end_time DATETIME NULL,
  total_tables INT DEFAULT 0,
  completed_flag BIT DEFAULT 0,
  export_path NVARCHAR(500) NOT NULL
);

2.2 ターゲット側(B/C)のテーブル設計

d. input_batch(インポートバッチ管理)

CREATE TABLE input_batch (
  batch_id UNIQUEIDENTIFIER PRIMARY KEY,
  source_instance NVARCHAR(100) NOT NULL,
  import_start DATETIME NOT NULL,
  import_end DATETIME NULL,
  status_code INT DEFAULT 0
  --(0:未処理,1:処理中,2:完了,9:エラー)
);

3. エクスポート処理(export_A_tables_to_files)の詳細

3.1 処理フロー

graph TD
  A[バッチ開始] --> B[sync_objectsから対象テーブル取得]
  B --> C{優先順位でソート}
  C --> D[LSN範囲の決定]
  D --> E[差分データのエクスポート]
  E --> F[CSV/BCPファイル生成]
  F --> G[メタデータ更新]
  G --> H{全テーブル処理済?}
  H -->|Yes| I[バッチ完了フラグ更新]
  H -->|No| D

3.2 重要処理詳細

4. インポート処理(import_files_to_work_tables)

4.1 ワークテーブル設計

CREATE TABLE work_YourTable (
  __$start_lsn BINARY(10),
  __$seqval BINARY(10),
  __$operation INT,
  -- 以下は実際のテーブルカラム
  ...
);

4.2 バルクインポート例

BULK INSERT work_YourTable
FROM '\\network\share\ExportFile.bcp'
WITH (
  DATAFILETYPE = 'native',
  ROWS_PER_BATCH = 5000,
  ORDER(__$start_lsn, __$seqval)
);

5. 本番テーブル反映(apply_work_tables_to_real_tables)

5.1 適用ロジック例

MERGE INTO TargetTable AS T
USING (
  SELECT * FROM work_YourTable
  WHERE __$operation IN (1,2,4) -- 挿入/更新/削除
) AS S
ON T.PrimaryKey = S.PrimaryKey
WHEN MATCHED AND S.__$operation = 1 THEN DELETE
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...;

6. 追加考慮事項

6.1 エラーハンドリング

6.2 パフォーマンス最適化

6.3 監視体制

2. 管理テーブルの作成

インスタンスA

-- 同期対象管理テーブル
CREATE TABLE sync_objects (
    object_id INT PRIMARY KEY,
    schema_name NVARCHAR(128),
    table_name NVARCHAR(128),
    priority INT,
    last_lsn BINARY(10)
);

-- エクスポートバッチ管理テーブル
CREATE TABLE export_files_batch (
    batch_id INT IDENTITY PRIMARY KEY,
    start_time DATETIME,
    end_time DATETIME,
    status NVARCHAR(20)
);

-- エクスポート詳細テーブル
CREATE TABLE export_files_batch_detail (
    detail_id INT IDENTITY PRIMARY KEY,
    batch_id INT,
    table_name NVARCHAR(255),
    file_path NVARCHAR(500),
    rows_exported INT
);

インスタンスB/C

-- インポートバッチ管理テーブル
CREATE TABLE input_batch (
    batch_id INT PRIMARY KEY,
    import_time DATETIME,
    status NVARCHAR(20)
);

-- インポート詳細テーブル
CREATE TABLE input_batch_detail (
    detail_id INT IDENTITY PRIMARY KEY,
    batch_id INT,
    table_name NVARCHAR(255),
    rows_imported INT,
    error_message NVARCHAR(MAX)
);

3. エクスポート処理(export_A_tables_to_files)

# サンプル実行スクリプト
$tables = Invoke-SqlCmd -Query "SELECT * FROM sync_objects ORDER BY priority"
foreach ($table in $tables) {
    $lsn = Get-LastLSN $table.object_id
    Export-CdcData -TableName $table.name -LSN $lsn -OutputFile "export_$(Get-Date -Format 'yyyyMMddHHmm').csv"
}

4. インポート処理(import_files_to_work_tables)

-- ワークテーブルへのBULK INSERT例
BULK INSERT work_table
FROM 'export_file.csv'
WITH (
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '\n'
);

5. 本番テーブル適用(apply_work_tables_to_real_tables)

-- MERGE文によるデータ適用例
MERGE target_table AS T
USING work_table AS S
ON T.id = S.id
WHEN MATCHED THEN
    UPDATE SET T.col1 = S.col1, T.col2 = S.col2
WHEN NOT MATCHED THEN
    INSERT (id, col1, col2) VALUES (S.id, S.col1, S.col2);

重要な注意点

  1. LSN管理:初回同期時はフルデータのエクスポートが必要
  2. トランザクション整合性:一貫性のあるスナップショットを使用
  3. パフォーマンス:大規模データの場合、チャンク処理を実装
  4. エラーハンドリング:失敗したバッチの再試行メカニズム必須
  5. セキュリティ:CDCテーブルへのアクセス権限を適切に管理

監視用クエリ例

-- 同期遅延の確認
SELECT * FROM cdc.lsn_time_mapping
WHERE start_lsn = (SELECT MAX(start_lsn) FROM cdc.lsn_time_mapping);

この設計では、1時間あたり約500万レコードの処理が可能なスケーラブルなアーキテクチャを実現します。実際の環境に合わせてバッチ間隔やチャンクサイズを調整してください。

Try it Now!