MapReduce処理の概要とその実装手順
前回記事の続きです。(内容を少し書き直しました。)
まずHadoopのお勉強から。HadoopはMap(割り当て)、Shuffle(並び替え)、Reduce(足し合わせる)ことだと思っています。
例えば、下記のようなログファイルがあるとします。(こんな簡単なアプリケーションログは普通存在しませんが....)
--------
2013-02-01 20:26:41 SampleClass1 [TRACE]
2013-02-01 20:26:41 SampleClass2 [TRACE]
2013-02-01 20:26:41 SampleClass3 [DEBUG]
2013-02-01 20:26:41 SampleClass4 [TRACE]
2013-02-01 20:26:41 SampleClass5 [INFO]
2013-02-01 20:32:47 SampleClass1 [TRACE]
2013-02-01 20:32:47 SampleClass2 [DEBUG]
2013-02-01 20:32:47 SampleClass3 [TRACE]
2013-02-01 20:32:47 SampleClass4 [TRACE]
2013-02-01 20:32:47 SampleClass5 [DEBUG]
2013-02-01 20:32:47 SampleClass6 [TRACE]
2013-02-01 20:32:47 SampleClass1 [TRACE]
2013-02-01 20:32:47 SampleClass2 [DEBUG]
2013-02-01 21:05:21 SampleClass3 [FATAL]
--------
このファイルの[TRACE] [DEBUG] [INFO] [WARN] [ERROR] [FATAL] それぞれのカウントを得るために、MapReduceを使用してみます。
- Map処理を行います。それぞれの値に対して『1』を割付ます。
[TRACE]:1[TRACE]:1[DEBUG]:1[TRACE]:1[INFO] :1[TRACE]:1[DEBUG]:1[TRACE]:1[TRACE]:1[DEBUG]:1[TRACE]:1[TRACE]:1[DEBUG]:1[FATAL]:1 - Shuffle処理を行います。同じ値でソートします。
[TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[DEBUG]:1[DEBUG]:1[DEBUG]:1[DEBUG]:1[INFO] :1[FATAL]:1 - Reduce処理を行います。(同一キーを足し合わせる。)
[TRACE]:8[DEBUG]:4[INFO]:1[FATAL]:1
これをHadoopSDKを使用してMapReduceを行う手順は下記の通りです。
- Visual Studioでコンソールアプリケーションプロジェクトを作成する
- Install-Package Microsoft.Hadoop.Mapreduceを実行する。
- MapperBaseを継承したクラスを作成する。ソースコードは下記の通り。
using Microsoft.Hadoop.MapReduce;
namespace test
{
public class LogMapper : MapperBase
{
//MapperBaseのMapメソッドをオーバーライドする。
public override void Map(string inputLine, MapperContext context)
{
//[TRACE]などを取得する。
var sa = inputLine.Split(' ');
string temp = sa[3];
//取得した文字列に対して、1を割り当てる。
context.EmitKeyValue(temp, "1");
}
}
} - ReducerCombinerBaseを継承したクラスを作成する。ソースコードは下記の通り。
using Microsoft.Hadoop.MapReduce;
using System.Collections.Generic;
using System.Linq;
namespace test
{
public class LogReducer : ReducerCombinerBase
{
//ReducerCombinerBaseのReduceメソッドをオーバーライドする。
public override void Reduce(string key, IEnumerable<string> values,
ReducerCombinerContext context)
{
//各キーを並びかえ、個数を合計する。
context.EmitKeyValue(key, values.Count().ToString());
}
}
} - テストコードを作成します。StreamingUnitというクラスがあり、HadoopにJob展開しなくても、MapReduceのテストが可能です。(これは便利だなぁ。)
using Microsoft.Hadoop.MapReduce;
using System;
using System.Collections;
namespace test
{
class Program
{
static void Main(string args)
{
string line = null;
//テストデータを読み込みます。
var sr = new System.IO.StreamReader(@"C:\Users\ryuchan\Documents\text.txt");
var al = new ArrayList();
while((line = sr.ReadLine()) != null) al.Add (line);
sr.Close();
var inputData = (string)al.ToArray(typeof(string));
//MapReduce処理を実行します。
var outputData = StreamingUnit.Execute<LogMapper, LogReducer>(inputData);
Console.ReadLine();
}
}
}
これで、MapReduceの一連の動作はテストすることが可能です。Hadoopの設定がうまく行かなくて、中々動作が確認できませんでしたが、StreamingUnitのおかげてテストが楽になりました。あとは、HadoopへのJob発行、HDFSへのテストデータのロード、Job実行ですね。次回に続く。