都内で働くSEの技術的なひとりごと / Technical soliloquy of System Engineer working in Tokyo

都内でサラリーマンやってます。SQL Server を中心とした (2023年からは Azure も。) マイクロソフト系(たまに、OSS系などマイクロソフト以外の技術も...)の技術的なことについて書いています。日々の仕事の中で、気になったことを技術要素関係なく気まぐれに選んでいるので記事内容は開発言語、インフラ等ばらばらです。なお、当ブログで発信、発言は私個人のものであり、所属する組織、企業、団体等とは何のかかわりもございません。ブログの内容もきちんと検証して使用してください。英語の勉強のため、英語の

MapReduce処理の概要とその実装手順

前回記事の続きです。(内容を少し書き直しました。)

まずHadoopのお勉強から。HadoopはMap(割り当て)、Shuffle(並び替え)、Reduce(足し合わせる)ことだと思っています。

例えば、下記のようなログファイルがあるとします。(こんな簡単なアプリケーションログは普通存在しませんが....)

--------

2013-02-01 20:26:41 SampleClass1 [TRACE]
2013-02-01 20:26:41 SampleClass2 [TRACE]
2013-02-01 20:26:41 SampleClass3 [DEBUG]
2013-02-01 20:26:41 SampleClass4 [TRACE]
2013-02-01 20:26:41 SampleClass5 [INFO]
2013-02-01 20:32:47 SampleClass1 [TRACE]
2013-02-01 20:32:47 SampleClass2 [DEBUG]
2013-02-01 20:32:47 SampleClass3 [TRACE]
2013-02-01 20:32:47 SampleClass4 [TRACE]
2013-02-01 20:32:47 SampleClass5 [DEBUG]
2013-02-01 20:32:47 SampleClass6 [TRACE]
2013-02-01 20:32:47 SampleClass1 [TRACE]
2013-02-01 20:32:47 SampleClass2 [DEBUG]
2013-02-01 21:05:21 SampleClass3 [FATAL]

--------

このファイルの[TRACE] [DEBUG] [INFO] [WARN] [ERROR] [FATAL] それぞれのカウントを得るために、MapReduceを使用してみます。

  1. Map処理を行います。それぞれの値に対して『1』を割付ます。
    [TRACE]:1[TRACE]:1[DEBUG]:1[TRACE]:1[INFO] :1[TRACE]:1[DEBUG]:1[TRACE]:1[TRACE]:1[DEBUG]:1[TRACE]:1[TRACE]:1[DEBUG]:1[FATAL]:1
  2. Shuffle処理を行います。同じ値でソートします。
    [TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[TRACE]:1[DEBUG]:1[DEBUG]:1[DEBUG]:1[DEBUG]:1[INFO] :1[FATAL]:1
  3. Reduce処理を行います。(同一キーを足し合わせる。)
    [TRACE]:8[DEBUG]:4[INFO]:1[FATAL]:1

これをHadoopSDKを使用してMapReduceを行う手順は下記の通りです。

  1. Visual Studioでコンソールアプリケーションプロジェクトを作成する
  2. Install-Package Microsoft.Hadoop.Mapreduceを実行する。
  3. MapperBaseを継承したクラスを作成する。ソースコードは下記の通り。
    using Microsoft.Hadoop.MapReduce;
    namespace test
    {
        public class LogMapper : MapperBase
        {
            //MapperBaseのMapメソッドをオーバーライドする。
            public override void Map(string inputLine, MapperContext context)
            {
                //[TRACE]などを取得する。
                var sa = inputLine.Split(' ');
                string temp = sa[3];
                //取得した文字列に対して、1を割り当てる。
                context.EmitKeyValue(temp, "1");
            }
        }
    }
  4. ReducerCombinerBaseを継承したクラスを作成する。ソースコードは下記の通り。
    using Microsoft.Hadoop.MapReduce;
    using System.Collections.Generic;
    using System.Linq;
    namespace test
    {
        public class LogReducer : ReducerCombinerBase
        {
            //ReducerCombinerBaseのReduceメソッドをオーバーライドする。
            public override void Reduce(string key, IEnumerable<string> values,
                ReducerCombinerContext context)
            {
                //各キーを並びかえ、個数を合計する。
                context.EmitKeyValue(key, values.Count().ToString());
            }
        }
    }
  5. テストコードを作成します。StreamingUnitというクラスがあり、HadoopにJob展開しなくても、MapReduceのテストが可能です。(これは便利だなぁ。)
    using Microsoft.Hadoop.MapReduce;
    using System;
    using System.Collections;
    namespace test
    {
        class Program
        {
            static void Main(string args)
            {
                string line = null;
                //テストデータを読み込みます。
                var sr = new System.IO.StreamReader(@"C:\Users\ryuchan\Documents\text.txt");
                var al = new ArrayList();
                while((line = sr.ReadLine()) != null) al.Add (line);
                sr.Close();
                var inputData = (string
    )al.ToArray(typeof(string));
                //MapReduce処理を実行します。
                var outputData = StreamingUnit.Execute<LogMapper, LogReducer>(inputData);
                Console.ReadLine();
             }
        }
    }

これで、MapReduceの一連の動作はテストすることが可能です。Hadoopの設定がうまく行かなくて、中々動作が確認できませんでしたが、StreamingUnitのおかげてテストが楽になりました。あとは、HadoopへのJob発行、HDFSへのテストデータのロード、Job実行ですね。次回に続く。