【MapReduce】基礎
MapReduce概要
MapReduceとは並列分散処理フレームワークの一つである。分散処理は大きく2つのフェーズに別れており、入力データからKeyとValueのペアを生成するMapフェーズ、Keyに対して条件を指定してValueを絞り込んで抽出するReduceフェーズが存在する。Map処理とReduce処理はコーディングする必要がある。
MapReduce処理フロー
# | フェーズ | 処理概要 | 出力例 (最も遅い月を抽出) |
---|---|---|---|
1 | 入力 | テキストファイルなどからデータをインプットする。 | 2017,10,... / 2017,12,... |
2 | map | インプットされたデータからKeyとValueのペアを生成する。 | (2017, 10) / (2017, 12) |
3 | シャッフル | 同一Keyを持つペアを一纏めにする。 | (2017, [10,12]) |
4 | Reduce | 指定した条件でValueを抽出する。 | (2017,12) |
5 | 出力 | MapReduce処理の結果を出力する。 | 2017, 12 |
MapReduceデータフロー
ジョブとタスク
MapReduceの処理全体のことをジョブという。ジョブは2つのタスクから構成されており、mapタスクとreduceタスクが存在する。mapタスクとreduceタスクは 1..n : 1 の関係になっているが、reducerを複数用意することは可能。reducerが複数存在する場合、mapタスクの出力はパーティション化される。
■ reducerが単一のデータフロー図
■ reducerが複数のデータフロー図
スプリット
MapReduceの入力はスプリットに分割され、各スプリットごとにmapタスクが生成される。スプリットという粒度の細かい単位に分割することで、稼働マシンの性能に合わせたロードバランスが行いやすくなる。ただし、粒度を細かくしすぎるとタスク生成のオーバヘッドがかかり過ぎてしまうため注意が必要。
ジョブプロセス制御
MapReduceのジョブプロセスを制御するノードには2種類存在し、1つだけ存在するjobtrackerと、複数存在するtasktrackerがある。tasktrackerはタスクを実行して進捗をjobtrackerに知らせており、jobtrackerは進捗を元にタスクの振り分けなどtasktrackerの管理を行なっている。
集約関数
mapとreduce間の通信データ量を減らして最適化するために、mapの出力に対して集約関数を適用することができる。例えば最も数の大きい値を出力したい場合、mapの結果に対してSUM関数のような処理を指定してやることで、各mapタスクからのデータ量を削減することが可能になる。
Javaを用いた開発
事前準備
Mavenを使用する方法もあるようだが、今回はHadoopライブラリを使用する(Mavenは勉強不足)。Maven Repositoyから「Apache Hadoop Common」と「Hadoop Core」ライブラリをダウンロードし、プロジェクトのビルドパスに追加する。
ダウンロードURL
https://mvnrepository.com/artifact/org.apache.hadoop
Map処理
データ(2017,10,...)をインプットした際に年をKey、月をValueとしたペアを作成するMap処理。
import java.io.IOException; import java.util.Arrays; import java.util.List; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MapSample extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); List<String> values = Arrays.asList(line.split(",")); if (values.size() < 2) { throw new IllegalArgumentException("The number of value is less than 2."); } String year = values.get(0); int month = 0; if ( values.get(1) != null && !values.get(1).isEmpty()) { month = Integer.valueOf(values.get(1)); } if (year != null && !year.isEmpty() && month >= 1 && month <= 12) { context.write(new Text(year), new IntWritable(month)); } } }
Reduce処理
シャッフル処理でKeyごとに配列としてまとめられたValueから、最も数の大きい月を抽出するReduce処理。
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class ReduceSample extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxMonth = 0; for (IntWritable value : values) { maxMonth = Math.max(maxMonth, value.get()); } context.write(key, new IntWritable(maxMonth)); } }
Job実行処理
各フェーズを管理するJobクラスを用いてMapReduceを実行するJob実行処理。
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils.Text; import com.tands.mapreduce.MapSample; import com.tands.mapreduce.ReduceSample; public class JobRunner { private static final String JOB_NAME = "MapReduce Sample"; public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if (args.length < 2) { throw new IllegalArgumentException("Please enter input path and output path."); } String inputPath = args[0]; String outputPath = args[1]; Job job = createJob(inputPath, outputPath); // Job実行。引数はverbose設定の有無 boolean success = job.waitForCompletion(false); if (success) { System.out.println("Job Success!"); } else { System.out.println("Job Failed"); } } private static Job createJob(String inputPath, String outputPath) throws IOException { Job job = new Job(); // Hadoopクラスタを使用してジョブを実行する場合、コードをJARファイルにパッケージ化する // setJarByClass()を使用することで、Hadoopがクラスの含まれたJARを見つけ出してくれる job.setJarByClass(JobRunner.class); job.setJobName(JobRunner.JOB_NAME); // データの入出力パスを設定する FileInputFormat.setInputPaths(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); // Map, Reduceクラスを設定する job.setMapperClass(MapSample.class); job.setReducerClass(ReduceSample.class); // OutputのKey, Valueの型を指定 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); return job; } }
集約関数の指定
mapの出力結果に対して集約関数を適用したい場合、Job作成時にセットする必要がある。
// 集約関数としてReducerクラスをセットする job.setCombinerClass(ReduceSample.class);
作成モジュールの実行
Hadoopをインストールし、作成したJavaプロジェクトをJARファイル化しておく。以下のコマンドを用いてMapReduce Jobを実行する。
$ hadoop jar <JARファイル> <パッケージ名>.<クラス名> <inputパス> <outputパス>