一.序列化
在MapReduce中要求被传输的数据能够被序列化 MapReduce中的序列化机制使用的是AVRO,MapReduce对AVRO进行了封装 被传输的类实现Writable接口实现方法即可
二.mapreduce 排序
在MapReduce中会自动对被传输的key值进行排序,如果使用一个对象
作为输出键,那么要求对象相对应的类应该实现Comparable接口,考虑到
MapReduce中被传输的对象要求被序列化,所以MapReduce中提供了WritableComparable
接口.
如果ComparaTo方法中返回值为0,则MapReduce在进行计算时会把两个键的值放到
一个迭代器中,输出是第二个key是没有记录的。
mapreduce 分区
我们在使用MapReduce对HDFS中的数据进行计算时,有时可能会有分类
输出的场景,MapReduce中提供了Partitioner类,我们在使用时只需继承
该类,然后重写getPartition方法即可,分区编号默认从0开始。
有多少个分区JobTracter就会分配多少个reduceTask。分区数量要在
驱动类中指定,如果不指定分区类与ReduceTask的数量,则使用默认
的HashPartitioner类进行分区,也就是自定义的分区无效。
mapreduce 合并
1.合并是减少数据总量并没有改变计算结果 - Combiner(合并)实际上只是
让MapTask进行提前聚合,最后ReduceTask在进行总的聚合.
2.并不是所有的场景都适合于用Combiner,像求和、求最值、去重等可以使用
combiner,但是例如求平均的场景不适合与使用Combiner
inputFormat
inputFormat用来获取切片并创建流来读取数据,如果不自定义InputFormat 则默认使用TextInputFormt按行读取Mapper获取的key值为当前行数的偏移量, 自定义inputFormat类只需要继承FileInputFormat类自定义读取文件的即可.
Code
案例:对HDFS中的case.txt文件按月份对每个人的收益进行降序排序
inputformat类
package com.jmy.profitcase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;// 泛型表示读完之后给mapper的数据类型
public class CaseInputFormat extends FileInputFormat<Text, Text> {@Override public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new CaseReader(); }
}
class CaseReader extends RecordReader<Text,Text>{
private LineReader reader; private Text key; private Text value; // 初始化Reader // 最终目的就是拿到读取切片的流 @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { FileSplit fs = (FileSplit) inputSplit; Path path = fs.getPath(); // 连接HDFS获取切片 FileSystem fileSystem = FileSystem.get(URI.create(path.toString()), taskAttemptContext.getConfiguration()); // 打开对应的文件获取输入流 InputStream in = fileSystem.open(path); // 字节流转换为字符流按行读取 reader = new LineReader(in); } // 判断有无下一个键值对 @Override public boolean nextKeyValue() throws IOException, InterruptedException { key = new Text(); value = new Text(); Text tmp = new Text(); // 将读取到的一行数据传入到这个参数中 返回值为读取到字节数 if (reader.readLine(tmp) == ) { return false; } key.set(tmp); if (reader.readLine(tmp) == ) { return false; } value.set(tmp); return true; } // 获取当前key值 @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } // 获取当前value值 @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } // 获取进度 @Override public float getProgress() throws IOException, InterruptedException { return ; } // 关流 @Override public void close() throws IOException { if (reader != null) reader.close(); }
}
Case类用来封装读取的文本
package com.jmy.profitcase;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Case implements WritableComparable<Case> {
private int month;public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public String getName() { return name; } public void setName(String name) { this.name = name; } private String name; private int inCome; private int outCome; private int countCome; public int getInCome() { return inCome; } public void setInCome(int inCome) { this.inCome = inCome; } public int getOutCome() { return outCome; } public void setOutCome(int outCome) { this.outCome = outCome; } public int getCountCome() { return inCome - outCome; } public void setCountCome(int countCome) { this.countCome = countCome; } @Override public int compareTo(Case o) { return o.getCountCome() - this.getCountCome(); } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(month); dataOutput.writeUTF(name); dataOutput.writeInt(inCome); dataOutput.writeInt(outCome); dataOutput.writeInt(countCome); } @Override public void readFields(DataInput dataInput) throws IOException { month = dataInput.readInt(); name = dataInput.readUTF(); inCome = dataInput.readInt(); outCome = dataInput.readInt(); countCome = dataInput.readInt(); }
}
Mapper类
package com.jmy.profitcase;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
public class CaseMapper extends Mapper<Text,Text,Case,NullWritable> {
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String[] s = key.toString().split(" ");
String[] s1 = value.toString().split(" ");
Case aCase = new Case();
aCase.setMonth(Integer.parseInt(s[]));
aCase.setName(s[]);
aCase.setInCome(Integer.parseInt(s1[]));
aCase.setOutCome(Integer.parseInt(s1[]));context.write(aCase,NullWritable.get()); }
}
Reducer类
package com.jmy.profitcase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
public class CaseReducer extends Reducer<Case, NullWritable,Text, IntWritable> {
@Override
protected void reduce(Case key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(new Text(key.getName()),new IntWritable(key.getCountCome()));
}
}
Driver类
package com.jmy.profitcase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
public class CaseDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 获取job 提交给JobTracter Job job = Job.getInstance(); // 入口类 job.setJarByClass(CaseDriver.class); // mapper类 job.setMapperClass(CaseMapper.class); // reducer类 job.setReducerClass(CaseReducer.class); // inputformat类 job.setInputFormatClass(CaseInputFormat.class); // partitioner类 reduceTask数量 job.setPartitionerClass(CasePartition.class); job.setNumReduceTasks(); // mapper job.setMapOutputKeyClass(Case.class); job.setMapOutputValueClass(NullWritable.class); // reducer job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 处理文件路径 FileInputFormat.addInputPath(job,new Path("hdfs://10.42.99.103:9000/case.txt")); // 文件输出路径 FileOutputFormat.setOutputPath(job,new Path("hdfs://10.42.99.103:9000/caseResult")); // 启动 job.waitForCompletion(true); }
}
case.txt
1 ls
2850 100
2 ls
3566 200
3 ls
4555 323
1 zs
19000 2000
2 zs
28599 3900
3 zs
34567 5000
1 ww
355 10
2 ww
555 222
3 ww
667 192
数据本地化策略
1.job会被提交到JobTracker,JobTracker会访问HDFS中的namenode,
获取Block的存储位置以及大小.
2.JobTracker收到文件信息之后会对文件进行切片,默认Block的大小
就是切片的大小,切片的数量决定了mapTask的数量。
3.JobTacker计算完mapTask与ReduceTask的数量之后会把任务提交给TaskTracker
,为了减少集群间节点间的访问,TaskTracker会与datanode部署在同一个节点上
4. JobTracker在分配任务的时候,会尽量将任务分配给有数据的节点
- 如果是空文件,则整个文件作为一个切片处理
- 在MapReduce中,文件有可切和不可切的区分。在MapReduce中,默认文件是可切的,但是有些文件处理的时候不能切分,这个时候需要手动设置为不可切,例如压缩包
- 如果文件不可切,则整个文件作为一个切片处理
- 计算切片大小的公式为Math.min(minSize,Math.max(spilteSize,maxSize))
- 在进行切片计算的时候底层有一个阈值为1.1
Job任务提交流程
1.客户端将任务提交给JobTracker:hadoop jar ***.jar
2.准备阶段:
a.检查输入路径是否存在,输出路径是否不存在
b.计算切片数量以及分区
c.如果有需要,可以设置分布式缓存存根账户
d.将jar包提交到HDFS上
e.将任务提交到JobTracker上
3. 提交阶段
a. JobTracker会计算MapTask的数量和ReduceTask的数量。
MapTask的数量由切片数量决定,ReduceTask的数量由分区数量决定
b. JobTracker在划分好之后,会等待TaskTracker的心跳。
当收到TaskTracker的心跳的时候,JobTracker就会将MapTask或者
ReduceTask分配给TaskTracker。在分配的时候,MapTask要尽量满足
数据本地化策略;ReduceTask尽量分配到相对空闲的节点上
c. TaskTracker在领取到任务之后,去连接HDFS下载对应的jar包
体现的逻辑移动数据固定的思想
d. TaskTracker会在本节点上开启一个JVM子进程执行MapTask或者
ReduceTask。每一个MapTask或者ReduceTask的执行都会开启一个JVM
子进程
Shuffle
一、Map端的Shuffle
1. Mapper中的map方法在处理完一行数据之后,会将数据写出到缓冲区中
2. 数据在缓冲区中进行分区、排序,如果指定了Combiner,那么数据在缓冲区中还会进行combine合并 - 采取了快速排序的方式
3. 每一个MapTask自带一个缓冲区,缓冲区本质上是一个环形的字节数组。设置为环形的优势在于能够重复利用缓冲区而不用寻址
4. 缓冲区是维系在内存中,缓冲区的默认容量是100M
5. 缓冲区的容量使用达到一定限度(溢写阈值:0.8,目的是为了避免MapTask写出结果的时候产生大量的阻塞)的时候,MapTask会将缓冲区中的数据溢写(spill)到磁盘上,后续的数据可以继续写到缓冲区中
6. 每一次溢写都会产生一个新的溢写文件。单个溢写文件中的数据是分区且排序的,但是所有的溢写文件中的数据是局部有序整体无序
7. 当MapTask将所有数据都处理完成之后,会将所有的溢写文件合并(merge)成一个结果文件(final out)。如果一部分结果在缓冲区中,一部分结果在溢写文件中,这个时候所有的结果会直接合并到最后的final out中。如果没有产生溢写过程,则缓冲区中的数据直接冲刷到final out中
8. 在merge过程中,数据会再次进行分区和排序,所以final out是整体分区且有序。这个过程中的排序使用的是归并排序。如果指定了Combiner,并且溢写文件的个数大于等于3个,那么在merge过程中自动进行combine
二、Reduce端的Shuffle
1. 每一个ReduceTask启动fetch线程通过get请求去抓取数据
2. 在抓取数据的时候,每一个ReduceTask只抓取当前分区的数据。在抓取到数据的之后,会将数据存储在本地的磁盘上
3. 在抓取完成之后,ReduceTask会将这些小文件进行merge,合并成一个大文件。在合并过程中,数据会再次进行排序,采取的是归并排序
4. 合并完成之后,会将相同的键对应的值放到一个迭代器中,这个过程称之为分组(group),形成的结构就是一个键对应一个迭代器每一个键触发一次reduce方法