Sharding を使いこなすための5つのTips

@です。今日も相変わらずMongoDBの、そしてShardingに関する記事を書こうと思います。

…と、その前にお知らせです!6月は2つのMongoDB勉強会を予定しております、是非ご参加下さい!

・2011年6月11日(土) 「第4回 MongoDB 勉強会 in Tokyo」@フューチャーアーキテクト 
・2011年6月28日(火)「第1回 MongoDB ソースコードリーディング」@PFI

さて、それでは本題に入りたいと思います。
MongoDBのShardingといえば、

・初期設定やShardの追加・削除といった導入の容易さ
・Shardの面倒をMongo側がずっと見てくれるという管理の容易さ

を備えていると言うことで興味を持っておられる方も多数いると思います。
しかしその一方で実際にSharding環境を導入している方々の中の多くは、遭遇する様々な不都合や不整合に頭を悩まし、初期設定のミスで取り戻しのつかない状況に陥っていたりと、想定していた以上の苦労を強いられているように思われます。確かに現バージョン(v1.8)のMongoDBのShardingはまだまだ扱いが難しく、出来る限り「Shardingを使わない」方向で検討を進める方が得策である場合も多くあるような気がします。しかしきちんと対処法を身につけておきさえすれば、そういった起こりうるShardingの種々の問題に立ち向かうことができるはずです。
今日は少しマニアックな内容ですがShardingについて、知っておくと将来きっと役立つであろう5つのTipsを紹介したいと思います。
MongoDBのShardingはその余りある欠点を補うほどに強力です。大規模なSharding環境では、TBを超えるデータをあたかもシングルサーバーのMongoDBを相手にしているように簡単に扱え(若干言い過ぎですが…)、Replicationと連携した「大規模データストレージ」としての役割と、Mongo MapReduceやRedisやHadoopとの連携した「大規模データ解析基盤」としての役割を高い次元で両立させることができます。
この事はログ解析の観点で言えばScribeやFlume等で多数のサーバーに分散された大規模ログを定常的にMongoDBに集約させつつ、Mongo ShellからMap Reduceをさらっと記述して実行して、非バッチな処理やごく一部の領域にしか存在しない対象の解析でもインデックスなどを活用して高速かつ手軽に実行することができるようになる事を意味しています。Webアプリケーションの観点で言えば、あらゆるユーザーデータをMongoDBに格納し、一方バックグラウンドではリアルタイムに集計やクラスタリングアルゴリズムなどを走らせ、ユーザー単位でのパラメータの最適化やデータに基づいたアクションを即座にアプリケーションに反映することができる事を意味しています。

Sharding 5 Tips

0. 準備 〜自動Shardingと自動Balancing機能、そしてChunkという概念を知る〜
1. Primary Shardを活用する 〜Primary Shardの存在を知る〜
2. Chunkの事前分割 〜運用開始からデータを分散させる〜
3. Chunkの手動マイグレーション 〜安全に効率的にChunkを移動する〜
4. Chunkの手動分割&マイグレーション 〜動かないChunkを移動する〜
5. Shardの状態を監視する 〜有用な2つのコマンド〜

0. 準備 〜自動Shardingと自動Balancing機能、そしてChunkという概念を知ろう〜

ここではMongo Shardingの基本的な概念について簡単に解説したいと思います。「自動Sharding」とは挿入されるデータに対して、そのデータがどのShardに入るべきかを決定して振り分けてくれる機能です。より具体的に言えば、Sharding環境においてMongoDBはInsertされたドキュメントを(最初に設定した)ShardKeyの値に基づいて振り分けます。Sharding環境ではドキュメントの集合である"Chunk"という単位を持ち、このChunk単位で分割や移動が行われます。それぞれのChunkは他と重複しないShardKeyのレンジを持っています。例えばChunkAは [ "a", "k" ), ChunkBが [ "k", "{" ) のレンジを持っていたとすると、ShardKeyのイニシャルが"a"から"j"までの値を持つドキュメントはChunkAに、"k"から"z"までを持つドキュメントはChunkBに属します。"{" は "z" の次の順序を持つ値です。例えば"inoue" の値を持つドキュメントは ChunkA に、"takahiro" を持つドキュメントは ChunkB に属する事になります。繰り返しますが、このChunkがShard間を移動したり分割されたりします。1つ1つのドキュメント単位でそれらが行われることが無いことにはくれぐれも注意して下さい。

各Chunkはそのレンジが重複することは無く、またカバーしていないレンジもありません。先ほどの例では実は最低でもさらに後2つのChunkが存在します。それは [ -∞, "a" ) と [ "{", ∞ ] です。そして1つのChunkがデフォルトで200MB(Sharding開始時は64MB)以上のサイズに膨れあがると、Chunkはそれを等分割するキーによって分割されます。例えば ChunkA は [ "a", "g" ) と [ "g", "k" ) に分割されたりします。データ流入量が増え、Chunkの分割があちこちで行われるようになると、Shard間でChunkの数にばらつきが起こってきます。ドキュメントのShardKeyが特定のレンジ、つまり特定のChunkに集中する場合はそのShard内でたくさん分割が行われる事になります。これはある程度避けられない事であり、またデータローカリティを強く意識するならこの現象は必ずしも悪いものとも言い切れません。
MongoDBでは現在アクセスされているShardのChunkの数とクラスタ内で最小のChunk数を持つShardとの数の差を計算しており、その差が10以上になったときに、「Shard間に偏りが起こっている」と認識され、Chunk数が大のところから小のところへChunkの移動(マイグレーション)が行われます。これを自動Balancing機能と呼び、バックグラウンドでShard間のデータサイズを均質に保ち続けようとしてくれる親切な機能です。

このように、MongoDBのShardingは自動Sharding機能と自動Balancing(とReplica Sets の自動フェイルオーバー機能)によって容易にスケールさせる事が可能です。その他のShardingの機能についてや、ここで取り上げなかったSharding環境で起こる様々な問題については過去の資料をご覧下さい。

1. Primary Shardを活用する 〜Primary Shardの存在を知ろう〜

MongoDBには"Primary Shard"という概念があります。これは運用開始直後の数個しかないChunkが保存される初期Shardであり、MapReduceのOutputCollectionが作成される場所であり、その他様々なケースで優先的に使われるShardです。このPrimary Shardの存在を知り、適切な設定を行っておく事は以下の意味で非常に有用です。

・メモリやCPUの優秀なマシンを割り降ることで初期段階のデータの局所集中に耐える
・Primary ShardをSSD上に配置し、Map Reduceなどの出力を高速に行う

この2つの説明に入る前に、まずはShardingの構成について簡単にお話しておきます。

Sharding の初期構成

※ここでは簡単なShardingを用意する時間が無かったので、現在使用しているSharding環境を流用させて頂きます。

# 25のShard ( With Replica Sets ) を追加(24HDD + 1SSD)
# HDD
> db.adminCommand( { addshard: "shard00/delta1:2400,delta2:2400", name: "shard00" } )
> db.adminCommand( { addshard: "shard01/delta1:2401,delta2:2401", name: "shard01" } )
...
> db.adminCommand( { addshard: "shard22/delta6:2422,delta5:2422", name: "shard22" } )
> db.adminCommand( { addshard: "shard23/delta6:2423,delta5:2423", name: "shard23" } )
# SSD
> db.adminCommand( { addshard: "delta5:2424",name: "shard24" } )  
# Enable Sharding
> db.adminCommand( { enablesharding : "mydb" } ) 
> db.adminCommand( { shardcollection : "mydb.log", key : { hour : 1 } } )

"addshard" コマンドによってshardを構成していきます。"name"キーはそのshardの名前を決定し、"addshard"キーでshardの所在をhost名とport名により指定します。上記の例はReplica Setsを各Shardで構成しているためにReplica Setsメンバー全員の所在とSet名を指定しています。例えば"shard00"は"shard00"というSet名を持ったReplica Setsメンバー、delta1:2400とdelta2:2400(と後1つのarbiter server)を指定しています。またこのような24台のHDD上のReplica Setsを持つShardを構成するのに加えて、"shard0024"というReplicationを行っていない、かつSSD上に配置されたShardを登録しています。これは他のShardとは別の目的で用意しています。そして"enablesharding"コマンドによってデータベース"mydb"をSharding対象に加え、"mydb.log"コレクションをShard Key: "hour"で分割することを指定しています。実際に挿入されるレコード例を挙げておきます(一部加工済):

{
        "_id" : ObjectId("4de3d6f573a4bc264f002619"),
        "protocol" : "HTTP/1.1",
        "hour" : 23,
        "referer" : "-",
        "ipaddr" : "000.00.000.000",
        "responseBodySize" : 6052,
        "userId" : "777777",
        "timeMilliSec" : 83396000,
        "options" : {

        },
        "userAgent" : "DoCoMo/2.0 F706i(c100;TB;W24H17)",
        "params" : {
                "count" : "10",
                "opensocial_viewer_id" : "777777",
                "material" : "1",
                "item" : "61",
                "opensocial_owner_id" : "777777",
                "opensocial_app_id" : "10"
        },
        "time" : "23:09:56",
        "date" : 20110530,
        "path" : "/hoge/foo/bar",
        "method" : "GET",
        "statusCode" : "200"
}
メモリやCPUの優秀なマシンを割り降ることで初期段階のデータの局所集中に耐える

Sharding開始直後は1つのChunkからスタートします。これは1つのShardにデータが集中することを意味しています。特に初動から多数のアクセスが見込まれる場合、またはShard間でサーバースペックが異なる場合は、高性能マシンをPrimary Shardに割り当て、高い負荷に備える準備が必要です。Primary Shardの設定は上記のコマンドに続いて以下の様に行います。

> db.adminCommand( { moveprimary :     "mydb", to : "shard24" } );
   { "_id" : "mydb", "partitioned" : true, "primary" : "shard24" }

これによりPrimary Shardを意図的に"shard0024"に移動することができます。この設定を行わない場合は、自動でどこかのShardがPrimaryに設定されています。Primary Shardの設定はデータの挿入後にも行う事ができますが、エラーが出る可能性があること、事前に行うことに多くの意義を持つことに注意しておいて下さい。

Primary ShardはSSD上に配置し、Map Reduceなどの出力を高速に行う

今回の例の場合では、Primary Shard: shard24をこちらの目的で活用しています。shard24は解析専用サーバーのSSD上に配置されています。また、意図的な設定によって、このshard上には普通のデータは入って来ないようにしており、Map Reduceを使用した際の出力先のコレクションとしてのみ活用することにしています。複雑な集計作業においては、複数のMap Reduceの結果を用いてさらにMap Reduceを行うような場合がありますので、今回はその目的で高速な出力と、SSD上で多段MapReduceを高速に行うためのShardとして活用しています。例えば以下の様なUUを求めるMapReduceの結果はshard24上の"mr"コレクションに格納されます:

> m = function() { emit(this.userId, 1); }
> r = function(k,vals) { return 1; }
> res = db.log.mapReduce(m, r, { out : {replace:'mr'} });
{        "result" : "mr",
        "shardCounts" : {
                "shard00/delta1:2400,delta2:2400" : {
                        "input" : 11349174, 
                         "emit" : 11349174,
                         "output" : 167265 },
                "shard01/delta1:2401,delta2:2401" : {
                        "input" : 7582207,
                        "emit" : 7582207,
                        "output" : 139164 },
                ...
                "shard23/delta5:2423,delta6:2423" : {
                        "input" : 7508581,
                        "emit" : 7508581,
                        "output" : 144319
                }
        },
        "counts" : {
                "emit" : NumberLong(174721895),
                "input" : NumberLong(174721895),
                "output" : NumberLong(3317670)
        },
        "ok" : 1,
        "timeMillis" : 681401,
        "timing" : {
                "shards" : 653462,
                "final" : 27938
        },
}

以上の結果の統計情報を見ても"shard24"はデータが存在せずに、集計対象から外れていることがわかります。それではmongosからではなく、shard24上のmongodに直接アクセスして、そこに"mr"コレクションがあるか覗いてみます:

$ mongo delta5:2424/mydb
  MongoDB shell version: 1.8.1
  connecting to: delta5:2424/mydb
> show collections
log
mr                     # mr コレクションが確かにshard24に存在
system.indexes
> db.mr.count()
500000
> db.mr.find() 
{ "_id" : "u100001", "value" : 1 }
{ "_id" : "u100002", "value" : 1 }
{ "_id" : "u100003", "value" : 1 }
has more

大量の出力がある場合や保存されたコレクションからさらに何らかの集計を行う場合には、Primary Shardを利用して特定の用途だけのShardを作るような方法は有効な手段となることでしょう。

2. Chunkの事前分割 〜運用開始からデータを分散させる〜

Primary Shardの話の中で、「初期段階では1つのShardにデータが集中する」問題を挙げましたが、これに対してはより効果的な対策を施す事が出来ます。設定したShardKeyに対してデータを挿入する前にChunkの分割を行ってしまおうというものです。これは事前にShardKeyの分布が予測できている場合には始めから均質なデータの分散を可能にすることができます。今回の例では(そこまで均質性を求める事はできませんが、)入ってくるデータ(ログレコード)が記録された時間(0〜23)をshardKeyにすることで、初動からデータを分散させるように意図しています。この作業はデータが挿入される前に行わなければならないことに注意して下さい。

db.adminCommand( { split : "mydb.log" , middle : { hour:   0 } )
db.adminCommand( { split : "mydb.log" , middle : { hour:   1 } )
...
db.adminCommand( { split : "mydb.log" , middle : { hour:   23 } )

このsplitコマンドを実行する事によって、"middle"で定めたShardKeyの値を中心にしてChunkの分割を行ってくれます。今回はこれを0,..23のmiddle値でsplitコマンドを実行する事によって異なる時間のログレコードは異なるchunkに入るように設定しています。このコマンドを実行した結果を確認してみます:

> db.printShardingStatus()
        { "_id" : "mydb", "partitioned" : true, "primary" : "shard24" }
                mydb.log chunks:
                                shard24       25
                        { "hour" : { $minKey : 1 } } -->> { "hour" : 0 } on : shard0025 { "t" : 1000, "i" : 1 }
                        { "hour" : 0 } -->> { "hour" : 1 } on : shard24 { "t" : 1000, "i" : 3 }
                        { "hour" : 1 } -->> { "hour" : 2 } on : shard24 { "t" : 1000, "i" : 5 }
                        { "hour" : 2 } -->> { "hour" : 3 } on : shard24 { "t" : 1000, "i" : 7 }
                        { "hour" : 3 } -->> { "hour" : 4 } on : shard24 { "t" : 1000, "i" : 9 }
                        ...
                        { "hour" : 22 } -->> { "hour" : 23 } on : shard24 { "t" : 1000, "i" : 47 }
                        { "hour" : 23 } -->> { "hour" : { $maxKey : 1 } } on : shard24 { "t" : 1000, "i" : 48 }

この結果より、Chunkのレンジが (-∞, 0 ), [0, 1 ), [1, 2 ),..., [22, 23 ), [23, ∞ ) に分割されていることが確認できます。ここまででChunkの事前分割は実現しましたが、また異なるshardに分散させるところまでは実現していません。全ての行で"on : shard24"とあるように、全てのChunkは自動Balancing機能が働くまで、依然としてshard24上に集中して存在してしまうことになります。

3. Chunkの手動マイグレーション 〜安全に効率的にChunkを移動する〜

そこで次にChunkを意図的に移動(マイグレーション)させることにします。これを手動で行う事によって初動からデータを分散させることができ、かつChunkを置きたい場所に自由に移動させておくことで、データの意図的なルーティングも可能になります。

db.adminCommand({moveChunk : "mydb.log", find : {hour :  0}, to : "shard00"})
db.adminCommand({moveChunk : "mydb.log", find : {hour :  1}, to : "shard01"})
db.adminCommand({moveChunk : "mydb.log", find : {hour :  2}, to : "shard02"})
...
db.adminCommand({moveChunk : "mydb.log", find : {hour :  22}, to : "shard22"})
db.adminCommand({moveChunk : "mydb.log", find : {hour :  23}, to : "shard23"})

この"moveChunk"コマンドは、findキーで指定したShardKey条件に該当するドキュメントを含むChunkをtoキーで指定したShardに移動させます。今回の例では、Chunk [ 0, 1 ) (つまりhour=0のデータ)をshard00に移動させています。この結果を見てみますと今度は、各Chunkが異なるShardに存在している事が確認できます。

> db.printShardingStatus()
        { "_id" : "mydb", "partitioned" : true, "primary" : "shard0025" }
                mydb.log chunks:
                                shard24       1
                                shard00       1
                                shard01       1
                                shard02       1
                                ...
                                shard22       1
                                shard23       1
                        { "hour" : { $minKey : 1 } } -->> { "hour" : 0 } on : shard24 { "t" : 26000, "i" : 1 }
                        { "hour" : 0 } -->> { "hour" : 1 } on : shard00 { "t" : 3000, "i" : 0 }
                        { "hour" : 1 } -->> { "hour" : 2 } on : shard01 { "t" : 4000, "i" : 0 }
                        { "hour" : 2 } -->> { "hour" : 3 } on : shard02 { "t" : 5000, "i" : 0 }
                        ...
                        { "hour" : 22 } -->> { "hour" : 23 } on : shard22 { "t" : 25000, "i" : 0 }
                        { "hour" : 23 } -->> { "hour" : { $maxKey : 1 } } on : shard23 { "t" : 26000, "i" : 0 }

この設定の元でデータの挿入を行った場合、データのhour値によってそれぞれのShardに始めから分散させて挿入することができます。実際に挿入して確かめてみましょう:

$ mongo delta1:2600/mydb #connect to mongos
> for( var n=0; n<24; n++ ){
...   db.log.insert({"hour": n})
... }

$ mongo delta1:2400/mydb
> db.log.distinct("hour")
[ 0 ]

$ mongo delta1:2401/mydb
> db.log.distinct("hour")
[ 1 ]
...
$ mongo delta1:2423/mydb
> db.log.distinct("hour")
[ 23 ]

意図したとおりの結果が得られている事が確認できました。この設定は非常に効果的ですので、一度検討してみて下さい。

4. Chunkの手動分割&マイグレーション 〜動かないChunkを移動する〜

すでにSharding環境を構築して運用されている場合でも、2.と3.でお話したことは実践が可能です。つまりあるShard内の、既にデータが詰まっているChunkを意図的に分割して軽くし、特定のShardへ移動させることが可能です。これらの作業は次の意味で非常に有用です:

・Shard間の偏りを自力で補正する
・類似のChunkを同じShardへ集合させることでデータローカリティを実現する

shard間の偏りを自力で補正する

MongoDBの自動Balancing機能ではShard間の偏りがどうしても生じてきてしまいます。実際のChunkのマイグレーションには、そのChunkサイズと同じ以上のメモリ領域と、configサーバーへの大きな負担をかけるというリスクによって、必ずしも成功するとは限りません。メモリが足りなければChunkの移動が必要な状況でも決して起こりません。これらの状況は避けることができませんので、手動でのChunkの移動を定期的に行うようにします。また、Chunkサイズが大きかったりメモリが足りなくて移動に失敗する場合は、Chunkの手動分割によってより細かいサイズまで分割してから移動させることにします。まずはShardごとのデータサイズを確認して、Chunkの手動分割・移動を行ってみます:

> printShardingSizes()
        ...
        { "_id" : "mydb", "partitioned" : true, "primary" : "shard24" }
                mydb.log chunks:
                        { "hour" : { $minKey : 1 } } -->> { "hour" : 0 } on : shard24 { "size" : 0, "numObjects" : 0 }
                        { "hour" : 0 } -->> { "hour" : 1 } on : shard00 { "estimate" : false, "size" : 89590388, "numObjects" : 198518  }
                        { "hour" : 1 } -->> { "hour" : 2 } on : shard01 { "estimate" : false, "size" : 75265412, "numObjects" : 165859 }
                        { "hour" : 2 } -->> { "hour" : 3 } on : shard02 { "estimate" : false, "size" : 163306000, "numObjects" : 360413 }
                        ...
                        { "hour" : 22 } -->> { "hour" : 23 } on : shard22 {"estimate" : false, "size" : 254483548, "numObjects" : 576126 }
                        { "hour" : 23 } -->> { "hour" : { $maxKey : 1 } } on : shard23 { "estimate" : false, "size" : 247971184, "numObjects" : 561015 }

このコマンドによって各Shardの各データベース/コレクションのデータサイズ(の推定値)とオブジェクト数を確認することができます。他にも各Shard情報を確認するコマンドがあります。実際の分割・移動は前回までにお話したことと同じですが、分割に関しては、既にデータが存在しているので、対象のChunkをfindキーで特定して等分割を行う事にします(前回はmiddleキーだったことに注意して下さい)。今までの例はhourという24種類しかないキーをShardKeyに設定してしまっているため、分割などをしにくい状態になってしまっています。ここでは00:00:00を0としたtimeMilliSecをShardKeyとしている前提で分割することにします。

> db.adminCommand( { split : "mydb.log" , find : { timeMilliSec : 60000 } } ) 

このコマンドによってtimeMilliSecキー: 60000 を含むChunkが等分割になるようなキーで分割が行われます。Chunkの移動の方は前回お話したコマンドと全く同じです:

db.adminCommand({moveChunk : "mydb.log", find : {timeMillSec :  30000}, to : "shard10"})

これによって分割した片方の、timeMillSec=30000を含む方のChunkをsahrd10に移動させる事ができました。

類似のchunkを同じShardへ集合させることでデータローカリティを実現する

後の解析を見据えた場合のデータストレージにおいては、データの分散を意識しつつ、データローカリティ、つまり類似のデータは同じShardに存在させておいて検索や集計の効率性を上げることも意識しておく必要があります。完全にデータローカリティに特化したストレージを行うには、類似のChunkを特定のShardに集合させるようにします。この場合はShardの偏りはあまり気にしないでしょう。

5. Shardの状態を監視する 〜有用な2つのコマンド〜

さて、最後になりましたが、ここまでに登場したSharding環境で頻繁に使うことになるコマンドを紹介しておきます。

printShardingStatus(undefined,true)

機能的には db.printShardingStatus() と同じなのですが、 db.printShardingStatus() で肝心のChunkの分割情報を表示してくれない状況が発生してきます:

> db.printShardingStatus()
                                ...
                                shard0022       1
                                shard0023       1
                        too many chunksn to print, use verbose if you want to force print #肝心な情報が見れない…orz

その場合に printShardingStatus(undefined,true) を使います:

> printShardingStatus(undefined,true)
                                ...
                                shard0022       1
                                shard0023       1
                        { "hour" : { $minKey : 1 } } -->> { "hour" : 0 } on : shard0025 { "t" : 26000, "i" : 1 }
                        { "hour" : 0 } -->> { "hour" : 1 } on : shard0000 { "t" : 3000, "i" : 0 }
                        { "hour" : 1 } -->> { "hour" : 2 } on : shard0001 { "t" : 4000, "i" : 0 } 
                        ...

また、Chunkの情報はconfigデータベースからも確認することができます:

>use config
> db.chunks.find().forEach(printjson)
...
{
        "_id" : "mydb.log-hour_22.0",
        "lastmod" : {
                "t" : 25000,
                "i" : 0
        },
        "ns" : "mydb.log",
        "min" : {
                "hour" : 22
        },
        "max" : {
                "hour" : 23
        },
        "shard" : "shard22"
}
{
        "_id" : "mydb.log-hour_23.0",
        "lastmod" : {
                "t" : 26000,
                "i" : 0
        },
        "ns" : "mydb.log",
        "min" : {
                "hour" : 23
        },
        "max" : {
                "hour" : { $maxKey : 1 }
        },
        "shard" : "shard23"
}

configデータベースにはその他Shardingに必要なメタ情報を管理していますのでここから様々な情報を取得することが可能です。ただし、このメタ情報をupdateしたりremoveしたりすると大変な事になるので注意して下さい。configサーバーで手を加えて良いのは自動Balancing機能をオフにする場合だけです:

use config
db.settings.update( { _id: "balancer" }, { $set : { stopped: true } } , true )
printShardingSizes()

こちらも既出ですが、各Shardのデータサイズとオブジェクト数をコレクションごとに表示してくれる便利なコマンドです。ただし、このコマンドがデータ数が多くなると返ってくるのが非常に遅くなってしまいます。その場合には他のコマンドで確認するようにしてください。

> printShardingSizes()
Wed Jun  1 00:30:46 updated set (shard00) to: shard00/delta1:2400,delta2:2400
Wed Jun  1 00:30:46 [ReplicaSetMonitorWatcher] starting
...
--- Sharding Status ---
  sharding version: { "_id" : 1, "version" : 3 }
  shards:
      { "_id" : "shard0000", "host" : "shard00/delta1:2400,delta2:2400" }
      { "_id" : "shard0001", "host" : "shard01/delta1:2401,delta2:2401" }
      { "_id" : "shard0002", "host" : "shard02/delta1:2402,delta2:2402" }
      ...
               playshop-gree-access.log chunks:
                        { "hour" : { $minKey : 1 }, "date" : { $minKey : 1 } } -->> { "hour" : 0, "date" : { "$minKey" : 1 } } on : shard0012 { "estimate" : false, "size" : 1854702464, "numObjects" : 3765808 }
                        { "hour" : 0, "date" : { "$minKey" : 1 } } -->> { "hour" : 1, "date" : { "$minKey" : 1 } } on : shard0013 { "estimate" : false, "size" : 1384094100, "numObjects" : 2806727 }
                        { "hour" : 1, "date" : { "$minKey" : 1 } } -->> { "hour" : 2, "date" : { "$minKey" : 1 } } on : shard0014 { "estimate" : false, "size" : 882130544, "numObjects" : 1780488 }
                        { "hour" : 2, "date" : { "$minKey" : 1 } } -->> { "hour" : 3, "date" : { "$minKey" : 1 } } on : shard0015 { "estimate" : false, "size" : 539366856, "numObjects" : 1089989 }
                        { "hour" : 3, "date" : { "$minKey" : 1 } } -->> { "hour" : 4, "date" : { "$minKey" : 1 } } on : shard0016 { "estimate" : false, "size" : 419811240, "numObjects" : 848770 }
                        { "hour" : 4, "date" : { "$minKey" : 1 } } -->> { "hour" : 5, "date" : { "$minKey" : 1 } } on : shard0017 { "estimate" : false, "size" : 726579776, "numObjects" : 1471823 }
                        { "hour" : 5, "date" : { "$minKey" : 1 } } -->> { "hour" : 6, "date" : { "$minKey" : 1 } } on : shard0018 { "estimate" : false, "size" : 1152177872, "numObjects" : 2347418 }
                        { "hour" : 6, "date" : { "$minKey" : 1 } } -->> { "hour" : 7, "date" : { "$minKey" : 1 } } on : shard0019 { "estimate" : false, "size" : 2630525884, "numObjects" : 5485197 }
                        { "hour" : 7, "date" : { "$minKey" : 1 } } -->> { "hour" : 8, "date" : { "$minKey" : 1 } } on : shard0020 { "estimate" : false, "size" : 5387886924, "numObjects" : 11132989 }
                        { "hour" : 8, "date" : { "$minKey" : 1 } } -->> { "hour" : 9, "date" : { "$minKey" : 1 } } on : shard0021 { "estimate" : false, "size" : 2379915544, "numObjects" : 4840048 }
                        { "hour" : 9, "date" : { "$minKey" : 1 } } -->> { "hour" : 10, "date" : { "$minKey" : 1 } } on : shard0022 { "estimate" : false, "size" : 2308302612, "numObjects" : 4689784 }
                        { "hour" : 10, "date" : { "$minKey" : 1 } } -->> { "hour" : 11, "date" : { "$minKey" : 1 } } on : shard0023 { "estimate" : false, "size" : 2558705640, "numObjects" : 5189979 }
                        { "hour" : 11, "date" : { "$minKey" : 1 } } -->> { "hour" : 12, "date" : { "$minKey" : 1 } } on : shard0000 { "estimate" : false, "size" : 1881322140, "numObjects" : 3818884 }
                        { "hour" : 12, "date" : { "$minKey" : 1 } } -->> { "hour" : 13, "date" : { "$minKey" : 1 } } on : shard0001 { "estimate" : false, "size" : 2264673264, "numObjects" : 4605082 }
                        { "hour" : 13, "date" : { "$minKey" : 1 } } -->> { "hour" : 14, "date" : { "$minKey" : 1 } } on : shard0002 { "estimate" : false, "size" : 2303920996, "numObjects" : 4686648 }
                        { "hour" : 14, "date" : { "$minKey" : 1 } } -->> { "hour" : 15, "date" : { "$minKey" : 1 } } on : shard0003 { "estimate" : false, "size" : 3298773252, "numObjects" : 6840413 }
                        { "hour" : 15, "date" : { "$minKey" : 1 } } -->> { "hour" : 16, "date" : { "$minKey" : 1 } } on : shard0004 { "estimate" : false, "size" : 3157248456, "numObjects" : 6542235 }
                        { "hour" : 16, "date" : { "$minKey" : 1 } } -->> { "hour" : 17, "date" : { "$minKey" : 1 } } on : shard0005 { "estimate" : false, "size" : 2521432232, "numObjects" : 5130008 }
                        { "hour" : 17, "date" : { "$minKey" : 1 } } -->> { "hour" : 18, "date" : { "$minKey" : 1 } } on : shard0006 { "estimate" : false, "size" : 2551743308, "numObjects" : 5197201 }
                        { "hour" : 18, "date" : { "$minKey" : 1 } } -->> { "hour" : 19, "date" : { "$minKey" : 1 } } on : shard0007 { "estimate" : false, "size" : 1840064336, "numObjects" : 3695385 }
                        { "hour" : 19, "date" : { "$minKey" : 1 } } -->> { "hour" : 20, "date" : { "$minKey" : 1 } } on : shard0008 { "estimate" : false, "size" : 2250083536, "numObjects" : 4526531 }
                        { "hour" : 20, "date" : { "$minKey" : 1 } } -->> { "hour" : 21, "date" : { "$minKey" : 1 } } on : shard0009 { "estimate" : false, "size" : 2556930488, "numObjects" : 5139566 }
                        { "hour" : 21, "date" : { "$minKey" : 1 } } -->> { "hour" : 22, "date" : { "$minKey" : 1 } } on : shard0010 { "estimate" : false, "size" : 2567556160, "numObjects" : 5295961 }
                        { "hour" : 22, "date" : { "$minKey" : 1 } } -->> { "hour" : 23, "date" : { "$minKey" : 1 } } on : shard0011 { "estimate" : false, "size" : 2323384656, "numObjects" : 4784284 }
                        { "hour" : 23, "date" : { "$minKey" : 1 } } -->> { "hour" : { $maxKey : 1 }, "date" : { $maxKey : 1 } } on : shard0023 { "estimate" : false, "size" : 0, "numObjects" : 0 }
                        ...

まとめ

いかがでしたでしょうか?今回は内容がマニアックすぎたので何を言っているのかわからなかった方も多いと思います。実際にSharding環境を構築されて、同じような問題や必要に迫られたときに、もう一度この記事を思い返して頂ければ幸いです。

弊社の新解析基盤も今日お話した設定を反映しながら、26Shard+Replica Sets、データサイズ18TB×2 の規模で運用を開始しています。ここに述べたことだけでなく、自動Shardingや自動Balancingを行わないマニュアルのChunk分割ルールやShardの世代管理、SlaveからもMap Reduceを同時に行ったりHadoopやRedisと連携する集計方法など、色々やっています。またその辺の話も安定して運用できるようになればお話できればと思っています。

今後ともMongoDBを宜しくお願いします。