目录

切片与MapTask并行度决定机制

序列化

序列化案例实操

 Shuffle机制

 Partition分区

 WritableComparable排序

 WritableComparable排序案例实操(全排序)

Combiner合并

 自定义OutputFormat案例实操        

                   

自定义OutputFormat案例实操

MapTask工作机制总结

ReduceTask工作机制

ReduceTask并行度决定机制

Map Join

数据清洗(ETL)

Hadoop数据压缩

 压缩方式选择

切片与MapTask并行度决定机制

MapTask并行度决定机制: 数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。 数据切片与MapTask并行度决定机制: 1) 一个Job的Map阶段并行度由客户端在提交Job时的切片数决定 2)每一个Split切片分配一个MapTask并行实例处理 3)默认情况下,切片大小=BlockSize 4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

序列化

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。 

反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

4)Hadoop序列化特点:

(1)紧凑 :高效使用存储空间。

(2)快速:读写数据的额外开销小。

(3)互操作:支持多语言的交互

序列化案例实操

统计每一个手机号耗费的总上行流量、总下行流量、总流量 

7 13560436666 120.196.100.99 1116  954 200  (例)

id 手机号码 网络ip 上行流量  下行流量     网络状态码

13560436666 1116       954 2070

手机号码     上行流量        下行流量 总流量

package com.wyc.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

/**

1、定义类实现writable接口

2、重写序列化和反序列化方法

3、重写一个空参构造

4、重写Tostrin方法

*/

public class FlowBean implements Writable {

private long upFlow;

private long downFlow;

private long sumFlow;

//空参构造

public FlowBean() {

}

public long getUpFlow() {

return upFlow;

}

public void setUpFlow(long upFlow) {

this.upFlow = upFlow;

}

public long getDownFlow() {

return downFlow;

}

public void setDownFlow(long downFlow) {

this.downFlow = downFlow;

}

public long getSumFlow() {

return sumFlow;

}

public void setSumFlow(long sumFlow) {

this.sumFlow = sumFlow;

}

public void setSumFlow() {

this.sumFlow = this.upFlow+this.downFlow;

}

//序列化方法

@Override

public void write(DataOutput out) throws IOException {

out.writeLong(upFlow);

out.writeLong(downFlow);

out.writeLong(sumFlow);

}

@Override

public void readFields(DataInput in) throws IOException {

this.upFlow=in.readLong();

this.downFlow=in.readLong();

this.sumFlow=in.readLong();

}

@Override

public String toString() {

return upFlow + "\t" + downFlow + "\t" + sumFlow ;

}

}

package com.wyc.mapreduce.writable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper {

private Text outk = new Text();

private FlowBean outV =new FlowBean();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1、获取一行

//13736230513 192.196.100.1 www.atguigu.com 2481 24681 200

String line = value.toString();

//2、切割

String[] split = line.split("\t");

//{13736230513,192.196.100.1,www.atguigu.com,2481,24681,200}放到数组里面

//3、抓取想要的数据

//13736230513,2481,24681 抓取手机号上行流量和下行流量

String phone =split[1];

String up =split[split.length-3];

String down =split[split.length-2];

//4、封装 outK outV

outk.set(phone);

outV.setUpFlow(Long.parseLong(up));

outV.setDownFlow(Long.parseLong(down));

outV.setSumFlow();

//5、写出

context.write(outk,outV);

}

}

package com.wyc.mapreduce.writable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowRducer extends Reducer {

private FlowBean outV = new FlowBean();

@Override

protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

//1、遍历集合,累加值

long totalup = 0;

long totaldown =0;

for (FlowBean value : values) {

totalup += value.getUpFlow();

totaldown += value.getDownFlow();

}

//封装outK,outV

outV.setUpFlow(totalup);

outV.setDownFlow(totaldown);

outV.setSumFlow();

//3、写出

context.write(key,outV);

}

}

package com.wyc.mapreduce.writable;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

//1、获取job对象

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

//2、设置jar包

job.setJarByClass(FlowDriver.class);

//3、关联mapper和Reducer

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowRducer.class);

//4、设置mapper 输出key和value

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

//5、设置最终数据输出的key和value 类型

job.setOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

//6、设置数据的输入和输出路径

FileInputFormat.setInputPaths(job,new Path("D:\\11_input\\inputflow"));

FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop text\\output2"));

//7、提交job

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);

}

}

 Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

主要包括分区和排序

 Partition分区

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机 归属地不同省份输出到不同文件中(分区) 

Partition分区案例实操

根据上述代码:期望输出数据为:手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner {

@Override

public int getPartition(Text text, FlowBean flowBean, int numPartitions) {

//text 手机号

String phone = text.toString();

String perphnoe = phone.substring(0, 3);

int partition ;

if("136".equals(perphnoe)){

partition=0;

}else if ("137".equals(perphnoe)){

partition=1;

}else if ("138".equals(perphnoe)){

partition=2;

}else if ("139".equals(perphnoe)){

partition=3;

}else {

partition=4;

}

return partition;

}

}

package com.wyc.mapreduce.partitioner2;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

//1、获取job对象

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

//2、设置jar包

job.setJarByClass(FlowDriver.class);

//3、关联mapper和Reducer

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowRducer.class);

//4、设置mapper 输出key和value

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

//5、设置最终数据输出的key和value 类型

job.setOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

job.setPartitionerClass(ProvincePartitioner.class);

job.setNumReduceTasks(5);

//6、设置数据的输入和输出路径

FileInputFormat.setInputPaths(job,new Path("D:\\11_input\\inputflow"));

FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop text\\output7"));

//7、提交job

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);

}

}

分区总结: (1)如果ReduceTask的数量>getPautition的结果数, 则会多产生几个空的输出文件paut -1-000xx; . (2)如果ReduceTask的数量

 WritableComparable排序

排序是MapReduce框架中最重要的操作之一。 MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一-次快速排序,并将这些有序数据溢写到磁盘.上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值, 则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成个更一大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次归并排序。

自定义排序WritableComparable原理分析

bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

排序分类

(1)部分排序 MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。 (2)全排序. 最终输出结果只有一个文件,文件内部有序。实现方式是只设置一个ReduceTask。 但该方法在 处理大型文件时效率极低,因为- -台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。 (3)辅助排序: (Gr oupingComparator分组) 在Reduce端对key进行分组。应用于:在接收的key为bean对象时, 想让一个或几个字段相同(全部 字段比较不相同)的key进入到同一个reduce方法时, 可以采用分组排序。 (4)二次排序 在自定义排序过程中,如果compare To中的判断条件为两个即为二次排序。

 WritableComparable排序案例实操(全排序)

根据案例序列化案例产生的结果再次对总流量进行倒序排序。

Hadoop默认对key排序,将上一个案例的输出作为reduce输入,将手机号作为value,总流量等数据作为key进行排序(及bean),FlowBean实现WritableCompara接口重写compareTo方法。Reducer类循环输出,避免流量相同的情况。

//Flowbean添加代码

@Override

public int compareTo(FlowBean o) {

//按照总流量比较,倒序排列

if(this.sumFlow > o.sumFlow){

return -1;//倒序排

}else if(this.sumFlow < o.sumFlow){

return 1;

}else {

//按照上行流量的正序排 --二次排序

if (this.upFlow>o.upFlow){

return 1;

}else if(this.upFlow

return -1;

}else {

return 0;

}

}

}

}

package com.wyc.mapreduce.writableComparable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper {

private FlowBean outK = new FlowBean();

private Text outV = new Text();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1 获取一行数据

String line = value.toString();

//2 按照"\t",切割数据

String[] split = line.split("\t");

//3 封装outK outV

outK.setUpFlow(Long.parseLong(split[1]));

outK.setDownFlow(Long.parseLong(split[2]));

outK.setSumFlow();

outV.set(split[0]);

//4 写出outK outV

context.write(outK,outV);

}

}

package com.wyc.mapreduce.writableComparable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer {

@Override

protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException {

//遍历values集合,循环写出,避免总流量相同的情况

for (Text value : values) {

//调换KV位置,反向写出

context.write(value,key);

}

}

}

package com.wyc.mapreduce.writableComparable;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

//1 获取job对象

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

//2 关联本Driver类

job.setJarByClass(FlowDriver.class);

//3 关联Mapper和Reducer

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowReducer.class);

//4 设置Map端输出数据的KV类型

job.setMapOutputKeyClass(FlowBean.class);

job.setMapOutputValueClass(Text.class);

//5 设置程序最终输出的KV类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

//6 设置输入输出路径

FileInputFormat.setInputPaths(job, new Path("D:\\hadoop text\\output2"));

FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop text\\output4"));

//7 提交Job

boolean b = job.waitForCompletion(true);

System.exit(b ? 0 : 1);

}

}

WritableComparable排序案例实操(区内排序)

要求按照手机号分区后还要进行排序

增加自定义分区类

package com.wyc.mapreduce.partitionerandwritableComparable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner2 extends Partitioner {

@Override

public int getPartition(FlowBean flowBean, Text text, int numPartitions) {

String phone = text.toString();

String prePhone = phone.substring(0,3);

int partition;

if("136".equals(prePhone)){

partition=0;

}else if ("137".equals(prePhone)){

partition=1;

}else if ("138".equals(prePhone)){

partition=2;

}else if ("139".equals(prePhone)){

partition=3;

}else {

partition=4;

}

return partition;

}

}

(2)在驱动类中添加分区类

// 设置自定义分区器

job.setPartitionerClass(ProvincePartitioner2.class);

// 设置对应的ReduceTask的个数

job.setNumReduceTasks(5);

Combiner合并

统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。

                             

(1)增加一个WordCountCombiner类继承Reducer

package com.wyc.mapreduce.combiner;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner extends Reducer {

private IntWritable outV =new IntWritable();

@Override

protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {

int sum =0;

for (IntWritable value : values) {

sum+=value.get();

}

//封装sum写出

outV.set(sum);

context.write(key,outV);

}

}

(2)在WordcountDriver驱动类中指定Combiner 

// 指定需要使用combiner,用reducer作为combiner的逻辑

job.setCombinerClass(WordCountReducer.class);

 自定义OutputFormat案例实操        

package com.wyc.mapreduce.outputFormat;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper {

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//输入http://www.baidu.com

//http://www.google.com

//(输出http://www.baidu.com,NUllWritable);

//map阶段不做任何处理只输出

context.write(value,NullWritable.get());

}

package com.wyc.mapreduce.outputFormat;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer {

@Override

protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

//http://www.baidu.com

//http://www.google.com

//防止有相同的数据()因为是一个key没有输出

for (NullWritable value : values) {

context.write(key,NullWritable.get());

}

}

}

                   

package com.wyc.mapreduce.outputFormat;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat {

@Override

public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {

//创建一个自定义的RecordWriter返回

//要重写getRecordWriter里面的方法,没有 getRecordWriter,就自己创建,再继承 RecordWriter

LogRecordWriter lrw = new LogRecordWriter(job);

return lrw;

}

}

package com.wyc.mapreduce.outputFormat;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter {//传入RecordWriter的kv

private FSDataOutputStream atguiguOut;

private FSDataOutputStream otherOut;

public LogRecordWriter(TaskAttemptContext job) {

//创建两条流

try {

FileSystem fs = FileSystem.get(job.getConfiguration());//要关联job,要用job自己的配置信息,

atguiguOut = fs.create(new Path("D:\\hadoop text\\atguigu.log"));

otherOut = fs.create(new Path("D:\\hadoop text\\other.log"));

} catch (IOException e) {

e.printStackTrace();

}

}

@Override

public void write(Text key, NullWritable value) throws IOException, InterruptedException {

//具体写

String log = key.toString();

if(log.contains("atguigu")){

atguiguOut.writeBytes(log+"\n");

}else {

otherOut.writeBytes(log+"\n");

}

}

@Override

public void close(TaskAttemptContext context) throws IOException, InterruptedException {

//关流

IOUtils.closeStream(atguiguOut);

IOUtils.closeStream(otherOut);

}

}

package com.wyc.mapreduce.outputFormat;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(LogDriver.class);

job.setMapperClass(LogMapper.class);

job.setReducerClass(LogReducer.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

//设置自定义的outputformat

job.setOutputFormatClass(LogOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path("D:\\11_input\\inputoutputformat"));

//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat

//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录

FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop text\\output999"));

boolean b = job.waitForCompletion(true);

System.exit(b ? 0 : 1);

}

}

自定义OutputFormat案例实操

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。 

//编写编写LogMapper类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper {

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//不做任何处理,直接写出一行log数据

context.write(value,NullWritable.get());

}

}

//编写LogReducer类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer {

@Override

protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

// 防止有相同的数据,迭代写出

for (NullWritable value : values) {

context.write(key,NullWritable.get());

}

}

}

//自定义一个LogOutputFormat类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat {

@Override

public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {

//创建一个自定义的RecordWriter返回

LogRecordWriter logRecordWriter = new LogRecordWriter(job);

return logRecordWriter;

}

}

//编写LogRecordWriter类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter {

private FSDataOutputStream atguiguOut;

private FSDataOutputStream otherOut;

public LogRecordWriter(TaskAttemptContext job) {

try {

//获取文件系统对象

FileSystem fs = FileSystem.get(job.getConfiguration());

//用文件系统对象创建两个输出流对应不同的目录

atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log"));

otherOut = fs.create(new Path("d:/hadoop/other.log"));

} catch (IOException e) {

e.printStackTrace();

}

}

@Override

public void write(Text key, NullWritable value) throws IOException, InterruptedException {

String log = key.toString();

//根据一行的log数据是否包含atguigu,判断两条输出流输出的内容

if (log.contains("atguigu")) {

atguiguOut.writeBytes(log + "\n");

} else {

otherOut.writeBytes(log + "\n");

}

}

@Override

public void close(TaskAttemptContext context) throws IOException, InterruptedException {

//关流

IOUtils.closeStream(atguiguOut);

IOUtils.closeStream(otherOut);

}

}

//编写LogDriver类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(LogDriver.class);

job.setMapperClass(LogMapper.class);

job.setReducerClass(LogReducer.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

//设置自定义的outputformat

job.setOutputFormatClass(LogOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path("D:\\input"));

//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat

//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录

FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput"));

boolean b = job.waitForCompletion(true);

System.exit(b ? 0 : 1);

}

}

MapTask工作机制总结

(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value。

(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

(5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

ReduceTask工作机制

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(3)Reduce阶段:reduce()函数将计算结果写到HDFS上。

ReduceTask并行度决定机制

设置ReduceTask并行度(个数)

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4

job.setNumReduceTasks(4);

(1) ReduceTask=0, 表示没有Reduce阶段,输出文件个数和Map个数一致。 (2) ReduceTask默认值就是1, 所以输出文件个数为一个。 (3)如果数据分布不均匀,就有可能在Reduce阶段产生数据倾余斜。 (4) ReduceTask数量并不是任意设置, 还要考虑业务逻辑需求,有些情况下,需要计算全 局汇总结果,就只能有1个ReduceTask。 (5)如果分区数不是1,但是ReduceTask为1, 也不执行分区过程。因为在MapTask的源码中 ,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。

Map Join

解决在Reduce端处理过多的表,容易产生的数据倾斜。

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

3)具体办法:采用DistributedCache

(1)在Mapper的setup阶段,将文件读取到缓存集合中。

(2)在Driver驱动类中加载缓存。

案例实操:将产品信息配对输出

package com.wyc.mapreduce.mapjoin;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.URI;

import java.util.HashMap;

public class MapJoinMapper extends Mapper {

private HashMap pdMap = new HashMap<>();

private Text outK =new Text();

@Override

protected void setup(Mapper.Context context) throws IOException, InterruptedException {

//获取缓存文件,并把文件内容封装到集合pd。txt

URI[] cacheFiles = context.getCacheFiles();

FileSystem fs = FileSystem.get(context.getConfiguration());

FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));

//从流中读取数据

BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

String line;

while (StringUtils.isNotEmpty(line=reader.readLine())){

//切割

String[] fields = line.split("\t");

//赋值

pdMap.put(fields[0],fields[1]);

}

//关流

IOUtils.closeStream(reader);

}

@Override

protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

//处理order.txt

String line = value.toString();

String[] fields = line.split("\t");

//获取pid

String pname = pdMap.get(fields[1]);

//获取订单id 和订单数量

//封装

outK.set(fields[0]+"\t"+pname+"\t"+fields[2]);

context.write(outK,NullWritable.get());

}

}

数据清洗(ETL)

“ETL,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库,在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

案例实操:去除日志中字段个数小于等于11的日志。

package com.wyc.mapreduce.ETL;

import com.wyc.mapreduce.outputFormat.LogDriver;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WebLogDriver {

public static void main(String[] args) throws Exception {

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置

args = new String[] { "D:/11_input/inputlog", "D:/hadoop text/output9090" };

// 1 获取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 2 加载jar包

job.setJarByClass(LogDriver.class);

// 3 关联map

job.setMapperClass(WebLogMapper.class);

// 4 设置最终输出类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

// 设置reducetask个数为0 取消reduce阶段

job.setNumReduceTasks(0);

// 5 设置输入和输出路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交

boolean b = job.waitForCompletion(true);

System.exit(b ? 0 : 1);

}

}

package com.wyc.mapreduce.ETL;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WebLogMapper extends Mapper {

@Override

protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {

//1、获取一行

String line = value.toString();

//3、Etl清洗

boolean result = paresLog(line, context);

if (!result) {

return;

}

//4、符号要求的取出ju

context.write(value, NullWritable.get());

}

private boolean paresLog(String line, Context context) {

//切割

String[] fields = line.split(" ");

//判断日志长度是否大于11

if(fields.length > 11){

return true;

}else {

return false;

}

}

}

Hadoop数据压缩

压缩的优点:以减少磁盘IO、减少磁盘存储空间。

压缩的缺点:增加CPU开销。

压缩原则

(1)运算密集型的Job,少用压缩

(2)IO密集型的Job,多用压缩

 压缩方式选择

压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。

 Gzip压缩

优点:压缩率比较高; 

缺点:不支持Split;压缩/解压速度一般;

Bzip2压缩

优点:压缩率高;支持Split; 

缺点:压缩/解压速度慢。

Lzo压缩

优点:压缩/解压速度比较快;支持Split;

缺点:压缩率一般;想支持切片需要额外创建索引。

 Snappy压缩

优点:压缩和解压缩速度快; 

缺点:不支持Split;压缩率一般; 

压缩位置选择

压缩可以在MapReduce作用的任意阶段启用。

在驱动中加入代码

// 开启map端输出压缩

conf.setBoolean("mapreduce.map.output.compress", true);

// 设置map端输出压缩方式

conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class);

-------------------------------------------------------------------------------------------

// 设置reduce端输出压缩开启

FileOutputFormat.setCompressOutput(job, true);

// 设置压缩的方式

FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: