実データで覚える Treasure Client コマンドラインリファンス 〜1.Data Import〜

本シリーズではTreasure Client ツールのコマンドラインリファレンスを以下の5つのレイヤーに分解し,各々について解説していくものとします。

No.レイヤーコマンド
1. Data Import

one-time import

bulk import

2. Data Management

db

table

3. Data Processing

schema

query

job

4. Data Scheduling

sched

result

5. Other

help

status

server

sample

本日は 1. Data Import を紹介します。本シリーズでは実際にデータを使用して,その分析の流れに沿いながらコマンドラインを紹介していきます。

Treasure Platform の登録および Client Tool のインストール

 それでは早速サインアップを始めましょう!こちらをクリックして下図のページより登録お願いします。

f:id:doryokujin:20140123091913p:plain

 

続いてクライアントツールをダウンロードします。

 

f:id:doryokujin:20140123092331p:plain

Source Data

カーセンサーnet Webサービス

 

リクルート様が公開してくれているカーセンサーの中古車相場データを使います。(データを提供して頂いたリクルート様,ありがとうございます!)

f:id:doryokujin:20140123092639p:plain

One-Time Import

-- one-time import --

 
$ td table:import ↵
usage:
$ td table:import <db> <table> <files...>
 
example:
$ td table:import example_db table1 --apache access.log
$ td table:import example_db table1 --json -t time - < test.json
 
options:
--format FORMAT file format (default: apache)
--apache same as --format apache; apache common log format
--syslog same as --format syslog; syslog
--msgpack same as --format msgpack; msgpack stream format
--json same as --format json; LF-separated json format
-t, --time-key COL_NAME time key name for json and msgpack format (e.g. 'created_at')

1. table:import <db> <table> <files...>

$ td table:create carsensor body_master

$ td table:create carsensor color_master

$ td table:create carsensor country_master

$ td table:create carsensor large_area_master

$ td table:create carsensor pref_master

 

$ td table:import carsensor body_master --format json --time-key time body_master.json

$ td table:import carsensor color_master --format json --time-key time color_master.json

$ td table:import carsensor country_master --format json --time-key time country_master.json

$ td table:import carsensor large_area_master --format json --time-key time large_area_master.json

$ td table:import carsensor pref_master --format json --time-key time pref_master.json

table:import コマンドは手軽にファイルを Treasure Cloud Storage へインポートできるコマンドです。状況に応じて bulk_import と使い分けてください。

  •  対応ファイルフォーマット: apache, syslog, json, msgpack, msgpack.gz

※ 最適なフォーマットは msgpack.gz です。ファイルサイズが小さいという意味で。apache, syslog 形式は特殊な使い方になります。なお,csv, tsv には対応していません。

※ csv, tsv フォーマットを手軽に msgpack.gz に変換したい場合は import:prepare コマンドを利用してください。

※ なお,オプションとして -t (--time-key) が必須です。これは任意のレコードに必ず UNIX_TIMESTAMP を持ったカラムが必要である事を意味しています。TD Storage ではこの time カラムの情報よりレコードを日付&&時間ごとにパーティションを行って管理しています。

[追記] オプションを付けると任意の型のTIMEフォーマットに対応しました。

2. まとめ

これで one-time import は手軽ですが,以下の点に注意が必要です。

  • 実行途中でに失敗した場合には途中からのリトライが不可能。再度同じコマンドを利用するとレコードの重複が生じる。
  • 最適な形式は圧縮された msgpack.gz 形式。msgpack ファイルがある場合は gzip コマンドによって事前圧縮しておく。
  • json -> msgpack.gz も変換が用意なのでできるだけ事前変換しておく。
  • msgpack ファイルの内容確認は,Rubyワンライナーで可能:

$ ruby -rmsgpack -e "File.open('input.msgpack') {|f| MessagePack::Unpacker.new(f).each {|r| p r } }"

Bulk Import

始めに Bulk Import の基本概念を紹介します。

f:id:doryokujin:20140122182616p:plain

  • Prepare: 元データをアップロードに最適なファイルサイズに圧縮します。この操作はローカルで行われ,出力形式は gzipped MessagePack format です。
  • Upload: 圧縮されたデータは Treasure Data’s (row-based) bulk upload storage system に並列アップロードされます。このアップロードはセキュアなインターネットコネクションを用いています。
  • Perform: アップロードされたデータは Treasure Data の列指向データベース(Treasure Cloud Storage)に MapReduce を用いて保存されます。
  • Commit: Perform ステップの後,データは Plazma  (Treasure Data’s columnar, distributed storage system)  互換の形式に変換されています。最後に Commit コマンドによって Plazma データベースにコピーされます。
-- bulk import -- 
$ td import                                                                                                                                        
  import:list                                               # List bulk import sessions
  import:show <name>                               # Show list of uploaded parts
  import:create <name> <db> <table>       # Create a new bulk import session to the the table
  import:java_version                                # Show version
  import:prepare <files...>                        # Convert files into part file format
  import:upload <name> <files...>            # Upload or re-upload files into a bulk import session
  import:auto <name> <files...>               # Upload files and automatically perform and commit
  import:perform <name>                         # Start to validate and convert uploaded files
  import:error_records <name>                # Show records which did not pass validations
  import:commit <name>                         # Start to commit a performed bulk import session
  import:delete <name>                           # Delete a bulk import session
  import:freeze <name>                           # Reject succeeding uploadings to a bulk import session
  import:unfreeze <name>                       # Unfreeze a frozen bulk import session

1. import:create <name> <db> <table> 

bulk_import を行うための「セッション」を作成します。$ td table:create とは異なり,「テーブル」を作成するのではありませんので混同しないようにしてください。

なお,bulk_import でインポートするための<db>,<table>は既に作成されている必要があります。テーブルに既にレコードが入っている場合は bulk_import によって append されます。

$ td db:create carsensor

# for catalog_measure table

$ td table:create carsensor catalog

$ td import:create session_catalog carsensor catalog 

# for usedcar_measure table

$ td table:create carsensor usedcar

$ td import:create session_usedcar carsensor usedcar

# for brand_master table

$ td table:create carsensor brand_master

$ td import:create session_brand_master carsensor brand_master

上段の例では catalog_measure.csv ファイルに関して,セッションを "session_catalog" 名で作成し,Import 先となる db名: "carsensor" と table名: "catalog" を指定しています。(以後は catalog_measure.csv ファイルの Bulk Import のみを進めて行きます。中段の usedcar_measure.csv,下段の brand_master ファイルも同様の処理になります。)

2. import:prepare <files...>

このコマンドは bulk_import でデータをアップロードする前の前処理です。このコマンドによってインプットファイルサイズを bulk_import に最適なフォーマット( msgpack.gz )および最適なファイルサイズ( 約17MB )に分割されます。分割された msgpack.gz ファイルは -o オプションに指定されたディレクトリに保存されていきます。

  • 対応インプットフォーマット: csv, tsv, json, msgpack, msgpack.gz

今回は csv ファイルを扱います。

[追記] まずはディレクトリ内に "parts"ディレクトリを作成しておいて下さい。prepare で分割されたファイルはここに入ります。また,元々あった "parts_result" は既に prepare を実行したファイルが入っていますので,このステップを飛ばしたい人は "parts" に名前を変更して次にお進み下さい。

# for catalog_measure table

$ td import:prepare catalog_measure.csv --format csv --column-header --time-column 'publication_date' -o ./parts/

  

Prepare status:

  Source    : catalog_measure.csv

    Status          : SUCCESS

    Read lines      : 84635

    Valid rows      : 84633

    Invalid rows    : 1

    Converted Files : ./parts/catalog_measure_csv_0.msgpack.gz (1505281 bytes

 

Next steps:

  => execute following 'td import:upload' command. if the bulk import session is not created yet, please create it with 'td import:create <session> <database> <table>' command.

     $ td import:upload <session> './parts/catalog_measure_csv_0.msgpack.gz'

 

# for usedcar_measure table => parts_result フォルダに usedcar_measure_n.msgpack.gz のアウトプットを置いてある。

 

# for catalog_measure table

$ td import:prepare brand_master.csv --format csv --column-header --time-column 'time' -o ./parts/

※ --columns には "column1, colmun2,..." とヘッダーカラム名を羅列します。もしcsvの一行目がヘッダーになっている場合は, --columns の代わりに --column-header オプションを指定してやります。

※ --time-column には UNIX TIMESTAMP となっているカラム名を指定します。TD Storage へのインポートの際,全てのレコードには unix timestamp を持ったカラムが必須となります。なぜならこの unix timestamp を日付かる各時間毎にパーティションを行っているからです。今回のケースでは擬似的に「掲載日」:publication_date を追加し,それを指定しています。

3. td import:upload <name> <files...>

次に分割圧縮したパーツファイルを Treasure Cloud Strage へ並列アップロードします。

$ td import:upload session_catalog './parts/catalog_measure_csv_0.msgpack.gz'

上記のコマンド中にコマンドが強制終了,およびネットワークが切断されて中断された場合にも,再度同じコマンドを実行し直すことでデータの重複無くアップロードを再開してくれます。これが table:import には無い大きなメリットとなっています。

なお,一般的にアップロードファイルは複数有ります。これらを並列に upload することも可能になっています。(デフォルトの並列数は「2」となっています)以下の例では 8並列でアップロードを行ってくれます。

す。

$ td import:upload --parallel 8 <name> ./parts/*

また,連続して他の分割ファイルもその後追加アップロードしていくことが可能です。

$ td import:upload <name> ./parts1/*

$ td import:upload <name> ./parts2/*

$ td import:upload <name> ./parts3/*

アップロードされたファイルを確認するには,

$ td import:show <name>

を実行します。

4. td import:perform <name> 

1. $ td import:freeze session_catalog

2. $ td import:perform session_catalog

Job 7216190 is queued.

 

$ td job:show -w 7216190

JobID       : 7216190

Status      : success

Type        : bulk_import_perform

Priority    : NORMAL

Retry limit : 0

Result      :

Database    : carsensor

Query       :

3. $  td import:list

+--------------------+-----------------------+---------------+----------+----------+

| Name               | Table                   | Status        | Frozen | JobID   |

+--------------------+-----------------------+---------------+----------+----------+

| session_catalog | carsensor.catalog | Ready         | Frozen | 7216190 |

+--------------------+-----------------------+---------------+----------+----------+

 

4. $  td import:error_records session_catalog

(No Output)

  1. まず freeze コマンドでセッションに「蓋」をします(追加アップロードできないようにします,ただし unfreeze コマンドで追加可能に戻せます)。
  2. 日付&&時間をキーにレコードを MapReduce します。MapReduce が実行されますのでレコード数に応じてそれなりに時間がかかります。-w は wait オプションで実行完了まで経過を表示しつづけるオプションです。
  3. perform 完了後の list コマンドの表示結果では,perform の実行結果が表示され,Status:Ready となっています。テーブルへのインポート完了まで後一歩です。
  4. なお,ここで ErrorRecords(perform 中に問題のあったレコード)に関する情報を確認しておく必要があります。

5. td import:commit <name> 

$ td import:commit <name>

Bulk import session 'session_catalog' started to commit.

commit コマンドによって実際のテーブルへとインポートが行われ,一連の import 処理が完了になります。なお,このコマンドで指定する<name>は「セッション」名であり,書き込まれる「テーブル」は 1. の import:create <name> <db> <table> で指定したテーブルになることに注意してください。

6. まとめ

$ td import:list

+--------------------+-----------------------+---------------+----------+----------+

| Name               | Table                   | Status        | Frozen | JobID   |

+--------------------+-----------------------+---------------+----------+----------+

| session_catalog | carsensor.catalog | Committed | Frozen | 7216190 |

+--------------------+-----------------------+---------------+----------+----------+

 

$ td table:tail carsensor catalog -P

{

  "publication_date": 1390356579,

  "time": 1390356579,

  "person": 7,

  "body_code": "M",

  "code": "TO",

  "body_name": "ミニバン",

  "width": 1650,

  "name": "トヨタ",

  "height": 1935,

  "length": 4070,

  "price": 2285000,

  "period": "198808-198908",

  "desc": "※このクルマの燃費は発売当時の資料により、10モードの数値を表示しております。燃費=9.2km/l",

  "model": "ライトエース",

  "series": "E-YM40G",

  "grade": "2.0 FXV 4WD"

}...

これで bulk_import は完了です。多少ステップ数はあるものの,以下の点で table:import コマンドより優れています:

  • upload コマンドによって自動的にインポート対象ファイルを元ファイルより小さな msgpack.gz という圧縮形式に変換してくれ,ネットワーク負担を下げてくれる。
  • アップロードが途中で失敗しても重複することなく途中からリトライできる。
  • 並列アップロードが可能。
  • 作成したセッションを残しておけば,同じファイルを別名のテーブルへ再アップロードなしに commit,つまりテーブル再インポートが可能。

ただし,以下の点に注意してください:

  •  prepare コマンドはシングルスレッドなので遅い。[修正] オプションによって並列処理が可能です。(デフォルトは並列数2です。) 複数ファイルを対象にする場合は, $ ls <files*> | parallel td import:prepare {} などで並列実行する。
  • [追加] prepare コマンドと upload コマンドを同時に事項できるコマンドが import:upload コマンドです。これによると新しい分割ファイルが作成されると同時にそのファイルをアップロードしてくれます。
  • インポート対象ファイルに前処理(不要レコード削除・カラム追加・集計カラム追加)を行う場合は自前で前処理コマンド → msgpack.gz への変換スクリプトを作成する必要がある。