

MapReduce Distributed Computing Framework

纯真年代 2018-09-16 21:18:42


MapReduce Overview

    1) It originates from Google's MapReduce paper, published in December 2004

    2) Hadoop MapReduce is an open-source clone of Google's MapReduce

    3) Strengths of MapReduce: offline processing of massive data sets, easy to develop, easy to run

    4) Weakness of MapReduce: not suited to real-time or streaming computation

Getting Started with the MapReduce Programming Model: WordCount Word-Frequency Counting

    Example: count how many times each word appears in words.txt


    But what happens when the file is very, very large? Then a single machine can no longer handle it and the work has to be distributed.


    Example workflow:

        1) Splitting the file: as shown in the figure, assume each line becomes its own split, so three splits are processed at the same time

        2) Mapping: each split is broken into words by the specified delimiter and every word is assigned the value 1; at this stage we only know that each occurrence counts as 1

        3) Shuffling: records with the same key are sent to the same node to be merged

        4) Reducing: the counts for each word are added up

        (These splits are not on the same machine; they are all processed in parallel)  

    From the official documentation:

    A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.

    (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
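
    A concrete trace of that flow for WordCount, assuming two made-up input lines, "hello world" and "hello hadoop", that land in two different splits (the keys on the input side are the byte offsets that TextInputFormat produces):

    (input)   <0, "hello world">  and  <12, "hello hadoop">
    map    -> <hello, 1>, <world, 1>  and  <hello, 1>, <hadoop, 1>
    combine-> unchanged here, since neither map task emits a duplicate key on its own
    reduce -> <hadoop, 1>, <hello, 2>, <world, 1>   (output)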


Implementing WordCount in Java

    1) Code

package cn.mapReduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @ClassName: WordCountApp
 * @Description: a WordCount application developed with MapReduce
 * @author: yy
 * @date: 2018-09-17
 */
public class WordCountApp {

    /**
     * Map: reads the input file line by line
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // each line of input received by this map call
            String line = value.toString();
            // split the line into words by the specified delimiter
            String[] words = line.split(" ");
            for (String word : words) {
                // emit the map result (word, 1) through the context
                context.write(new Text(word), one);
            }
        }

    }

    /**
     * Reduce: the merge (aggregation) step
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                // sum up the number of occurrences of this word (the key)
                sum += value.get();
            }
            // write out the final count for this word
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * Driver: packages up all the information about the MapReduce job
     */
    public static void main(String[] args) throws Exception {

        // 1. create the Configuration
        Configuration configuration = new Configuration();

        // clean up an existing output directory first
        // (otherwise re-running the job fails with an "output directory already exists" error)
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("The output directory already exists and has been deleted");
        }

        // 2. create the Job
        Job job = Job.getInstance(configuration, "wordcount");

        // 2.1 set the main class of the job
        job.setJarByClass(WordCountApp.class);

        // 3. set the input path the job will process
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // map-related settings
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // reduce-related settings
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // set the output path for the job's results
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


    2) Package it into a jar and run it on the server

mvn clean package -DskipTests



    3) Run it

hadoop jar hadoop-train-1.0.jar cn.mapReduce.WordCountApp hdfs://192.168.31.140:8020/words.txt hdfs://192.168.31.140:8020/output/wc

    4) View the results (a typical command is shown below)
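
    A typical way to inspect the output on HDFS (the part file name and the counts depend on the actual input and run):

hadoop fs -cat hdfs://192.168.31.140:8020/output/wc/part-r-00000

    Each line of the part file is a word and its count, separated by a tab.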




MapReduce Programming: Combiner

    1) It is essentially a small, local reduce performed on the map side; the partial results are then passed on to the final Reducer


    2) It reduces the amount of data the Map Tasks emit and therefore the volume of data sent over the network (see the small example below)
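
    For instance (made-up numbers): if one map task emits <hello, 1> three times, a combiner running on that node merges them into a single <hello, 3> before the shuffle, so one record crosses the network instead of three.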



    3) Code

// set the combiner class on the job; its logic is exactly the same as our reduce
job.setCombinerClass(MyReducer.class);
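
    Reusing MyReducer as the combiner works here only because its input and output types are identical (Text keys, LongWritable values) and summation is commutative and associative, so applying it once per map task and again in the final reduce yields the same totals.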



    4) Package, upload, run, and check the results

mvn clean package -DskipTests
hadoop jar hadoop-train-1.0.jar cn.mapReduce.CombinerApp hdfs://192.168.31.140:8020/words.txt hdfs://192.168.31.140:8020/output/wc
  1. Before the combiner

  2. After the combiner (the difference shows that it has taken effect)

    5) When a combiner is applicable

        Operations such as sums and counts work fine

        Something like an average, however, does not (see the worked example below)
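
        A quick made-up illustration of why averages break: suppose one map task sees the values 1 and 2 and another sees 3, 6 and 8. Averaging locally gives 1.5 and about 5.67, and averaging those two results gives about 3.58, while the true average of all five values is 4. Sums and counts, by contrast, can be merged in any order without changing the result.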


MapReduce Programming: Partitioner

    1) Default implementation: the hash of the key being distributed, taken modulo the number of Reduce Tasks (a sketch follows below)
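
    For reference, the stock HashPartitioner boils down to roughly the following (a sketch of the default behaviour, not code from this project):

public class HashPartitioner<K, V> extends Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // mask off the sign bit so the hash is non-negative, then take it modulo the number of reduce tasks
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}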


    2) The Partitioner decides which ReduceTask handles the data a MapTask emits. The Partitioner sits in the Mapper stage: once the Mapper has processed the data, the records pass through the Partitioner, which picks a Reducer for each one, so that the Mapper's output is spread evenly across the Reducers.



    3) The code

        Data used in this example (each line represents one phone brand's sales, space separated; only a small change to the mapper is needed): a made-up sample is shown below
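
        A hypothetical phones.txt in that format (the brands and numbers are made up for illustration):

xiaomi 15
xiaomi 20
huawei 30
meizu 10
oppo 8

        With the partitioner below, the two xiaomi lines go to reducer 0, huawei to 1, meizu to 2, and any other brand (oppo here) to 3.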

        

        In the map method:

context.write(new Text(words[0]), new LongWritable(Long.parseLong(words[1])));



        Partitioner:

public static class MyPartitioner extends Partitioner<Text, LongWritable> {

    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // route each brand to its own reducer; any other brand goes to partition 3
        if ("xiaomi".equals(key.toString())) {
            return 0;
        }
        if ("huawei".equals(key.toString())) {
            return 1;
        }
        if ("meizu".equals(key.toString())) {
            return 2;
        }
        return 3;
    }
}



        In main:

// set the job's Partitioner class
job.setPartitionerClass(MyPartitioner.class);
// use 4 reducers, one per partition
job.setNumReduceTasks(4);



    4) Package, upload, run, and view the results

mvn clean package -DskipTests
hadoop jar hadoop-train-1.0.jar cn.mapReduce.ParititonerApp hdfs://192.168.31.140:8020/phones.txt hdfs://192.168.31.140:8020/partitioner/wc
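
    With four reduce tasks the output directory contains four part files, part-r-00000 through part-r-00003, one per partition: the xiaomi total lands in part-r-00000, huawei in part-r-00001, meizu in part-r-00002 and everything else in part-r-00003. They can be viewed with hadoop fs -cat just like before.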




Note: compiled from material found on the web.
This post serves as my personal notes; readers are welcome to point out mistakes.

 
