Embulkで新規カラムを追加してレコードごとに異なる値を入れる

こんにちは。
NOBORI入社2年目の山中です。
今回はEmbulkについてです。オリジナルデータにないカラムを追加しつつレコードごとに異なる値を入れる方法が少しややこしかったのでまとめました。

目的

オリジナルデータ(sample.csv)

id,name
1,user1
2,user2
3,user3
4,user4

上記オリジナルデータに対してindexの行を追加し、0~10のランダムな値をレコードごとに割り振り出力したい。完成イメージは以下。

id,name,index
1,user1,2
2,user2,10
3,user3,6
4,user4,0

使用したプラグインは2つ。データの変換については状況によって他のプラグインでも可能かと思います。
・embulk-filter-column(カラムの追加)
・embulk-filret-ruby_proc(データの変換)

設定ファイルの雛形の作成

seed.ymlの生成

インプットはオリジナルデータのsample.csv
アウトプットは標準出力とします。

in:
 type: file
 path_prefix: '{任意のディレクトリ}/sample.csv'
out:
 type: stdout

下記コマンドでseed.ymlから設定ファイルの雛形を自動生成

$ embulk guess seed.yml -o config.yml

生成されたconfig.yamlファイル

in:
 type: file
 path_prefix: {任意のディレクトリ}/sample.csv
 parser:
   charset: UTF-8
   newline: LF
   type: csv
   delimiter: ','
   quote: '"'
   escape: '"'
   trim_if_not_quoted: false
   skip_header_lines: 1
   allow_extra_columns: false
   allow_optional_columns: false
   columns:
   - {name: id, type: long}
   - {name: name, type: string}
out: {type: stdout}

まずは自動生成されたconfig.ymlで実行してみます。

$ embulk run config.yml
# 出力結果一部抜粋
2021-10-13 13:40:49.939 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
1,user1
2,user2
3,user3
4,user4
2021-10-13 13:40:49.973 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}

するとオリジナルデータがそのまま読み込まれ出力されているのが確認できます。

カラムの追加

今回はオリジナルデータにないカラムを追加するためにembulk-filter-columnを使用します。add_columnsで全レコードに対して、指定したカラムが追加され、指定したデフォルト値が入ります。indexというカラムをデフォルト値0で追加するようにconfig.ymlを修正します。追加箇所はfiltersです。

in:
 type: file
 path_prefix: {任意のディレクトリ}/sample.csv
 parser:
   charset: UTF-8
   newline: LF
   type: csv
   delimiter: ','
   quote: '"'
   escape: '"'
   trim_if_not_quoted: false
   skip_header_lines: 1
   allow_extra_columns: false
   allow_optional_columns: false
   columns:
   - {name: id, type: long}
   - {name: name, type: string}
filters:
 - type: column
   add_columns:
   - {name: index, type: long, default: 0}
out: {type: stdout}さ

再度実行してみるとカラムが追加されているのが確認できます。

$ embulk run config.yml
# 出力結果一部抜粋
2021-10-13 13:46:48.298 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
1,user1,0
2,user2,0
3,user3,0
4,user4,0
2021-10-13 13:46:48.356 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}

データの変換

indexにはランダムな値を入れたいので追加したカラムの値に対して変換を行います。今回はembulk-filret-ruby_procを使用します。indexカラムのデフォルト値0と0~10のランダムな値を合計したものをindexの値とします。

in:
 type: file
 path_prefix: {任意のディレクトリ}/sample.csv
 parser:
   charset: UTF-8
   newline: LF
   type: csv
   delimiter: ','
   quote: '"'
   escape: '"'
   trim_if_not_quoted: false
   skip_header_lines: 1
   allow_extra_columns: false
   allow_optional_columns: false
   columns:
   - {name: id, type: long}
   - {name: name, type: string}
filters:
 - type: column
   add_columns:
   - {name: index, type: long, default: 0}
 - type: ruby_proc
   columns:
     - name: index
       proc: |
         ->(index) do
           index+ rand(0..10)
         end
       type: long
out: {type: stdout}

結果を確認すると、ランダムな値が割り振られていることが確認できました。

$ embulk run config.yml
# 出力結果一部抜粋
2021-10-13 14:01:25.441 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
1,user1,10
2,user2,7
3,user3,2
4,user4,9
2021-10-13 14:01:25.569 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}

まとめ

私が調査した限りでは単体のプラグインでオリジナルデータにないカラムを追加して、レコードごとに異なる値を追加するものはなさそうでした。そのため、今回はプラグインを組み合わせて処理してみました。データ変換に使用するプラグインによって処理速度も違いそうなので、そのあたりも調べてみたいです。