photo credit: CoreBurn

twitterが先日stormというリアルタイムな分散処理フレームワークstormを公開しました。
(ここでいうリアルタイムとは、すぐに、とか連続的に、というニュアンスで使っています) Stormは、連続的に来るデータに対して同じ処理を繰り返しかけて新たなストリームを作る、という目的のフレームワークです。
分散やメッセージの保証をフレームワークに任せて、SpoutとBoltという処理を書くだけでシステムを作ることができます。 updateも頻繁に行われていて、user-groupも活発なので、これから利用者が増えていくかもしれません。

分散処理フレームワークというとHadoopが有名ですが、Hadoopではまとまったデータを一気に処理するバッチ処理を、簡単に分散でき高速に処理することができるフレームワークでした。 一方Stormでは、連続的に到達するデータに対して、連続的に結果を出すような処理を、簡単に分散させて処理することができるフレームワークになります。 YahooのS4などが近いです。

もともとはtwitterが買収したbacktypeの技術になります。 twitterでは、例えば連続的発生するtweetをstream APIから読み込んで、現在のtrend topicを出す、というような処理に使われているようです。
2011/09/19に行われたstrange loopというイベントで発表して、オープンソースとして公開されました。
2011/09/19の公開時には ver.0.5.4でしたが、現在はver.0.6.2までアップデートされています。(2012/1/31現在)

関連する日本語の記事:

開発者向けの情報

関連しそうなスライド

Stormの概要をつかむには以下のスライドが分かりやすいと思います。(英語です)

※2012/3/25 追記
ちょっと前に日本でもstormに関する勉強会があったので、そこで使われたスライドも紹介します。

Stormとはどのようなものか

stormでの処理はどのようなものか、どのように書けるのか、先程のwikiや以下のサイトに記述があるので、そこから簡単に紹介します。
Twitter Engineering: A Storm is coming: more details and plans for release

自分の理解は間違っている可能性があるので、上のサイトやスライドなどを読むことをおすすめします。 また、間違いがあれば教えていただけると幸いです。

Stormでは何ができるか?

  • 1,stream processing
  • データストリームを処理して、データベースを更新するような処理。 キューとワーカーでやっていた処理を、耐故障性を持ちながらスケーラブルに行うことができます。
  • 2,Continuout computation
  • continuousなクエリを行い、その結果をリアルタイムにクライアントにstreamすることができます。 例えば、twitterののストリームからトレンドトピックのストリームを生成し続けるようなものです。
  • 3, Distributed RPC
  • 動作中のStormに対して、任意のクエリ投げて並行して処理させることも可能です。 Stormはクエリメッセージを待ち受け、クライアントにその結果を返します。

stormの処理

Stormの構成はHadoopに似ています。HadoopにおけるMapreduce jobに対応するのが、Stormでは Topology になります。TopologiesはMapReduce job と大きく異なるが、一番の違いは、topologiesには終わりがないということ(ユーザが止めるまで動き続ける)。

Stormのノードにはmasterノードとworkerノードがあり、masterノードは Nimbus と呼ばれます(hadoopにおけるjob tracker)。

Nimbusはworkerへタスクを割り振り、workerのモニタリングを行います。
それぞれのworkerノードでは、Supervisorというデーモンを動いています。SupervisorはNimbusからworkがアサインされるのを待ち受け、workerプロセスのスタート、ストップを行います。それぞれのworkerプロセスはTopologyのサブセットを実行する。

NimbusとSupervisor間の調停にはzookeeper を利用します。
NimbusとSupervisorのデーモンはステートレスで、ステートはzookeeperやローカルディスクに保持されます。
そのため、workerプロセスをkill -9 しても自動的に立ち上がり動作することができます。これにより、stormクラスタはstableとなっています。

Stream と Topologies

Streamとは途切れずに連続するタプルのことで、例えば連続的なtweetのようなものです。
Stormはstreamを、新たなストリームへ、分散して信頼できる方法で変換します。例えばtweetのstreamを、トレンドトピックのストリームへ変換します。

streamの変換のために、stormはSpoutとBoltという2つの機能を提供します。

spoutはストリームの源であり、spoutからStormの処理が開始されます。
spoutの役割は、例えばキューからタプルを読み込んでstreamを生成したり、twitterAPI からtweetのstreamを作成したりする処理になります。

Boltは1回のstream変換処理になります。
複雑な変換(tweetsのstreamからtrend topicのstreamを作成する処理のように) の場合、複数のBoltでその処理を実現することになります。 Spoutと、Boltで構成される複数段階のstream変換は、Topologyというpackageにまとめられる。そのtopologyをstormのクラスタに渡すことで処理されます。

Topologyはstreamの変化のグラフであり、それぞれのノードがspoutかboltとなっている。図の矢印は、どのBoltがどのstreamを使うか(subscribe)を示してます。
stormではこれらの処理が分散して平行に行われます。
tupleは、中間的な状態にはならず、tupleを生成したthreadから、そのtupleを利用するthreadへ直接送られます。

次は、wordcountの例を用いて説明します。

Wordcountの例

Wordcountを行うtopologyに対する入力は、複数の単語からなる文章です。stormは、その文章に含まれる単語をそれぞれカウントして、出力します。
上の図でいうと、KestrelSpoutが、Kestrelというキューから、データを取得して、文章を次のboltへ出力します。 次に、SplitSentenceBoltが、入力された文章を個々の単語に分け、単語1つを1つのtupleとして出力します。 最後にWordCountBoltが、入力された単語の数を数えていき、その単語とその数を出力します。

このような処理を行うtopologyは、以下のように書くことができます。

Topologyの定義

TopologyBuilder builder = new TopologyBuilder();  
builder.setSpout("MySpout", new KestrelSpout("kestrel.backtype.com",  
                                      22133,
                                      "sentence_queue",
                                      new StringScheme()));

builder.setBolt("SplitBolt", new SplitSentence(), 10)  
  .shuffleGrouping("MySpout");

builder.setBolt("WordCountBolt", new WordCount(), 20)  
  .fieldsGrouping("SplitBolt", new Fields("word"));

spoutはtopologyにsetSpoutメソッドからユニークなID(上の例では"MySpout")を割り当てられて追加されます。
topologyの全てのノード(spoutやbolt)は必ずIDが割り当てられ、そのIDを用いてboltからoutput streamがsubscribeされます。
boltの追加はsetBoltを用います。

Topologyにおける個々のboltは以下のように書けます。
Boltの例(SplitSentencceBolt)
文章を単語に分割するboltです。

public class SplitSentence implements IBasicBolt {

  public void prepare(Map conf, TopologyContext context) {
  }

  public void execute(Tuple tuple, BasicOutputCollector collector) {
    String sentence = tuple.getString(0);
    for(String word: sentence.split(" ")) {
      collector.emit(new Values(word));
    }
  }

  public void cleanup() {
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

これは、文章を入力として、それぞれの単語をtupleとして出力していくBoltの例になります。 executeメソッドで、タプルを受け取って、タプルを生成する処理を行なっています。

また、boltは他の言語でも書くこともできます。 pythonの例

import storm

class SplitSentenceBolt(storm.BasicBolt):  
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

ToplogyにおけるsetBoltメソッドのの最後の引数は、boltの処理をどれだけ並列するかを示します。
上の例に出てきたsplitsentenceでは並列数が10なので、クラスタ内で10のthreadで平行して処理されます。 topologyをスケールしたい場合、 ボトルネックとなる処理の、この値を大きくすればよい。

setBoltメソッドは、宣言したinputのオブジェクトを返します。
上記の例のSplitSentence boltは、id"MySpout"のoutput streamを、shuffle groupingを用いてsubscribeしています。 この場合、id="MySpout"とは、KestrelSpoutであるので、kestrelSpoutを参照します。 ポイントは、KestrelSpoutから出力される全てのtupleを、splitsentenctBoltが消費する、ということです。

boltは複数のinput streamをsubscribeできるので、下のようにすれば複数のstreamを合わせるような処理ができます。

builder.setBolt(4, new MyBolt(), 12)  
    .shuffleGrouping(1)
    .shuffleGrouping(2)
    .fieldsGrouping(3, new Fields("id1", "id2"));

この後にもWordCountの説明や複数のstream処理などの説明もありますが省略します。

Stormがやってくれること

  • メッセージの処理の保証
  • ロバストな処理の管理
  • 故障検知と再割当て
  • 効率的なmessage passing
  • Local ModeとDistributed Modeを提供

メッセージ処理をどのように保証するか、は以下のページに詳しく書いてあります。

故障検知については以下のページに書いてあります。

Stormに似たその他の技術

StormはCEP("Complex Event Processing” systems)です。
そのようなシステムは、他にもEsperやS4(yahooが作ったやつ)があります。 S4と似ているが、S4より優れているところとして、Stormでは故障があってもメッセージの処理が保証されるとのこと。

まとめ

Stormは、連続的に来るデータに対して同じ処理を繰り返しかけて新たなストリームを作る、という目的のフレームワークです。
分散などをフレームワークに任せて、SpoutとBoltという処理を書くだけでシステムを作ることができます。 updateも頻繁に行われていて、これから利用者が増えていくかもしれません。

ここに書いていないことも、上で紹介したサイトに行けばいろいろ書いてあります。 AWS向けのパッケージなどもあります。
興味がある人は是非読んでみてください。