MapReduce 是 Hadoop 生态系统中的核心计算框架,采用"分而治之"的思想,将大规模数据处理任务分解为可并行执行的 Map 和 Reduce 两个阶段。本文将详细介绍 MapReduce 的完整执行流程。
MapReduce 将计算过程抽象为两个核心函数:
| 阶段 | 说明 | 输入输出 |
|---|---|---|
| Map | 映射阶段,负责数据的分片处理和初步转换 | (K1, V1) → list(K2, V2) |
| Reduce | 归约阶段,负责数据的汇总聚合 | (K2, list(V2)) → list(K3, V3) |
┌─────────────────────────────────────────────────────────────────────────────────┐ │ MapReduce 完整执行流程 │ └─────────────────────────────────────────────────────────────────────────────────┘ ┌──────────┐ │ 输入文件 │ HDFS 上的数据文件 └────┬─────┘ │ ▼ ┌──────────┐ │ InputFormat │ 决定如何分片和读取数据 └────┬─────┘ │ ▼ ┌──────────┐ │ Split │ 逻辑分片(默认与 Block 大小一致,128MB) └────┬─────┘ │ ▼ ┌──────────┐ │RecordReader│ 将分片数据解析为 Key-Value 对 └────┬─────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────────────┐ │ MAP 阶段 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Mapper │ │ Mapper │ │ Mapper │ │ Mapper │ ... │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │Partitioner│ │Partitioner│ │Partitioner│ │Partitioner│ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Sort │ │ Sort │ │ Sort │ │ Sort │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Combiner │ │ Combiner │ │ Combiner │ │ Combiner │ (可选) │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ Map 输出写入本地磁盘 │ │ │ └──────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────────────┐ │ SHUFFLE 阶段 │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ 数据通过网络从 Map 端拷贝到 Reduce 端 │ │ │ │ (按 Partition 分配到对应 Reducer) │ │ │ └──────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────────────┐ │ REDUCE 阶段 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Merge │ │ Merge │ │ Merge │ 合并来自多个 Mapper 的数据 │ │ │ & Sort │ │ & Sort │ │ & Sort │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Reducer │ │ Reducer │ │ Reducer │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ OutputFormat │ │ │ └──────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────┐ │ 输出文件 │ 写入 HDFS └──────────┘
| 组件 | 作用 | 核心方法/说明 |
|---|---|---|
| InputFormat | 定义数据读取方式和分片策略 | getSplits()、createRecordReader() |
| RecordReader | 将分片数据解析为 K-V 对 | nextKeyValue()、getCurrentKey()、getCurrentValue() |
| Mapper | 处理输入的每条记录 | map() |
| Partitioner | 决定 Map 输出发送到哪个 Reducer | getPartition() |
| Combiner | Map 端本地聚合(可选) | 复用 Reducer 逻辑 |
| Reducer | 聚合处理相同 Key 的所有 Value | reduce() |
| OutputFormat | 定义输出数据的写入方式 | getRecordWriter()、checkOutputSpecs() |
InputFormat 负责两件事:
| InputFormat | 说明 | K-V 类型 |
|---|---|---|
TextInputFormat | 默认格式,按行读取文本文件 | Key: 行偏移量,Value: 行内容 |
KeyValueTextInputFormat | 按分隔符拆分 Key-Value | Key: 分隔符前,Value: 分隔符后 |
NLineInputFormat | 每 N 行作为一个分片 | 同 TextInputFormat |
SequenceFileInputFormat | 读取 Hadoop 序列文件 | 序列文件中定义的 K-V 类型 |
CombineFileInputFormat | 合并小文件 | 自定义 |
javapackage com.example.mr.input;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
* 自定义 InputFormat:只处理指定扩展名的文件
*/
public class CustomFileInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
// 可以控制文件是否可分片
// 例如:压缩文件通常不可分片
return true;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new LineRecordReader();
}
}
splitSize = Math.max(minSize, Math.min(maxSize, blockSize)) 默认情况下: - minSize = 1 - maxSize = Long.MAX_VALUE - blockSize = 128MB (HDFS 默认块大小) 因此默认分片大小 = 128MB
配置参数:
java// 设置最小分片大小
conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456"); // 256MB
// 设置最大分片大小
conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912"); // 512MB
javapublic class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* 初始化方法,在 Map 任务开始前调用一次
*/
protected void setup(Context context) throws IOException, InterruptedException {
// 初始化资源、读取配置等
}
/**
* 核心映射方法,对每条输入记录调用一次
*/
protected void map(KEYIN key, VALUEIN value, Context context)
throws IOException, InterruptedException {
// 处理逻辑
context.write(outputKey, outputValue);
}
/**
* 清理方法,在 Map 任务结束后调用一次
*/
protected void cleanup(Context context) throws IOException, InterruptedException {
// 释放资源、输出汇总信息等
}
/**
* 运行方法,控制整体流程
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}
作用:
典型用途:
java@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
// 读取自定义配置
this.separator = conf.get("field.separator", "\t");
// 读取分布式缓存
URI[] cacheFiles = context.getCacheFiles();
if (cacheFiles != null && cacheFiles.length > 0) {
loadCacheData(new Path(cacheFiles[0]));
}
}
作用:
参数说明:
key:输入键(如 TextInputFormat 时为行偏移量)value:输入值(如 TextInputFormat 时为行内容)context:上下文对象,用于输出结果和获取配置java@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
// 可以输出多个 K-V 对
for (String field : fields) {
context.write(new Text(field), new IntWritable(1));
}
// 也可以不输出(过滤数据)
// 或者输出一个(转换数据)
}
作用:
典型用途:
java@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 关闭资源
if (dbConnection != null) {
dbConnection.close();
}
// 输出最终汇总(如果在 map 中缓存了数据)
for (Map.Entry<String, Integer> entry : localCache.entrySet()) {
context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
}
}
javapackage com.example.mr.mapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
/**
* WordCount Mapper
* 输入:(行偏移量, 行内容)
* 输出:(单词, 1)
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// 复用对象,减少 GC 压力
private final static IntWritable ONE = new IntWritable(1);
private Text word = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 可以在这里读取停用词列表等
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString().toLowerCase();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, ONE);
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 清理工作
}
}
Shuffle 是 MapReduce 中最复杂也最关键的阶段,负责将 Map 输出传输到 Reduce 端。
MAP 端 Shuffle REDUCE 端 Shuffle ┌────────────────────────────────────────────────┐ ┌────────────────────────────────────────────────┐ │ │ │ │ │ map() 输出 │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌────────────┐ │ │ ┌────────────┐ │ │ │ 环形缓冲区 │ 默认 100MB │ │ │ Fetch │ HTTP 方式拉取数据 │ │ │ (内存) │ mapreduce.task.io.sort.mb │ │ │ │ │ │ └─────┬──────┘ │ │ └─────┬──────┘ │ │ │ │ │ │ │ │ │ 达到阈值(80%)触发 Spill │ │ ▼ │ │ ▼ │ │ ┌────────────┐ │ │ ┌────────────┐ │ │ │ 内存缓冲 │ │ │ │ Partition │ 确定数据发往哪个 Reducer │ │ │ │ │ │ └─────┬──────┘ │ │ └─────┬──────┘ │ │ │ │ │ │ │ │ ▼ │ │ │ 达到阈值触发合并 │ │ ┌────────────┐ │ │ ▼ │ │ │ Sort │ 按 Key 排序 │ │ ┌────────────┐ │ │ └─────┬──────┘ │ │ │ Merge │ 多路归并排序 │ │ │ │ │ │ Sort │ │ │ ▼ │ │ └─────┬──────┘ │ │ ┌────────────┐ │ │ │ │ │ │ Combiner │ 本地聚合(可选) │ │ ▼ │ │ └─────┬──────┘ │ │ ┌────────────┐ │ │ │ │ │ │ Group │ 按 Key 分组 │ │ ▼ │ │ └─────┬──────┘ │ │ ┌────────────┐ │ │ │ │ │ │ Spill │ 溢写到磁盘 │ │ ▼ │ │ │ (磁盘) │ │ │ reduce() 输入 │ │ └─────┬──────┘ │ │ │ │ │ │ └────────────────────────────────────────────────┘ │ │ 多次 Spill 产生多个文件 │ │ ▼ │ │ ┌────────────┐ │ │ │ Merge │ 合并所有 Spill 文件 │ │ └─────┬──────┘ │ │ │ │ │ ▼ │ │ 最终输出文件 │ │ (按 Partition 分区,区内有序) │ │ │ └────────────────────────────────────────────────┘
作用:决定 Map 输出的 Key 应该发送到哪个 Reducer
默认实现:HashPartitioner
javapublic class HashPartitioner<K, V> extends Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
javapackage com.example.mr.partitioner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 自定义分区器:按首字母分区
*/
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
String keyStr = key.toString();
if (keyStr.isEmpty()) {
return 0;
}
char firstChar = Character.toLowerCase(keyStr.charAt(0));
if (firstChar >= 'a' && firstChar <= 'z') {
// a-z 分到 0-25 号分区
return (firstChar - 'a') % numReduceTasks;
} else {
// 其他字符分到最后一个分区
return numReduceTasks - 1;
}
}
}
配置使用:
javajob.setPartitionerClass(FirstLetterPartitioner.class);
job.setNumReduceTasks(26); // 设置 Reducer 数量
作用:对 Map 输出按 Key 进行排序
排序类型:
| 排序类型 | 说明 | 发生时机 |
|---|---|---|
| 快速排序 | 内存中对缓冲区数据排序 | Spill 之前 |
| 归并排序 | 合并多个有序的 Spill 文件 | Spill 文件合并时 |
javapackage com.example.mr.writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 自定义 Key 类型,支持二次排序
* 先按 first 升序,再按 second 降序
*/
public class CompositeKey implements WritableComparable<CompositeKey> {
private String first;
private int second;
public CompositeKey() {}
public CompositeKey(String first, int second) {
this.first = first;
this.second = second;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(first);
out.writeInt(second);
}
@Override
public void readFields(DataInput in) throws IOException {
first = in.readUTF();
second = in.readInt();
}
@Override
public int compareTo(CompositeKey other) {
// 先按 first 升序
int cmp = this.first.compareTo(other.first);
if (cmp != 0) {
return cmp;
}
// 再按 second 降序
return Integer.compare(other.second, this.second);
}
@Override
public int hashCode() {
return first.hashCode() * 31 + second;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof CompositeKey) {
CompositeKey other = (CompositeKey) obj;
return first.equals(other.first) && second == other.second;
}
return false;
}
// Getters and Setters
public String getFirst() { return first; }
public int getSecond() { return second; }
}
作用:在 Map 端进行本地聚合,减少 Shuffle 数据量
特点:
适用场景:
java// 配置 Combiner(复用 Reducer)
job.setCombinerClass(WordCountReducer.class);
javapackage com.example.mr.combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* WordCount Combiner
* 在 Map 端本地聚合,减少网络传输
*/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
xml<!-- Map 端 -->
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>100</value>
<description>环形缓冲区大小(MB)</description>
</property>
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.80</value>
<description>缓冲区溢写阈值</description>
</property>
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>10</value>
<description>同时合并的文件数</description>
</property>
<!-- Reduce 端 -->
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>5</value>
<description>并行拉取数据的线程数</description>
</property>
<property>
<name>mapreduce.reduce.shuffle.input.buffer.percent</name>
<value>0.70</value>
<description>Shuffle 缓冲区占堆内存比例</description>
</property>
<property>
<name>mapreduce.reduce.shuffle.merge.percent</name>
<value>0.66</value>
<description>触发合并的缓冲区阈值</description>
</property>
javapublic class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* 初始化方法
*/
protected void setup(Context context) throws IOException, InterruptedException {
}
/**
* 核心归约方法,对每个 Key 的所有 Value 调用一次
*/
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
throws IOException, InterruptedException {
}
/**
* 清理方法
*/
protected void cleanup(Context context) throws IOException, InterruptedException {
}
/**
* 运行方法
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
}
cleanup(context);
}
}
作用:与 Mapper 的 setup 类似,用于初始化
java@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
// 读取配置、初始化资源等
}
作用:
重要注意事项:
values 只能遍历一次!java@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// ❌ 错误:试图多次遍历
// for (IntWritable val : values) { ... }
// for (IntWritable val : values) { ... } // 第二次遍历得到空结果
// ✅ 正确:如需多次使用,先存储
List<Integer> valueList = new ArrayList<>();
for (IntWritable val : values) {
valueList.add(val.get());
}
// 现在可以多次使用 valueList
}
作用:清理资源,与 Mapper 的 cleanup 类似
javapackage com.example.mr.reducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* WordCount Reducer
* 输入:(单词, [1, 1, 1, ...])
* 输出:(单词, 总次数)
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 初始化
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 清理
}
}
javapackage com.example.mr.reducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
/**
* TopN Reducer:找出词频最高的 N 个单词
*/
public class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private int n;
private TreeMap<Integer, String> topN;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
n = context.getConfiguration().getInt("topn", 10);
topN = new TreeMap<>();
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// 维护 TopN
topN.put(sum, key.toString());
if (topN.size() > n) {
topN.remove(topN.firstKey()); // 移除最小的
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 输出 TopN 结果(按词频降序)
for (Map.Entry<Integer, String> entry : topN.descendingMap().entrySet()) {
context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
}
}
}
| OutputFormat | 说明 |
|---|---|
TextOutputFormat | 默认格式,输出文本文件,K-V 用 Tab 分隔 |
SequenceFileOutputFormat | 输出 Hadoop 序列文件 |
MultipleOutputs | 输出多个文件/目录 |
LazyOutputFormat | 惰性输出,无数据时不创建文件 |
NullOutputFormat | 不输出任何内容 |
javapackage com.example.mr.output;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 自定义 OutputFormat:JSON 格式输出
*/
public class JsonOutputFormat extends FileOutputFormat<Text, IntWritable> {
@Override
public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
Path outputPath = getDefaultWorkFile(context, ".json");
FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
FSDataOutputStream out = fs.create(outputPath, false);
return new JsonRecordWriter(out);
}
static class JsonRecordWriter extends RecordWriter<Text, IntWritable> {
private FSDataOutputStream out;
private boolean firstRecord = true;
public JsonRecordWriter(FSDataOutputStream out) throws IOException {
this.out = out;
out.writeBytes("[\n");
}
@Override
public void write(Text key, IntWritable value) throws IOException {
if (!firstRecord) {
out.writeBytes(",\n");
}
firstRecord = false;
String json = String.format(" {\"word\": \"%s\", \"count\": %d}",
key.toString(), value.get());
out.writeBytes(json);
}
@Override
public void close(TaskAttemptContext context) throws IOException {
out.writeBytes("\n]");
out.close();
}
}
}
java// 在 Reducer 中使用 MultipleOutputs
public class MultiOutputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private MultipleOutputs<Text, IntWritable> multipleOutputs;
@Override
protected void setup(Context context) {
multipleOutputs = new MultipleOutputs<>(context);
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// 根据条件输出到不同文件
if (sum > 100) {
multipleOutputs.write("high", key, new IntWritable(sum));
} else {
multipleOutputs.write("low", key, new IntWritable(sum));
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close();
}
}
// Driver 配置
MultipleOutputs.addNamedOutput(job, "high", TextOutputFormat.class, Text.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, "low", TextOutputFormat.class, Text.class, IntWritable.class);
javapackage com.example.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.StringTokenizer;
/**
* WordCount 完整示例
*/
public class WordCount extends Configured implements Tool {
/**
* Mapper 类
*/
public static class TokenizerMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().toLowerCase().replaceAll("[^a-z]", ""));
if (word.getLength() > 0) {
context.write(word, ONE);
}
}
}
}
/**
* Reducer 类
*/
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
/**
* Driver
*/
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: WordCount <input> <output>");
return -1;
}
Configuration conf = getConf();
Job job = Job.getInstance(conf, "Word Count");
// 设置 Jar
job.setJarByClass(WordCount.class);
// 设置 Mapper
job.setMapperClass(TokenizerMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置 Combiner(可选)
job.setCombinerClass(IntSumReducer.class);
// 设置 Reducer
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置 Reducer 数量
job.setNumReduceTasks(3);
// 设置输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交任务
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new WordCount(), args);
System.exit(exitCode);
}
}
bash# 编译打包
mvn clean package
# 运行
hadoop jar wordcount.jar com.example.mr.WordCount /input /output
# 查看结果
hdfs dfs -cat /output/part-r-*
作用:将小文件分发到所有节点,供 Map/Reduce 任务使用
java// Driver 中添加缓存文件
job.addCacheFile(new URI("/user/data/lookup.txt#lookup"));
// Mapper 中使用
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 方式1:通过符号链接访问
BufferedReader reader = new BufferedReader(new FileReader("lookup"));
// 方式2:通过 API 获取
URI[] cacheFiles = context.getCacheFiles();
Path cachePath = new Path(cacheFiles[0]);
FileSystem fs = FileSystem.get(context.getConfiguration());
BufferedReader reader = new BufferedReader(
new InputStreamReader(fs.open(cachePath)));
// 读取并缓存数据
String line;
while ((line = reader.readLine()) != null) {
// 处理数据
}
reader.close();
}
java// 定义计数器枚举
enum WordCountCounter {
TOTAL_WORDS,
INVALID_WORDS
}
// 在 Mapper 中使用
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split("\\s+");
for (String word : words) {
if (isValid(word)) {
context.getCounter(WordCountCounter.TOTAL_WORDS).increment(1);
context.write(new Text(word), new IntWritable(1));
} else {
context.getCounter(WordCountCounter.INVALID_WORDS).increment(1);
}
}
}
// 在 Driver 中获取计数器结果
Counters counters = job.getCounters();
long totalWords = counters.findCounter(WordCountCounter.TOTAL_WORDS).getValue();
System.out.println("Total words: " + totalWords);
java// Job 链:Job1 的输出作为 Job2 的输入
Job job1 = Job.getInstance(conf, "Job1");
// ... 配置 job1 ...
job1.waitForCompletion(true);
Job job2 = Job.getInstance(conf, "Job2");
FileInputFormat.addInputPath(job2, new Path("/job1/output"));
// ... 配置 job2 ...
job2.waitForCompletion(true);
| 优化点 | 方法 |
|---|---|
| 增加并行度 | 减小分片大小,增加 Mapper 数量 |
| 减少溢写次数 | 增大 io.sort.mb,提高 sort.spill.percent |
| 使用 Combiner | 本地聚合减少数据量 |
| 压缩 Map 输出 | 设置 mapreduce.map.output.compress=true |
| 优化点 | 方法 |
|---|---|
| 合理设置 Reducer 数量 | 通常设为集群 Reduce Slot 数量的 0.95 或 1.75 倍 |
| 增加并行拷贝 | 提高 mapreduce.reduce.shuffle.parallelcopies |
| 增大缓冲区 | 提高 mapreduce.reduce.shuffle.input.buffer.percent |
| 场景 | 解决方案 |
|---|---|
| Key 分布不均 | 加盐打散 Key,二次聚合 |
| 热点 Key | 抽样识别热点 Key,单独处理 |
| Join 倾斜 | Map Side Join(小表广播) |
Input → Split → Map → Partition → Sort → Combiner → Shuffle → Merge → Reduce → Output
| 阶段 | 核心组件 | 核心方法 | 作用 |
|---|---|---|---|
| 输入 | InputFormat | getSplits() | 数据分片 |
| 读取 | RecordReader | nextKeyValue() | 解析 K-V |
| 映射 | Mapper | map() | 数据转换 |
| 分区 | Partitioner | getPartition() | 决定目标 Reducer |
| 排序 | WritableComparable | compareTo() | Key 排序 |
| 合并 | Combiner | reduce() | 本地聚合 |
| 归约 | Reducer | reduce() | 全局聚合 |
| 输出 | OutputFormat | getRecordWriter() | 写入结果 |
掌握 MapReduce 的完整流程和各组件的作用,是理解大数据处理的基础,也是优化数据处理性能的关键。
本文作者:ender
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!