見出し画像

【劔"Tsurugi" Tips集】プログラミングTips⼤量データ登録処理は可能な限り並列化して⾏う【プログラミングTips】

次世代高速RDB 劔"Tsurugi"は、メニーコア・⼤容量メモリという現代的なハードウェアを想定し、これまでのデータベースとは⼤きく異なるアーキテクチャで作られています。

そのため、これまでのデータベースと同じ感覚で使っていると「落とし⽳」にはまることがあります。

また、劍"Tsurugi"はまだまだ開発途上のプロダクトであるため、本エントリ執筆時点のバージョンである1.1.0の時点ではワークアラウンドが必要なところもあります。

そこで【劔"Tsurugi" Tips集】ではTsurugiを使いこなす上で、これらの落とし⽳を回避するためのプログラミング上、運⽤上のTipsを集めてみました。

下記、掲題の件です。


Tsurugiはメニーコア環境に最適化したデータベースです。
バッチ処理などで⼤量のデータ登録を⾏う場合、実⾏環境のコア数に応じてデータを分割し、並列に登録トランザクションを実⾏するとより⾼速に実⾏することができます。

以下は、 SomeData というJavaオブジェクトのリストが⼤量に存在するとして、そのデータを並列に登録する例を⽰します。

// 登録タスクの分割数 (この例では4にしているが、まずは物理コア数を基準にするのが良い)
int nThreads = 4;

// スレッドプールの作成
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

// ここで受け取るSomeDataのリストサイズが膨⼤であるとする
List<SomeData> dataList = receiveData();

// 分割タスク当たりの処理件数を取得
int sizePerThread = dataList.size() / nThreads;

// 並列実⾏タスクのFutureのリスト
List<Future<Integer>> futureList = new ArrayList<>();

// データを分割して並列タスクに投⼊ (タスク InsertTask については後述)
for (int i = 0; i < nThreads; i++) {
  if (sizePerThread == 0) {
   // データ数が少ない場合は1つのタスクで全データを登録
   futureList.add(executorService.submit(new InsertTask(dataList)));
   break;
  }
  int startPos = i * sizePerThread;
  // 最後のスレッドはリストの末尾まで処理
  int endPos = (i + 1 == nThreads) ? dataList.size() : startPos + sizePerThread;
  List<SomeData> subDataList = dataList.subList(startPos, endPos);
  futureList.add(executorService.submit(new InsertTask(subDataList)));
}

// 並列タスクの終了を待つ
int totalCnt = 0;
for (Future<Integer> future : futureList) {
  try {
     totalCnt = totalCnt + future.get();
  } catch (ExecutionException e) {
    // 全てのスレッドの処理結果を受け取るため、タスクから送出された例外はここでキャッチ
(ここでロギングすると良い)
  }
}

スレッドに投⼊するタスク InsertTask の実装は次のようになります。

public class InsertTask implements Callable<Integer> {
  private final List<SomeData> dataList;

  public InsertTask(List<SameData> dataList) {
    this.dataList = dataList;
  }

  @Override
  public Integer call() throws Exception {
    TsurugiConnector connector = TsurugiConnector.of("ipc:tsurugi");

    // SQLとパラメーターマッピングの例
    var sql = "insert or replace into some_data (id, data) values (:id,:data)";
    TgParameterMapping<SomeData> paramMapping =
TgParameterMapping.of(SomeData.class)
       .addString("id", SomeData::getID).addString("data",
SomeData::getData);

    // 登録トランザクション実⾏
    // タスクに渡された全データを1トランザクションで登録しているが、必要に応じて
トランザクションを分割しても良い
    try (TsurugiSession session = connector.createSession();
       TsurugiSqlPreparedStatement<SomeData> stmt =
session.createStatement(sql, paramMapping)) {
      TsurugiTransactionManager tm =
session.createTransactionManager(TgTxOption.ofOCC());
      return tm.execute(tx -> {
        int cnt = 0;
        for (SomeData data : dataList) {
          int nInserted = tx.executeAndGetCount(stmt, data);
          cnt = cnt + nInserted;
        }
        return cnt;
      });
     }
   }
}

こちらの記事は、
Tsurugi Advent Calendar 2024(https://adventar.org/calendars/10853
に参加しています。

劔"Tsurugi"関連のエンジニアが12月25日までの期間、さまざまな記事を掲載していきます。

是非、こちらもご覧ください!


次世代高速RDB劔"Tsurugi"は、オープンソースで公開中です。
ぜひダウンロードしてみて、触ってみてください。

ダウンロードはコチラから
https://www.tsurugidb.com/  

いいなと思ったら応援しよう!