编辑
2025-12-10
intView
0
请注意,本文编写于 154 天前,最后修改于 149 天前,其中某些信息可能已经过时。

目录

MapReduce 执行流程完全指南
一、总览
1. MapReduce 核心概念
2. 完整执行流程图
3. 核心组件概览
二、InputFormat 与数据分片
2.1 InputFormat 的作用
2.2 常用 InputFormat 类型
2.3 自定义 InputFormat 示例
2.4 分片大小计算
三、Map 阶段详解
3.1 Mapper 类结构
3.2 核心方法详解
setup(Context context)
map(KEYIN key, VALUEIN value, Context context)
cleanup(Context context)
3.3 完整 Mapper 示例:WordCount
四、Shuffle 阶段详解
4.1 Shuffle 流程图解
4.2 Partitioner(分区器)
自定义 Partitioner 示例
4.3 排序(Sort)
自定义排序:实现 WritableComparable
4.4 Combiner(合并器)
Combiner 示例
4.5 Shuffle 性能调优参数
五、Reduce 阶段详解
5.1 Reducer 类结构
5.2 核心方法详解
setup(Context context)
reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
cleanup(Context context)
5.3 完整 Reducer 示例:WordCount
5.4 高级 Reducer 示例:TopN
六、OutputFormat 与数据输出
6.1 常用 OutputFormat 类型
6.2 自定义 OutputFormat 示例
6.3 MultipleOutputs 使用示例
七、完整 MapReduce 程序示例
7.1 WordCount 完整代码
7.2 运行命令
八、高级特性
8.1 分布式缓存
8.2 计数器(Counter)
8.3 链式 MapReduce
九、性能优化总结
9.1 Map 端优化
9.2 Reduce 端优化
9.3 数据倾斜处理
十、总结
MapReduce 执行流程速记
核心组件对照表

MapReduce 执行流程完全指南

MapReduce 是 Hadoop 生态系统中的核心计算框架,采用"分而治之"的思想,将大规模数据处理任务分解为可并行执行的 Map 和 Reduce 两个阶段。本文将详细介绍 MapReduce 的完整执行流程。


一、总览

1. MapReduce 核心概念

MapReduce 将计算过程抽象为两个核心函数:

阶段说明输入输出
Map映射阶段,负责数据的分片处理和初步转换(K1, V1)list(K2, V2)
Reduce归约阶段,负责数据的汇总聚合(K2, list(V2))list(K3, V3)

2. 完整执行流程图

┌─────────────────────────────────────────────────────────────────────────────────┐ │ MapReduce 完整执行流程 │ └─────────────────────────────────────────────────────────────────────────────────┘ ┌──────────┐ │ 输入文件 │ HDFS 上的数据文件 └────┬─────┘ │ ▼ ┌──────────┐ │ InputFormat │ 决定如何分片和读取数据 └────┬─────┘ │ ▼ ┌──────────┐ │ Split │ 逻辑分片(默认与 Block 大小一致,128MB) └────┬─────┘ │ ▼ ┌──────────┐ │RecordReader│ 将分片数据解析为 Key-Value 对 └────┬─────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────────────┐ │ MAP 阶段 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Mapper │ │ Mapper │ │ Mapper │ │ Mapper │ ... │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │Partitioner│ │Partitioner│ │Partitioner│ │Partitioner│ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Sort │ │ Sort │ │ Sort │ │ Sort │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Combiner │ │ Combiner │ │ Combiner │ │ Combiner │ (可选) │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ Map 输出写入本地磁盘 │ │ │ └──────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────────────┐ │ SHUFFLE 阶段 │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ 数据通过网络从 Map 端拷贝到 Reduce 端 │ │ │ │ (按 Partition 分配到对应 Reducer) │ │ │ └──────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────────────┐ │ REDUCE 阶段 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Merge │ │ Merge │ │ Merge │ 合并来自多个 Mapper 的数据 │ │ │ & Sort │ │ & Sort │ │ & Sort │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Reducer │ │ Reducer │ │ Reducer │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ OutputFormat │ │ │ └──────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────┐ │ 输出文件 │ 写入 HDFS └──────────┘

3. 核心组件概览

组件作用核心方法/说明
InputFormat定义数据读取方式和分片策略getSplits()createRecordReader()
RecordReader将分片数据解析为 K-V 对nextKeyValue()getCurrentKey()getCurrentValue()
Mapper处理输入的每条记录map()
Partitioner决定 Map 输出发送到哪个 ReducergetPartition()
CombinerMap 端本地聚合(可选)复用 Reducer 逻辑
Reducer聚合处理相同 Key 的所有 Valuereduce()
OutputFormat定义输出数据的写入方式getRecordWriter()checkOutputSpecs()

二、InputFormat 与数据分片

2.1 InputFormat 的作用

InputFormat 负责两件事:

  1. 数据分片:将输入数据切分为多个 InputSplit
  2. 创建 RecordReader:提供读取分片数据的方式

2.2 常用 InputFormat 类型

InputFormat说明K-V 类型
TextInputFormat默认格式,按行读取文本文件Key: 行偏移量,Value: 行内容
KeyValueTextInputFormat按分隔符拆分 Key-ValueKey: 分隔符前,Value: 分隔符后
NLineInputFormat每 N 行作为一个分片同 TextInputFormat
SequenceFileInputFormat读取 Hadoop 序列文件序列文件中定义的 K-V 类型
CombineFileInputFormat合并小文件自定义

2.3 自定义 InputFormat 示例

java
package com.example.mr.input; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; /** * 自定义 InputFormat:只处理指定扩展名的文件 */ public class CustomFileInputFormat extends FileInputFormat<LongWritable, Text> { @Override protected boolean isSplitable(JobContext context, Path filename) { // 可以控制文件是否可分片 // 例如:压缩文件通常不可分片 return true; } @Override public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) { return new LineRecordReader(); } }

2.4 分片大小计算

splitSize = Math.max(minSize, Math.min(maxSize, blockSize)) 默认情况下: - minSize = 1 - maxSize = Long.MAX_VALUE - blockSize = 128MB (HDFS 默认块大小) 因此默认分片大小 = 128MB

配置参数

java
// 设置最小分片大小 conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456"); // 256MB // 设置最大分片大小 conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912"); // 512MB

三、Map 阶段详解

3.1 Mapper 类结构

java
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { /** * 初始化方法,在 Map 任务开始前调用一次 */ protected void setup(Context context) throws IOException, InterruptedException { // 初始化资源、读取配置等 } /** * 核心映射方法,对每条输入记录调用一次 */ protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { // 处理逻辑 context.write(outputKey, outputValue); } /** * 清理方法,在 Map 任务结束后调用一次 */ protected void cleanup(Context context) throws IOException, InterruptedException { // 释放资源、输出汇总信息等 } /** * 运行方法,控制整体流程 */ public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); } }

3.2 核心方法详解

setup(Context context)

作用

  • 在 Mapper 处理数据之前调用一次
  • 用于初始化操作

典型用途

  • 读取配置参数
  • 初始化数据库连接
  • 加载缓存数据(如小表 Join)
  • 获取分布式缓存文件
java
@Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); // 读取自定义配置 this.separator = conf.get("field.separator", "\t"); // 读取分布式缓存 URI[] cacheFiles = context.getCacheFiles(); if (cacheFiles != null && cacheFiles.length > 0) { loadCacheData(new Path(cacheFiles[0])); } }

map(KEYIN key, VALUEIN value, Context context)

作用

  • 处理每一条输入记录
  • 输出零个、一个或多个 Key-Value 对

参数说明

  • key:输入键(如 TextInputFormat 时为行偏移量)
  • value:输入值(如 TextInputFormat 时为行内容)
  • context:上下文对象,用于输出结果和获取配置
java
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); // 可以输出多个 K-V 对 for (String field : fields) { context.write(new Text(field), new IntWritable(1)); } // 也可以不输出(过滤数据) // 或者输出一个(转换数据) }

cleanup(Context context)

作用

  • 在 Mapper 处理完所有数据后调用一次
  • 用于清理操作

典型用途

  • 关闭数据库连接
  • 输出汇总统计
  • 释放资源
java
@Override protected void cleanup(Context context) throws IOException, InterruptedException { // 关闭资源 if (dbConnection != null) { dbConnection.close(); } // 输出最终汇总(如果在 map 中缓存了数据) for (Map.Entry<String, Integer> entry : localCache.entrySet()) { context.write(new Text(entry.getKey()), new IntWritable(entry.getValue())); } }

3.3 完整 Mapper 示例:WordCount

java
package com.example.mr.mapper; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.StringTokenizer; /** * WordCount Mapper * 输入:(行偏移量, 行内容) * 输出:(单词, 1) */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // 复用对象,减少 GC 压力 private final static IntWritable ONE = new IntWritable(1); private Text word = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { // 可以在这里读取停用词列表等 } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString().toLowerCase(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, ONE); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { // 清理工作 } }

四、Shuffle 阶段详解

Shuffle 是 MapReduce 中最复杂也最关键的阶段,负责将 Map 输出传输到 Reduce 端。

4.1 Shuffle 流程图解

MAP 端 Shuffle REDUCE 端 Shuffle ┌────────────────────────────────────────────────┐ ┌────────────────────────────────────────────────┐ │ │ │ │ │ map() 输出 │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌────────────┐ │ │ ┌────────────┐ │ │ │ 环形缓冲区 │ 默认 100MB │ │ │ Fetch │ HTTP 方式拉取数据 │ │ │ (内存) │ mapreduce.task.io.sort.mb │ │ │ │ │ │ └─────┬──────┘ │ │ └─────┬──────┘ │ │ │ │ │ │ │ │ │ 达到阈值(80%)触发 Spill │ │ ▼ │ │ ▼ │ │ ┌────────────┐ │ │ ┌────────────┐ │ │ │ 内存缓冲 │ │ │ │ Partition │ 确定数据发往哪个 Reducer │ │ │ │ │ │ └─────┬──────┘ │ │ └─────┬──────┘ │ │ │ │ │ │ │ │ ▼ │ │ │ 达到阈值触发合并 │ │ ┌────────────┐ │ │ ▼ │ │ │ Sort │ 按 Key 排序 │ │ ┌────────────┐ │ │ └─────┬──────┘ │ │ │ Merge │ 多路归并排序 │ │ │ │ │ │ Sort │ │ │ ▼ │ │ └─────┬──────┘ │ │ ┌────────────┐ │ │ │ │ │ │ Combiner │ 本地聚合(可选) │ │ ▼ │ │ └─────┬──────┘ │ │ ┌────────────┐ │ │ │ │ │ │ Group │ 按 Key 分组 │ │ ▼ │ │ └─────┬──────┘ │ │ ┌────────────┐ │ │ │ │ │ │ Spill │ 溢写到磁盘 │ │ ▼ │ │ │ (磁盘) │ │ │ reduce() 输入 │ │ └─────┬──────┘ │ │ │ │ │ │ └────────────────────────────────────────────────┘ │ │ 多次 Spill 产生多个文件 │ │ ▼ │ │ ┌────────────┐ │ │ │ Merge │ 合并所有 Spill 文件 │ │ └─────┬──────┘ │ │ │ │ │ ▼ │ │ 最终输出文件 │ │ (按 Partition 分区,区内有序) │ │ │ └────────────────────────────────────────────────┘

4.2 Partitioner(分区器)

作用:决定 Map 输出的 Key 应该发送到哪个 Reducer

默认实现HashPartitioner

java
public class HashPartitioner<K, V> extends Partitioner<K, V> { @Override public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }

自定义 Partitioner 示例

java
package com.example.mr.partitioner; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /** * 自定义分区器:按首字母分区 */ public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> { @Override public int getPartition(Text key, IntWritable value, int numReduceTasks) { String keyStr = key.toString(); if (keyStr.isEmpty()) { return 0; } char firstChar = Character.toLowerCase(keyStr.charAt(0)); if (firstChar >= 'a' && firstChar <= 'z') { // a-z 分到 0-25 号分区 return (firstChar - 'a') % numReduceTasks; } else { // 其他字符分到最后一个分区 return numReduceTasks - 1; } } }

配置使用

java
job.setPartitionerClass(FirstLetterPartitioner.class); job.setNumReduceTasks(26); // 设置 Reducer 数量

4.3 排序(Sort)

作用:对 Map 输出按 Key 进行排序

排序类型

排序类型说明发生时机
快速排序内存中对缓冲区数据排序Spill 之前
归并排序合并多个有序的 Spill 文件Spill 文件合并时

自定义排序:实现 WritableComparable

java
package com.example.mr.writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 自定义 Key 类型,支持二次排序 * 先按 first 升序,再按 second 降序 */ public class CompositeKey implements WritableComparable<CompositeKey> { private String first; private int second; public CompositeKey() {} public CompositeKey(String first, int second) { this.first = first; this.second = second; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(first); out.writeInt(second); } @Override public void readFields(DataInput in) throws IOException { first = in.readUTF(); second = in.readInt(); } @Override public int compareTo(CompositeKey other) { // 先按 first 升序 int cmp = this.first.compareTo(other.first); if (cmp != 0) { return cmp; } // 再按 second 降序 return Integer.compare(other.second, this.second); } @Override public int hashCode() { return first.hashCode() * 31 + second; } @Override public boolean equals(Object obj) { if (obj instanceof CompositeKey) { CompositeKey other = (CompositeKey) obj; return first.equals(other.first) && second == other.second; } return false; } // Getters and Setters public String getFirst() { return first; } public int getSecond() { return second; } }

4.4 Combiner(合并器)

作用:在 Map 端进行本地聚合,减少 Shuffle 数据量

特点

  • 本质上是一个特殊的 Reducer
  • 在 Map 端运行
  • 不是所有场景都适用(需要满足结合律和交换律)

适用场景

  • ✅ 求和、计数、最大值、最小值
  • ❌ 求平均值(除非改变数据结构)
java
// 配置 Combiner(复用 Reducer) job.setCombinerClass(WordCountReducer.class);

Combiner 示例

java
package com.example.mr.combiner; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * WordCount Combiner * 在 Map 端本地聚合,减少网络传输 */ public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }

4.5 Shuffle 性能调优参数

xml
<!-- Map 端 --> <property> <name>mapreduce.task.io.sort.mb</name> <value>100</value> <description>环形缓冲区大小(MB)</description> </property> <property> <name>mapreduce.map.sort.spill.percent</name> <value>0.80</value> <description>缓冲区溢写阈值</description> </property> <property> <name>mapreduce.task.io.sort.factor</name> <value>10</value> <description>同时合并的文件数</description> </property> <!-- Reduce 端 --> <property> <name>mapreduce.reduce.shuffle.parallelcopies</name> <value>5</value> <description>并行拉取数据的线程数</description> </property> <property> <name>mapreduce.reduce.shuffle.input.buffer.percent</name> <value>0.70</value> <description>Shuffle 缓冲区占堆内存比例</description> </property> <property> <name>mapreduce.reduce.shuffle.merge.percent</name> <value>0.66</value> <description>触发合并的缓冲区阈值</description> </property>

五、Reduce 阶段详解

5.1 Reducer 类结构

java
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { /** * 初始化方法 */ protected void setup(Context context) throws IOException, InterruptedException { } /** * 核心归约方法,对每个 Key 的所有 Value 调用一次 */ protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException { } /** * 清理方法 */ protected void cleanup(Context context) throws IOException, InterruptedException { } /** * 运行方法 */ public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); } cleanup(context); } }

5.2 核心方法详解

setup(Context context)

作用:与 Mapper 的 setup 类似,用于初始化

java
@Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); // 读取配置、初始化资源等 }

reduce(KEYIN key, Iterable values, Context context)

作用

  • 处理具有相同 Key 的所有 Value
  • 对每个唯一的 Key 调用一次

重要注意事项

  • values 只能遍历一次!
  • 如需多次遍历,需先将数据存储到集合中
java
@Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // ❌ 错误:试图多次遍历 // for (IntWritable val : values) { ... } // for (IntWritable val : values) { ... } // 第二次遍历得到空结果 // ✅ 正确:如需多次使用,先存储 List<Integer> valueList = new ArrayList<>(); for (IntWritable val : values) { valueList.add(val.get()); } // 现在可以多次使用 valueList }

cleanup(Context context)

作用:清理资源,与 Mapper 的 cleanup 类似

5.3 完整 Reducer 示例:WordCount

java
package com.example.mr.reducer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * WordCount Reducer * 输入:(单词, [1, 1, 1, ...]) * 输出:(单词, 总次数) */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @Override protected void setup(Context context) throws IOException, InterruptedException { // 初始化 } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { // 清理 } }

5.4 高级 Reducer 示例:TopN

java
package com.example.mr.reducer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Map; import java.util.TreeMap; /** * TopN Reducer:找出词频最高的 N 个单词 */ public class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private int n; private TreeMap<Integer, String> topN; @Override protected void setup(Context context) throws IOException, InterruptedException { n = context.getConfiguration().getInt("topn", 10); topN = new TreeMap<>(); } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } // 维护 TopN topN.put(sum, key.toString()); if (topN.size() > n) { topN.remove(topN.firstKey()); // 移除最小的 } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { // 输出 TopN 结果(按词频降序) for (Map.Entry<Integer, String> entry : topN.descendingMap().entrySet()) { context.write(new Text(entry.getValue()), new IntWritable(entry.getKey())); } } }

六、OutputFormat 与数据输出

6.1 常用 OutputFormat 类型

OutputFormat说明
TextOutputFormat默认格式,输出文本文件,K-V 用 Tab 分隔
SequenceFileOutputFormat输出 Hadoop 序列文件
MultipleOutputs输出多个文件/目录
LazyOutputFormat惰性输出,无数据时不创建文件
NullOutputFormat不输出任何内容

6.2 自定义 OutputFormat 示例

java
package com.example.mr.output; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * 自定义 OutputFormat:JSON 格式输出 */ public class JsonOutputFormat extends FileOutputFormat<Text, IntWritable> { @Override public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { Path outputPath = getDefaultWorkFile(context, ".json"); FileSystem fs = outputPath.getFileSystem(context.getConfiguration()); FSDataOutputStream out = fs.create(outputPath, false); return new JsonRecordWriter(out); } static class JsonRecordWriter extends RecordWriter<Text, IntWritable> { private FSDataOutputStream out; private boolean firstRecord = true; public JsonRecordWriter(FSDataOutputStream out) throws IOException { this.out = out; out.writeBytes("[\n"); } @Override public void write(Text key, IntWritable value) throws IOException { if (!firstRecord) { out.writeBytes(",\n"); } firstRecord = false; String json = String.format(" {\"word\": \"%s\", \"count\": %d}", key.toString(), value.get()); out.writeBytes(json); } @Override public void close(TaskAttemptContext context) throws IOException { out.writeBytes("\n]"); out.close(); } } }

6.3 MultipleOutputs 使用示例

java
// 在 Reducer 中使用 MultipleOutputs public class MultiOutputReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private MultipleOutputs<Text, IntWritable> multipleOutputs; @Override protected void setup(Context context) { multipleOutputs = new MultipleOutputs<>(context); } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } // 根据条件输出到不同文件 if (sum > 100) { multipleOutputs.write("high", key, new IntWritable(sum)); } else { multipleOutputs.write("low", key, new IntWritable(sum)); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } } // Driver 配置 MultipleOutputs.addNamedOutput(job, "high", TextOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, "low", TextOutputFormat.class, Text.class, IntWritable.class);

七、完整 MapReduce 程序示例

7.1 WordCount 完整代码

java
package com.example.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.StringTokenizer; /** * WordCount 完整示例 */ public class WordCount extends Configured implements Tool { /** * Mapper 类 */ public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable ONE = new IntWritable(1); private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken().toLowerCase().replaceAll("[^a-z]", "")); if (word.getLength() > 0) { context.write(word, ONE); } } } } /** * Reducer 类 */ public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } /** * Driver */ @Override public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: WordCount <input> <output>"); return -1; } Configuration conf = getConf(); Job job = Job.getInstance(conf, "Word Count"); // 设置 Jar job.setJarByClass(WordCount.class); // 设置 Mapper job.setMapperClass(TokenizerMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置 Combiner(可选) job.setCombinerClass(IntSumReducer.class); // 设置 Reducer job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置 Reducer 数量 job.setNumReduceTasks(3); // 设置输入输出路径 FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交任务 return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(exitCode); } }

7.2 运行命令

bash
# 编译打包 mvn clean package # 运行 hadoop jar wordcount.jar com.example.mr.WordCount /input /output # 查看结果 hdfs dfs -cat /output/part-r-*

八、高级特性

8.1 分布式缓存

作用:将小文件分发到所有节点,供 Map/Reduce 任务使用

java
// Driver 中添加缓存文件 job.addCacheFile(new URI("/user/data/lookup.txt#lookup")); // Mapper 中使用 @Override protected void setup(Context context) throws IOException, InterruptedException { // 方式1:通过符号链接访问 BufferedReader reader = new BufferedReader(new FileReader("lookup")); // 方式2:通过 API 获取 URI[] cacheFiles = context.getCacheFiles(); Path cachePath = new Path(cacheFiles[0]); FileSystem fs = FileSystem.get(context.getConfiguration()); BufferedReader reader = new BufferedReader( new InputStreamReader(fs.open(cachePath))); // 读取并缓存数据 String line; while ((line = reader.readLine()) != null) { // 处理数据 } reader.close(); }

8.2 计数器(Counter)

java
// 定义计数器枚举 enum WordCountCounter { TOTAL_WORDS, INVALID_WORDS } // 在 Mapper 中使用 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\\s+"); for (String word : words) { if (isValid(word)) { context.getCounter(WordCountCounter.TOTAL_WORDS).increment(1); context.write(new Text(word), new IntWritable(1)); } else { context.getCounter(WordCountCounter.INVALID_WORDS).increment(1); } } } // 在 Driver 中获取计数器结果 Counters counters = job.getCounters(); long totalWords = counters.findCounter(WordCountCounter.TOTAL_WORDS).getValue(); System.out.println("Total words: " + totalWords);

8.3 链式 MapReduce

java
// Job 链:Job1 的输出作为 Job2 的输入 Job job1 = Job.getInstance(conf, "Job1"); // ... 配置 job1 ... job1.waitForCompletion(true); Job job2 = Job.getInstance(conf, "Job2"); FileInputFormat.addInputPath(job2, new Path("/job1/output")); // ... 配置 job2 ... job2.waitForCompletion(true);

九、性能优化总结

9.1 Map 端优化

优化点方法
增加并行度减小分片大小,增加 Mapper 数量
减少溢写次数增大 io.sort.mb,提高 sort.spill.percent
使用 Combiner本地聚合减少数据量
压缩 Map 输出设置 mapreduce.map.output.compress=true

9.2 Reduce 端优化

优化点方法
合理设置 Reducer 数量通常设为集群 Reduce Slot 数量的 0.95 或 1.75 倍
增加并行拷贝提高 mapreduce.reduce.shuffle.parallelcopies
增大缓冲区提高 mapreduce.reduce.shuffle.input.buffer.percent

9.3 数据倾斜处理

场景解决方案
Key 分布不均加盐打散 Key,二次聚合
热点 Key抽样识别热点 Key,单独处理
Join 倾斜Map Side Join(小表广播)

十、总结

MapReduce 执行流程速记

Input → Split → Map → Partition → Sort → Combiner → Shuffle → Merge → Reduce → Output

核心组件对照表

阶段核心组件核心方法作用
输入InputFormatgetSplits()数据分片
读取RecordReadernextKeyValue()解析 K-V
映射Mappermap()数据转换
分区PartitionergetPartition()决定目标 Reducer
排序WritableComparablecompareTo()Key 排序
合并Combinerreduce()本地聚合
归约Reducerreduce()全局聚合
输出OutputFormatgetRecordWriter()写入结果

掌握 MapReduce 的完整流程和各组件的作用,是理解大数据处理的基础,也是优化数据处理性能的关键。

本文作者:ender

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!