MapReduce 概述
MapReduce 是一种编程模型(没有集群的概念,会把任务提交到 yarn 集群上跑),用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。
当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。(MapReduce在企业里几乎不再使用了,稍微了解即可)。
如果没有MapReduce
- 那么在分布式计算上面将很难办,不好编程。
- 在早期无法处理大数据的离线计算。
- 编程中不易扩展性
- 分布式计算任务一旦挂了,没有容错机制进行处理
说明:MapReduce不擅长的方面(慢!)
- 实时计算:像MySQL一样,在毫秒级或者秒级内返回结果。
- 流式计算:MapReduce的输入数据集是静态的,不能动态变化。
- DAG计算:多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出
现在MapReduce逐渐被Spark,Flink等框架取代。但是思想很重要,值得学习。更多关于大数据 Hadoop系列的学习文章,请参阅:进击大数据系列,本系列持续更新中。
MapReduce 原理
MapReduce 的集群管理架构
- 1.客户端发送MR任务到RM上
- 2.RM分配资源,找到对应的NM,分配Container容器,启动对应的Application Master
- 3.Application Master向Applications Manager注册
- 4.Application Master向Resource Scheduler申请资源
- 5.找到对应的NM
- 6.分配Container容器,启动对应的的Map Task或者是Reduce Task任务
- 7.Map Task和Reduce Task对Application Master汇报心跳,任务进度
- 8.Application Master向Applications Manager汇报整体任务进度,如果执行完了Applications Manager会将Application Master移除
注意:原则上MapReduce分为两个阶段:Map Task和Reduce Task,但是由于Shuffling阶段很重要,人为划分了Shuffling阶段,这个阶段发生在Map Task和Reduce Task之间,可以理解为由Map Task后半段和Reduce Task前半段组成。
MapReduce 的数据流
MapTask
并行度决定机制
- 1G的数据,分成8份并行计算,那么每一份需要计算的数据为128M,感觉还不错。
- 1M的数据,分成8份并行计算,那么每一份需要计算的数据为128B,感觉资源浪费严重。
那么就需要有一个东西来决定怎么切分,它就是InputFormat,而切分大小一般由HDFS块大小决定。
- 一个Job的Map阶段并行度由客户端在提交Job时的切片数决定。
- 每一个Split切片分配一个MapTask并行处理实例。
- 默认情况下,切片大小=BlockSize(128M)。
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片。
针对第四点说明:比如有3个文件,一个300M,第二个50M,第三个50M,那么一共就是切了5个MapTask出来。
针对每一个文件,第一个300M切了3个,第二个50M切了一个,第三个50M切了一个,共5个。
而如果只有一个文件为128M+1KB,那么就只会切分一个,因为切片判断规则为->如果文件小于切片大小1.1倍,就和上一个切片将就放在一起了,这样可以防止过小的切片在执行任务的时候,调度资源的时间超过执行时间的情况。
MapReduce 运行流程
作业的运行过程主要包括如下几个步骤:
- 1、作业的提交
- 2、作业的初始化
- 3、作业任务的分配
- 4、作业任务的执行
- 5、作业执行状态更新
- 6、作业完成
具体作业执行过程的流程图如下图所示:
作业的提交
在MR的代码中调用waitForCompletion()
方法,里面封装了Job.submit()
方法,而Job.submit()
方法里面会创建一个JobSubmmiter对象。当我们在waitForCompletion(true)
时,则waitForCompletion
方法会每秒轮询作业的执行进度,如果发现与上次查询到的状态有差别,则将详情打印到控制台。如果作业执行成功,就显示作业计数器,否则将导致作业失败的记录输出到控制台。
其中JobSubmmiter实现的大概过程如下:
- 1.向资源管理器resourcemanager提交申请,用于一个mapreduce作业ID,如图步骤2所示
- 2.检查作业的输出配置,判断目录是否已经存在等信息
- 3.计算作业的输入分片的大小
- 4.将运行作业的jar,配置文件,输入分片的计算资源复制到一个以作业ID命名的hdfs临时目录下,作业jar的复本比较多,默认为10个(通过参数mapreduce.client.submit.file.replication控制),
- 5.通过资源管理器的submitApplication方法提交作业
作业的初始化
1.当资源管理器通过方法submitApplication方法被调用后,便将请求传给了yarn的调度器,然后调度器在一个节点管理器上分配一个容器(container0)用来启动application master(主类是MRAppMaster)进程。该进程一旦启动就会向resourcemanager注册并报告自己的信息,application master并且可以监控map和reduce的运行状态。因此application master对作业的初始化是通过创建多个薄记对象以保持对作业进度的跟踪。
2.application master接收作业提交时的hdfs临时共享目录中的资源文件,jar,分片信息,配置信息等。并对每一个分片创建一个map对象,以及通过mapreduce.job.reduces参数(作业通过setNumReduceTasks()方法设定)确定reduce的数量。
3.application master会判断是否使用uber(作业与application master在同一个jvm运行,也就是maptask和reducetask运行在同一个节点上)模式运行作业,uber模式运行条件:map数量小于10个,1个reduce,且输入数据小于一个hdfs块。
可以通过参数:
mapreduce.job.ubertask.enable #是否启用uber模式
mapreduce.job.ubertask.maxmaps #ubertask的最大map数
mapreduce.job.ubertask.maxreduces #ubertask的最大reduce数
mapreduce.job.ubertask.maxbytes #ubertask最大作业大小
4.application master调用setupJob方法设置OutputCommiter,FileOutputCommiter为默认值,表示建立做的最终输出目录和任务输出的临时工作空间。
更多关于大数据 Hadoop系列的学习文章,请参阅:进击大数据系列,本系列持续更新中。
作业任务的分配
1.在application master判断作业不符合uber模式的情况下,那么application master则会向资源管理器为map和reduce任务申请资源容器。
2.首先就是为map任务发出资源申请请求,直到有5%的map任务完成时,才会为reduce任务所需资源申请发出请求。
3.在任务的分配过程中,reduce任务可以在任何的datanode节点运行,但是map任务执行的时候需要考虑到数据本地化的机制,在给任务指定资源的时候每个map和reduce默认为1G内存,可以通过如下参数配置:
mapreduce.map.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.memory.mb
mapreduce.reduce.cpu.vcores
作业任务的执行
application master提交申请后,资源管理器为其按需分配资源,这时,application master就与节点管理器通信来启动容器。该任务由主类YarnChild的一个java应用程序执行。在运行任务之前,首先将所需的资源进行本地化,包括作业的配置,jar文件等。接下来就是运行map和reduce任务。YarnChild在单独的JVM中运行。
作业任务的状态更新
每个作业和它的每个任务都有一个状态:作业或者任务的状态(运行中,成功,失败等),map和reduce的进度,作业计数器的值,状态消息或描述当作业处于正在运行中的时候,客户端可以直接与application master通信,每秒(可以通过参数mapreduce.client.progressmonitor.pollinterval设置)轮询作业的执行状态,进度等信息。
作业的完成
- 当application master收到最后一个任务已完成的通知,便把作业的状态设置为成功。
- 在job轮询作业状态时,知道任务已经完成,然后打印消息告知用户,并从waitForCompletion()方法返回。
- 当作业完成时,application master和container会清理中间数据结果等临时问题。OutputCommiter的commitJob()方法被调用,作业信息由作业历史服务存档,以便用户日后查询。
MapReduce 中的 shuffle 过程
mapreduce确保每个reduce的输入都是按照键值排序的,系统执行排序,将map的输入作为reduce的输入过程称之为shuffle过程。shuffle也是我们优化的重点部分。shuffle流程图如下图所示:
map端
- 在生成map之前,会计算文件分片的大小
- 然后会根据分片的大小计算map的个数,对每一个分片都会产生一个map作业,或者是一个文件(
小于分片大小*1.1
)生成一个map作业,然后通过自定的map方法进行自定义的逻辑计算,计算完毕后会写到本地磁盘。- 1.在这里不是直接写入磁盘,为了保证IO效率,采用了先写入内存的环形缓冲区,并做一次预排序(快速排序)。缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存缓冲区的大小到达一定比例时,默认为80%(可通过
mapreduce.map.sort.spill.percent
配置项修改),将启动一个溢写线程将内存缓冲区的内容溢写到磁盘(spill to disk),这个溢写线程是独立的,不影响map向缓冲区写结果的线程,在溢写到磁盘的过程中,map继续输入到缓冲中,如果期间缓冲区被填满,则map写会被阻塞到溢写磁盘过程完成。溢写是通过轮询的方式将缓冲区中的内存写入到本地mapreduce.cluster.local.dir
目录下。在溢写到磁盘之前,我们会知道reduce的数量,然后会根据reduce的数量划分分区,默认根据hashpartition对溢写的数据写入到相对应的分区。在每个分区中,后台线程会根据key进行排序,所以溢写到磁盘的文件是分区且排序的。如果有combiner函数,它在排序后的输出运行,使得map输出更紧凑。减少写到磁盘的数据和传输给reduce的数据。 - 2.每次环形换冲区的内存达到阈值时,就会溢写到一个新的文件,因此当一个map溢写完之后,本地会存在多个分区切排序的文件。在map完成之前会把这些文件合并成一个分区且排序(归并排序)的文件,可以通过参数
mapreduce.task.io.sort.factor
控制每次可以合并多少个文件。 - 3.在map溢写磁盘的过程中,对数据进行压缩可以提交速度的传输,减少磁盘io,减少存储。默认情况下不压缩,使用参数mapreduce.map.output.compress控制,压缩算法使用
mapreduce.map.output.compress.codec
参数控制。
- 1.在这里不是直接写入磁盘,为了保证IO效率,采用了先写入内存的环形缓冲区,并做一次预排序(快速排序)。缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存缓冲区的大小到达一定比例时,默认为80%(可通过
reduce端
- map任务完成后,监控作业状态的application master便知道map的执行情况,并启动reduce任务,application master并且知道map输出和主机之间的对应映射关系,reduce轮询application master便知道主机所要复制的数据。
- 一个Map任务的输出,可能被多个Reduce任务抓取。每个Reduce任务可能需要多个Map任务的输出作为其特殊的输入文件,而每个Map任务的完成时间可能不同,当有一个Map任务完成时,Reduce任务就开始运行。Reduce任务根据分区号在多个Map输出中抓取(fetch)对应分区的数据,这个过程也就是Shuffle的copy过程。。reduce有少量的复制线程,因此能够并行的复制map的输出,默认为5个线程。可以通过参数
mapreduce.reduce.shuffle.parallelcopies
控制。 - 这个复制过程和map写入磁盘过程类似,也有阀值和内存大小,阀值一样可以在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制时候reduce还会进行排序操作和合并文件操作。
- 如果map输出很小,则会被复制到Reducer所在节点的内存缓冲区,缓冲区的大小可以通过mapred-site.xml文件中的
mapreduce.reduce.shuffle.input.buffer.percent
指定。一旦Reducer所在节点的内存缓冲区达到阀值,或者缓冲区中的文件数达到阀值,则合并溢写到磁盘。 - 如果map输出较大,则直接被复制到Reducer所在节点的磁盘中。随着Reducer所在节点的磁盘中溢写文件增多,后台线程会将它们合并为更大且有序的文件。当完成复制map输出,进入sort阶段。这个阶段通过归并排序逐步将多个map输出小文件合并成大文件。最后几个通过归并合并成的大文件作为reduce的输出
更多关于大数据 Hadoop系列的学习文章,请参阅:进击大数据系列,本系列持续更新中。
MapReduce 编程模型
Wordcount,即统计一大批文件中每个单词出现的次数,经常被拿来当做MapReduce入门案例。主要处理流程如下:
MapReduce将作业的整个运行过程分为两个阶段: Map(映射)阶段和Reduce(归约)阶段。
Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:
- 数据或计算的规模相对原任务要大大缩小;
- 就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;
- 这些小任务可以并行计算,彼此间几乎没有依赖关系。
Map 阶段由一定数量的 Map Task 组成,包含如下几个步骤:
- 输入数据格式解析: InputFormat
- 输入数据处理: Mapper
- 结果本地汇总:Combiner( local reducer)
- 数据分组: Partitioner
Reducer负责对map阶段的结果进行汇总。
Reduce阶段由一定数量的Reduce Task组成,包含如下几个步骤:
- 数据远程拷贝
- 数据按照key排序
- 数据处理: Reducer
- 数据输出格式: OutputFormat
以Wordcount为例,MapReduce的内部执行过程如下图所示:
外部物理结构如下图所示:
Combiner可以看做是 local reducer,在Mapper计算完成后将相同的key对应的value进行合并( Wordcount例子),如下图所示:
Combiner通常与Reducer逻辑是一样的,使用Combiner有如下好处:
- 减少Map Task输出数据量(磁盘IO)
- 减少Reduce-Map网络传输数据量(网络IO)
需要注意的是,并不是所有的MapReduce场景都能够使用Combiner,计算结果可以累加的场景一般可以使用,例如Sum,其他的例如求平均值 Average 则不能使用 Combiner。
实践操作
本地调试环境搭建
- 1.windows环境安装jdk1.8
- 2.解压hadoop文件,设置环境变量,并拷贝对版本的hadoop.dll和winutils.exe到bin目录下
- 3.修改对应 core-site.xml,hdfs-site.xml,mapred-site.xml,yarn-site.xml,并修改hadoop-env.cmd
- 4.初始化:hdfs namenode -format
core-site.xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
hdfs-site.xml
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>D:\software\hadoop\data\namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>D:\software\hadoop\data\datanode</value>
</property>
<!-- mapred-site.xml-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
hadoop-env.cmd
set JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_271
set HADOOP_LOG_DIR=%HADOOP_LOG_DIR%\log
java操作mapreduce
单个MapReduce分为:Mapper、Reducer 和 Driver。 也可以多个MapReduce串行执行
1.Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5)map()方法(MapTask进程)对每一个<K,V>调用一次
2.Reducer阶段
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
3.Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是
封装了MapReduce程序相关运行参数的job对象
WordCount案例
案例说明:读取一个文件,按空格对文件内容分词,最终按单词排序输出每个单词出现的次数。
<!-- MAVEN包-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
//2.编写好mapreduce代码本地测试(两种常见方式)
//3.上传jar包到服务器运行
hadoop jar apache-hadoop-1.0-SNAPSHOT.jar com.wxl.hadoop.mapReduce.wordCount.WordCountDriver /mapreduce/input/word.txt /mapreduce/output
方式一
package com.wxl.hadoop.mapReduce.wordCount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.Date; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { ///本地测试,正式环境需要注释掉// Date date = new Date();//保证输出的目录不重复 args = new String[]{"D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\input\\word.txt", "D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\output\\" + date.getTime()};
// 1 获取配置信息以及获取 job 对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本 Driver 程序的 jar
job.setJarByClass(WordCountDriver.class);
// 3 关联 Mapper 和 Reducer 的 jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置 Mapper 输出的 kv 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出 kv 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交 job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
//Map阶段
class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();//分割后的单词
IntWritable v = new IntWritable(1);//计数
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 按空格切割
String[] words = line.split(" ");
// 3 输出
for (String word : words) {
//排除空值
if (word.trim() == "" || word.length() == 0) {
continue;
}
System.out.println("map输出>>>" + word);
// 设置输出的key为切割的单词
k.set(word);
// 按单词和计数输出
context.write(k, v);
}
}
}
//Reducer
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
// 得到最终的结果
context.write(key, v);
}
}
方式二
package com.wxl.hadoop.mapReduce.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Date;
public class WordCountMapReduce extends Configured implements Tool {
public static void main(String[] args) throws Exception {
///本地测试,正式环境需要注释掉//
Date date = new Date();//保证输出的目录不重复
args = new String[]{"D:\ideawork\bigdata\apache-hadoop\src\main\resources\mapreduce\input\word.txt",
"D:\ideawork\bigdata\apache-hadoop\src\main\resources\mapreduce\output\" + date.getTime()};
// run job
int status = ToolRunner.run(new WordCountMapReduce(), args);
// exit program
System.exit(status);
}
// Driver
public int run(String[] args) throws Exception {
// 1 获取配置信息以及获取 job 对象
Configuration conf = super.getConf();
//设置job名
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
// 2 关联本 Driver 程序的 jar
job.setJarByClass(this.getClass());
// 3 关联 Mapper 和 Reducer 的 jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置 Mapper 输出的 kv 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出 kv 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// submit job
boolean status = job.waitForCompletion(true);
return status ? 0 : 1;
}
//Map阶段
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();//分割后的单词
IntWritable v = new IntWritable(1);//计数
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 按空格切割
String[] words = line.split(" ");
// 3 输出
for (String word : words) {
//排除空值
if (word.trim() == "" || word.length() == 0) {
continue;
}
System.out.println("map输出>>>" + word);
// 设置输出的key为切割的单词
k.set(word);
// 按单词和计数输出
context.write(k, v);
}
}
}
//Reducer
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
// 得到最终的结果
context.write(key, v);
}
}
}
更多关于大数据 Hadoop系列的学习文章,请参阅:进击大数据系列,本系列持续更新中。
总结
优点
易于编程
MapReduce向用户提供了简单的编程接口,由框架层自动完成数据分布存储、数据通信、容错处理等复杂的底层处理细节,用户只需要使用接口实现自己的数据处理逻辑即可。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一 样的,就是因为这个特点使得 MapReduce 编程变得非常流行。
良好的扩展性
当你的计算资源不能得到满足的时候,可以通过简单的增加机器来扩展它的计算能力。
高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高 的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行, 不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
大数据量
适合 PB 级以上海量数据的离线处理,可以实现上千台服务器集群并发工作,提供数据处理能力。
缺点
不擅长实时计算
MapReduce 无法像MySQL一样,在毫秒或者秒级内返回结果。
不擅长流式计算
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。 这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
不擅长 DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下, MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。
参考文章:https://www.jianshu.com/p/a61fd904e2c5 cnblogs.com/liugp/p/16101242.html
cnblogs.com/ttzzyy/p/12323259.html blog.csdn.net/qq_38414334/article/details/12047614