Hadoop的分布式计算系统MapReduce

一.序列化

在MapReduce中要求被传输的数据能够被序列化 MapReduce中的序列化机制使用的是AVRO,MapReduce对AVRO进行了封装 被传输的类实现Writable接口实现方法即可

二.mapreduce 排序

代码语言:javascript
复制
在MapReduce中会自动对被传输的key值进行排序,如果使用一个对象
作为输出键,那么要求对象相对应的类应该实现Comparable接口,考虑到
MapReduce中被传输的对象要求被序列化,所以MapReduce中提供了WritableComparable
接口.
如果ComparaTo方法中返回值为0,则MapReduce在进行计算时会把两个键的值放到
一个迭代器中,输出是第二个key是没有记录的。

mapreduce 分区

代码语言:javascript
复制
我们在使用MapReduce对HDFS中的数据进行计算时,有时可能会有分类
输出的场景,MapReduce中提供了Partitioner类,我们在使用时只需继承
该类,然后重写getPartition方法即可,分区编号默认从0开始。
有多少个分区JobTracter就会分配多少个reduceTask。分区数量要在
驱动类中指定,如果不指定分区类与ReduceTask的数量,则使用默认
的HashPartitioner类进行分区,也就是自定义的分区无效。

mapreduce 合并

代码语言:javascript
复制
1.合并是减少数据总量并没有改变计算结果 - Combiner(合并)实际上只是
让MapTask进行提前聚合,最后ReduceTask在进行总的聚合.
2.并不是所有的场景都适合于用Combiner,像求和、求最值、去重等可以使用
combiner,但是例如求平均的场景不适合与使用Combiner

inputFormat

inputFormat用来获取切片并创建流来读取数据,如果不自定义InputFormat 则默认使用TextInputFormt按行读取Mapper获取的key值为当前行数的偏移量, 自定义inputFormat类只需要继承FileInputFormat类自定义读取文件的即可.

Code

案例:对HDFS中的case.txt文件按月份对每个人的收益进行降序排序

inputformat类
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;

// 泛型表示读完之后给mapper的数据类型
public class CaseInputFormat extends FileInputFormat<Text, Text> {

@Override
public RecordReader&lt;Text, Text&gt; createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
    return new CaseReader();
}

}

class CaseReader extends RecordReader<Text,Text>{

private LineReader reader;
private Text key;
private Text value;

// 初始化Reader
// 最终目的就是拿到读取切片的流
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

    FileSplit fs = (FileSplit) inputSplit;
    Path path = fs.getPath();

    // 连接HDFS获取切片
    FileSystem fileSystem = FileSystem.get(URI.create(path.toString()), taskAttemptContext.getConfiguration());
    // 打开对应的文件获取输入流
    InputStream in = fileSystem.open(path);
   // 字节流转换为字符流按行读取
    reader = new LineReader(in);
}

// 判断有无下一个键值对
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    key = new Text();
    value = new Text();
    Text tmp = new Text();

    // 将读取到的一行数据传入到这个参数中 返回值为读取到字节数
    if (reader.readLine(tmp) == ) {
        return false;
    }
    key.set(tmp);

    if (reader.readLine(tmp) == ) {
        return false;
    }
    value.set(tmp);

    return true;
}

// 获取当前key值
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
    return key;
}

// 获取当前value值
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
}

// 获取进度
@Override
public float getProgress() throws IOException, InterruptedException {
    return ;
}

// 关流
@Override
public void close() throws IOException {
    if (reader != null)
        reader.close();
}

}

Case类用来封装读取的文本
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Case implements WritableComparable<Case> {
private int month;

public int getMonth() {
    return month;
}

public void setMonth(int month) {
    this.month = month;
}

public String getName() {
    return name;
}

public void setName(String name) {
    this.name = name;
}

private String name;
private int inCome;
private int outCome;
private int countCome;

public int getInCome() {
    return inCome;
}

public void setInCome(int inCome) {
    this.inCome = inCome;
}

public int getOutCome() {
    return outCome;
}

public void setOutCome(int outCome) {
    this.outCome = outCome;
}

public int getCountCome() {
    return inCome - outCome;
}

public void setCountCome(int countCome) {
    this.countCome = countCome;
}

@Override
public int compareTo(Case o) {
    return o.getCountCome() - this.getCountCome();
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeInt(month);
    dataOutput.writeUTF(name);
    dataOutput.writeInt(inCome);
    dataOutput.writeInt(outCome);
    dataOutput.writeInt(countCome);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
    month = dataInput.readInt();
    name = dataInput.readUTF();
    inCome = dataInput.readInt();
    outCome = dataInput.readInt();
    countCome = dataInput.readInt();
}

}

Mapper类
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CaseMapper extends Mapper<Text,Text,Case,NullWritable> {
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String[] s = key.toString().split(" ");
String[] s1 = value.toString().split(" ");
Case aCase = new Case();
aCase.setMonth(Integer.parseInt(s[]));
aCase.setName(s[]);
aCase.setInCome(Integer.parseInt(s1[]));
aCase.setOutCome(Integer.parseInt(s1[]));

    context.write(aCase,NullWritable.get());
}

}

Reducer类
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CaseReducer extends Reducer<Case, NullWritable,Text, IntWritable> {
@Override
protected void reduce(Case key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(new Text(key.getName()),new IntWritable(key.getCountCome()));
}
}

Driver类
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CaseDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // 获取job 提交给JobTracter
    Job job = Job.getInstance();

    // 入口类
    job.setJarByClass(CaseDriver.class);
    // mapper类
    job.setMapperClass(CaseMapper.class);
    // reducer类
    job.setReducerClass(CaseReducer.class);
    // inputformat类
    job.setInputFormatClass(CaseInputFormat.class);
    // partitioner类 reduceTask数量
    job.setPartitionerClass(CasePartition.class);
    job.setNumReduceTasks();

    // mapper
    job.setMapOutputKeyClass(Case.class);
    job.setMapOutputValueClass(NullWritable.class);
    // reducer
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // 处理文件路径
    FileInputFormat.addInputPath(job,new Path(&#34;hdfs://10.42.99.103:9000/case.txt&#34;));
    // 文件输出路径
    FileOutputFormat.setOutputPath(job,new Path(&#34;hdfs://10.42.99.103:9000/caseResult&#34;));

    // 启动
    job.waitForCompletion(true);
}

}

case.txt
代码语言:javascript
复制
1 ls
2850 100
2 ls
3566 200
3 ls
4555 323
1 zs
19000 2000
2 zs
28599 3900
3 zs
34567 5000
1 ww
355 10
2 ww
555 222
3 ww
667 192

数据本地化策略

代码语言:javascript
复制
1.job会被提交到JobTracker,JobTracker会访问HDFS中的namenode,
获取Block的存储位置以及大小.
2.JobTracker收到文件信息之后会对文件进行切片,默认Block的大小
就是切片的大小,切片的数量决定了mapTask的数量。
3.JobTacker计算完mapTask与ReduceTask的数量之后会把任务提交给TaskTracker
,为了减少集群间节点间的访问,TaskTracker会与datanode部署在同一个节点上
4. JobTracker在分配任务的时候,会尽量将任务分配给有数据的节点
  • 如果是空文件,则整个文件作为一个切片处理
  • 在MapReduce中,文件有可切和不可切的区分。在MapReduce中,默认文件是可切的,但是有些文件处理的时候不能切分,这个时候需要手动设置为不可切,例如压缩包
  • 如果文件不可切,则整个文件作为一个切片处理
  • 计算切片大小的公式为Math.min(minSize,Math.max(spilteSize,maxSize))
  • 在进行切片计算的时候底层有一个阈值为1.1

Job任务提交流程

代码语言:javascript
复制
1.客户端将任务提交给JobTracker:hadoop jar ***.jar
2.准备阶段:
a.检查输入路径是否存在,输出路径是否不存在
b.计算切片数量以及分区
c.如果有需要,可以设置分布式缓存存根账户
d.将jar包提交到HDFS上
e.将任务提交到JobTracker上
3. 提交阶段
a. JobTracker会计算MapTask的数量和ReduceTask的数量。
MapTask的数量由切片数量决定,ReduceTask的数量由分区数量决定
b. JobTracker在划分好之后,会等待TaskTracker的心跳。
当收到TaskTracker的心跳的时候,JobTracker就会将MapTask或者
ReduceTask分配给TaskTracker。在分配的时候,MapTask要尽量满足
数据本地化策略;ReduceTask尽量分配到相对空闲的节点上
c. TaskTracker在领取到任务之后,去连接HDFS下载对应的jar包
体现的逻辑移动数据固定的思想
d. TaskTracker会在本节点上开启一个JVM子进程执行MapTask或者
ReduceTask。每一个MapTask或者ReduceTask的执行都会开启一个JVM
子进程

Shuffle

一、Map端的Shuffle
代码语言:javascript
复制
1. Mapper中的map方法在处理完一行数据之后,会将数据写出到缓冲区中
2. 数据在缓冲区中进行分区、排序,如果指定了Combiner,那么数据在缓冲区中还会进行combine合并 - 采取了快速排序的方式
3. 每一个MapTask自带一个缓冲区,缓冲区本质上是一个环形的字节数组。设置为环形的优势在于能够重复利用缓冲区而不用寻址
4. 缓冲区是维系在内存中,缓冲区的默认容量是100M
5. 缓冲区的容量使用达到一定限度(溢写阈值:0.8,目的是为了避免MapTask写出结果的时候产生大量的阻塞)的时候,MapTask会将缓冲区中的数据溢写(spill)到磁盘上,后续的数据可以继续写到缓冲区中
6. 每一次溢写都会产生一个新的溢写文件。单个溢写文件中的数据是分区且排序的,但是所有的溢写文件中的数据是局部有序整体无序
7. 当MapTask将所有数据都处理完成之后,会将所有的溢写文件合并(merge)成一个结果文件(final out)。如果一部分结果在缓冲区中,一部分结果在溢写文件中,这个时候所有的结果会直接合并到最后的final out中。如果没有产生溢写过程,则缓冲区中的数据直接冲刷到final out中
8. 在merge过程中,数据会再次进行分区和排序,所以final out是整体分区且有序。这个过程中的排序使用的是归并排序。如果指定了Combiner,并且溢写文件的个数大于等于3个,那么在merge过程中自动进行combine
二、Reduce端的Shuffle
代码语言:javascript
复制
1. 每一个ReduceTask启动fetch线程通过get请求去抓取数据
2. 在抓取数据的时候,每一个ReduceTask只抓取当前分区的数据。在抓取到数据的之后,会将数据存储在本地的磁盘上
3. 在抓取完成之后,ReduceTask会将这些小文件进行merge,合并成一个大文件。在合并过程中,数据会再次进行排序,采取的是归并排序
4. 合并完成之后,会将相同的键对应的值放到一个迭代器中,这个过程称之为分组(group),形成的结构就是一个键对应一个迭代器每一个键触发一次reduce方法