tands_bの日記

技術メモ。大したことは書きません

【MapReduce】基礎

MapReduce概要

 MapReduceとは並列分散処理フレームワークの一つである。分散処理は大きく2つのフェーズに別れており、入力データからKeyとValueのペアを生成するMapフェーズ、Keyに対して条件を指定してValueを絞り込んで抽出するReduceフェーズが存在する。Map処理とReduce処理はコーディングする必要がある。

MapReduce処理フロー

# フェーズ 処理概要 出力例 (最も遅い月を抽出)
1 入力 テキストファイルなどからデータをインプットする。 2017,10,... / 2017,12,...
2 map インプットされたデータからKeyとValueのペアを生成する。 (2017, 10) / (2017, 12)
3 シャッフル 同一Keyを持つペアを一纏めにする。 (2017, [10,12])
4 Reduce 指定した条件でValueを抽出する。 (2017,12)
5 出力 MapReduce処理の結果を出力する。 2017, 12

MapReduceデータフロー

ジョブとタスク

 MapReduceの処理全体のことをジョブという。ジョブは2つのタスクから構成されており、mapタスクとreduceタスクが存在する。mapタスクとreduceタスクは 1..n : 1 の関係になっているが、reducerを複数用意することは可能。reducerが複数存在する場合、mapタスクの出力はパーティション化される。

■ reducerが単一のデータフロー図
f:id:tands_b:20171012004839p:plain

■ reducerが複数のデータフロー図
f:id:tands_b:20171012005015p:plain

スプリット

 MapReduceの入力はスプリットに分割され、各スプリットごとにmapタスクが生成される。スプリットという粒度の細かい単位に分割することで、稼働マシンの性能に合わせたロードバランスが行いやすくなる。ただし、粒度を細かくしすぎるとタスク生成のオーバヘッドがかかり過ぎてしまうため注意が必要。

ジョブプロセス制御

 MapReduceのジョブプロセスを制御するノードには2種類存在し、1つだけ存在するjobtrackerと、複数存在するtasktrackerがある。tasktrackerはタスクを実行して進捗をjobtrackerに知らせており、jobtrackerは進捗を元にタスクの振り分けなどtasktrackerの管理を行なっている。

集約関数

 mapとreduce間の通信データ量を減らして最適化するために、mapの出力に対して集約関数を適用することができる。例えば最も数の大きい値を出力したい場合、mapの結果に対してSUM関数のような処理を指定してやることで、各mapタスクからのデータ量を削減することが可能になる。

Javaを用いた開発

事前準備

 Mavenを使用する方法もあるようだが、今回はHadoopライブラリを使用する(Mavenは勉強不足)。Maven Repositoyから「Apache Hadoop Common」と「Hadoop Core」ライブラリをダウンロードし、プロジェクトのビルドパスに追加する。

ダウンロードURL
https://mvnrepository.com/artifact/org.apache.hadoop

Map処理

 データ(2017,10,...)をインプットした際に年をKey、月をValueとしたペアを作成するMap処理。

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSample extends Mapper<LongWritable, Text, Text, IntWritable>{

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        List<String> values = Arrays.asList(line.split(","));
        if (values.size() < 2) {
            throw new IllegalArgumentException("The number of value is less than 2.");
        }

        String year = values.get(0);
        int month = 0;
        if (  values.get(1) != null && !values.get(1).isEmpty()) {
            month = Integer.valueOf(values.get(1));
        }

        if (year != null && !year.isEmpty() && month >= 1 && month <= 12) {
            context.write(new Text(year), new IntWritable(month));
        }
    }

}

Reduce処理

 シャッフル処理でKeyごとに配列としてまとめられたValueから、最も数の大きい月を抽出するReduce処理。

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceSample extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int maxMonth = 0;
        for (IntWritable value : values) {
            maxMonth = Math.max(maxMonth, value.get());
        }
        
        context.write(key, new IntWritable(maxMonth));
    }
    
}

Job実行処理

 各フェーズを管理するJobクラスを用いてMapReduceを実行するJob実行処理。

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils.Text;
import com.tands.mapreduce.MapSample;
import com.tands.mapreduce.ReduceSample;

public class JobRunner {

    private static final String JOB_NAME = "MapReduce Sample";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            throw new IllegalArgumentException("Please enter input path and output path.");
        }

        String inputPath = args[0];
        String outputPath = args[1];

        Job job = createJob(inputPath, outputPath);

        // Job実行。引数はverbose設定の有無
        boolean success = job.waitForCompletion(false);
        if (success) {
            System.out.println("Job Success!");
        } else {
            System.out.println("Job Failed");
        }
    }

    private static Job createJob(String inputPath, String outputPath) throws IOException {
        Job job = new Job();

        // Hadoopクラスタを使用してジョブを実行する場合、コードをJARファイルにパッケージ化する
        // setJarByClass()を使用することで、Hadoopがクラスの含まれたJARを見つけ出してくれる
        job.setJarByClass(JobRunner.class);
        job.setJobName(JobRunner.JOB_NAME);

        // データの入出力パスを設定する
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Map, Reduceクラスを設定する
        job.setMapperClass(MapSample.class);
        job.setReducerClass(ReduceSample.class);

        // OutputのKey, Valueの型を指定
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job;
    }

}

集約関数の指定

 mapの出力結果に対して集約関数を適用したい場合、Job作成時にセットする必要がある。

       // 集約関数としてReducerクラスをセットする
        job.setCombinerClass(ReduceSample.class);

作成モジュールの実行

 Hadoopをインストールし、作成したJavaプロジェクトをJARファイル化しておく。以下のコマンドを用いてMapReduce Jobを実行する。

$ hadoop jar <JARファイル> <パッケージ名>.<クラス名> <inputパス> <outputパス>