目录
切片与MapTask并行度决定机制
序列化
序列化案例实操
Shuffle机制
Partition分区
WritableComparable排序
WritableComparable排序案例实操(全排序)
Combiner合并
自定义OutputFormat案例实操
自定义OutputFormat案例实操
MapTask工作机制总结
ReduceTask工作机制
ReduceTask并行度决定机制
Map Join
数据清洗(ETL)
Hadoop数据压缩
压缩方式选择
切片与MapTask并行度决定机制
MapTask并行度决定机制: 数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。 数据切片与MapTask并行度决定机制: 1) 一个Job的Map阶段并行度由客户端在提交Job时的切片数决定 2)每一个Split切片分配一个MapTask并行实例处理 3)默认情况下,切片大小=BlockSize 4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
4)Hadoop序列化特点:
(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互
序列化案例实操
统计每一个手机号耗费的总上行流量、总下行流量、总流量
7 13560436666 120.196.100.99 1116 954 200 (例)
id 手机号码 网络ip 上行流量 下行流量 网络状态码
13560436666 1116 954 2070
手机号码 上行流量 下行流量 总流量
package com.wyc.mapreduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
1、定义类实现writable接口
2、重写序列化和反序列化方法
3、重写一个空参构造
4、重写Tostrin方法
*/
public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;
//空参构造
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow+this.downFlow;
}
//序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow=in.readLong();
this.downFlow=in.readLong();
this.sumFlow=in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow ;
}
}
package com.wyc.mapreduce.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper
private Text outk = new Text();
private FlowBean outV =new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、获取一行
//13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
String line = value.toString();
//2、切割
String[] split = line.split("\t");
//{13736230513,192.196.100.1,www.atguigu.com,2481,24681,200}放到数组里面
//3、抓取想要的数据
//13736230513,2481,24681 抓取手机号上行流量和下行流量
String phone =split[1];
String up =split[split.length-3];
String down =split[split.length-2];
//4、封装 outK outV
outk.set(phone);
outV.setUpFlow(Long.parseLong(up));
outV.setDownFlow(Long.parseLong(down));
outV.setSumFlow();
//5、写出
context.write(outk,outV);
}
}
package com.wyc.mapreduce.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowRducer extends Reducer
private FlowBean outV = new FlowBean();
@Override
protected void reduce(Text key, Iterable
//1、遍历集合,累加值
long totalup = 0;
long totaldown =0;
for (FlowBean value : values) {
totalup += value.getUpFlow();
totaldown += value.getDownFlow();
}
//封装outK,outV
outV.setUpFlow(totalup);
outV.setDownFlow(totaldown);
outV.setSumFlow();
//3、写出
context.write(key,outV);
}
}
package com.wyc.mapreduce.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2、设置jar包
job.setJarByClass(FlowDriver.class);
//3、关联mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowRducer.class);
//4、设置mapper 输出key和value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5、设置最终数据输出的key和value 类型
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//6、设置数据的输入和输出路径
FileInputFormat.setInputPaths(job,new Path("D:\\11_input\\inputflow"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop text\\output2"));
//7、提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
主要包括分区和排序
Partition分区
要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机 归属地不同省份输出到不同文件中(分区)
Partition分区案例实操
根据上述代码:期望输出数据为:手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
//text 手机号
String phone = text.toString();
String perphnoe = phone.substring(0, 3);
int partition ;
if("136".equals(perphnoe)){
partition=0;
}else if ("137".equals(perphnoe)){
partition=1;
}else if ("138".equals(perphnoe)){
partition=2;
}else if ("139".equals(perphnoe)){
partition=3;
}else {
partition=4;
}
return partition;
}
}
package com.wyc.mapreduce.partitioner2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2、设置jar包
job.setJarByClass(FlowDriver.class);
//3、关联mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowRducer.class);
//4、设置mapper 输出key和value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5、设置最终数据输出的key和value 类型
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
//6、设置数据的输入和输出路径
FileInputFormat.setInputPaths(job,new Path("D:\\11_input\\inputflow"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop text\\output7"));
//7、提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
分区总结: (1)如果ReduceTask的数量>getPautition的结果数, 则会多产生几个空的输出文件paut -1-000xx; . (2)如果ReduceTask的数量 WritableComparable排序 排序是MapReduce框架中最重要的操作之一。 MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一-次快速排序,并将这些有序数据溢写到磁盘.上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值, 则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成个更一大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次归并排序。 自定义排序WritableComparable原理分析 bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。 排序分类 (1)部分排序 MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。 (2)全排序. 最终输出结果只有一个文件,文件内部有序。实现方式是只设置一个ReduceTask。 但该方法在 处理大型文件时效率极低,因为- -台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。 (3)辅助排序: (Gr oupingComparator分组) 在Reduce端对key进行分组。应用于:在接收的key为bean对象时, 想让一个或几个字段相同(全部 字段比较不相同)的key进入到同一个reduce方法时, 可以采用分组排序。 (4)二次排序 在自定义排序过程中,如果compare To中的判断条件为两个即为二次排序。 WritableComparable排序案例实操(全排序) 根据案例序列化案例产生的结果再次对总流量进行倒序排序。 Hadoop默认对key排序,将上一个案例的输出作为reduce输入,将手机号作为value,总流量等数据作为key进行排序(及bean),FlowBean实现WritableCompara接口重写compareTo方法。Reducer类循环输出,避免流量相同的情况。 //Flowbean添加代码 @Override public int compareTo(FlowBean o) { //按照总流量比较,倒序排列 if(this.sumFlow > o.sumFlow){ return -1;//倒序排 }else if(this.sumFlow < o.sumFlow){ return 1; }else { //按照上行流量的正序排 --二次排序 if (this.upFlow>o.upFlow){ return 1; }else if(this.upFlow return -1; }else { return 0; } } } } package com.wyc.mapreduce.writableComparable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowMapper extends Mapper private FlowBean outK = new FlowBean(); private Text outV = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1 获取一行数据 String line = value.toString(); //2 按照"\t",切割数据 String[] split = line.split("\t"); //3 封装outK outV outK.setUpFlow(Long.parseLong(split[1])); outK.setDownFlow(Long.parseLong(split[2])); outK.setSumFlow(); outV.set(split[0]); //4 写出outK outV context.write(outK,outV); } } package com.wyc.mapreduce.writableComparable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowReducer extends Reducer @Override protected void reduce(FlowBean key, Iterable //遍历values集合,循环写出,避免总流量相同的情况 for (Text value : values) { //调换KV位置,反向写出 context.write(value,key); } } } package com.wyc.mapreduce.writableComparable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1 获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2 关联本Driver类 job.setJarByClass(FlowDriver.class); //3 关联Mapper和Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //4 设置Map端输出数据的KV类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); //5 设置程序最终输出的KV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //6 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("D:\\hadoop text\\output2")); FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop text\\output4")); //7 提交Job boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } } WritableComparable排序案例实操(区内排序) 要求按照手机号分区后还要进行排序 增加自定义分区类 package com.wyc.mapreduce.partitionerandwritableComparable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class ProvincePartitioner2 extends Partitioner @Override public int getPartition(FlowBean flowBean, Text text, int numPartitions) { String phone = text.toString(); String prePhone = phone.substring(0,3); int partition; if("136".equals(prePhone)){ partition=0; }else if ("137".equals(prePhone)){ partition=1; }else if ("138".equals(prePhone)){ partition=2; }else if ("139".equals(prePhone)){ partition=3; }else { partition=4; } return partition; } } (2)在驱动类中添加分区类 // 设置自定义分区器 job.setPartitionerClass(ProvincePartitioner2.class); // 设置对应的ReduceTask的个数 job.setNumReduceTasks(5); Combiner合并 统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。 (1)增加一个WordCountCombiner类继承Reducer package com.wyc.mapreduce.combiner; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountCombiner extends Reducer private IntWritable outV =new IntWritable(); @Override protected void reduce(Text key, Iterable int sum =0; for (IntWritable value : values) { sum+=value.get(); } //封装sum写出 outV.set(sum); context.write(key,outV); } } (2)在WordcountDriver驱动类中指定Combiner // 指定需要使用combiner,用reducer作为combiner的逻辑 job.setCombinerClass(WordCountReducer.class); 自定义OutputFormat案例实操 package com.wyc.mapreduce.outputFormat; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class LogMapper extends Mapper @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //输入http://www.baidu.com //http://www.google.com //(输出http://www.baidu.com,NUllWritable); //map阶段不做任何处理只输出 context.write(value,NullWritable.get()); } package com.wyc.mapreduce.outputFormat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class LogReducer extends Reducer @Override protected void reduce(Text key, Iterable //http://www.baidu.com //http://www.google.com //防止有相同的数据()因为是一个key没有输出 for (NullWritable value : values) { context.write(key,NullWritable.get()); } } } package com.wyc.mapreduce.outputFormat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogOutputFormat extends FileOutputFormat @Override public RecordWriter //创建一个自定义的RecordWriter返回 //要重写getRecordWriter里面的方法,没有 getRecordWriter,就自己创建,再继承 RecordWriter LogRecordWriter lrw = new LogRecordWriter(job); return lrw; } } package com.wyc.mapreduce.outputFormat; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; public class LogRecordWriter extends RecordWriter private FSDataOutputStream atguiguOut; private FSDataOutputStream otherOut; public LogRecordWriter(TaskAttemptContext job) { //创建两条流 try { FileSystem fs = FileSystem.get(job.getConfiguration());//要关联job,要用job自己的配置信息, atguiguOut = fs.create(new Path("D:\\hadoop text\\atguigu.log")); otherOut = fs.create(new Path("D:\\hadoop text\\other.log")); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { //具体写 String log = key.toString(); if(log.contains("atguigu")){ atguiguOut.writeBytes(log+"\n"); }else { otherOut.writeBytes(log+"\n"); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { //关流 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); } } package com.wyc.mapreduce.outputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogDriver.class); job.setMapperClass(LogMapper.class); job.setReducerClass(LogReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //设置自定义的outputformat job.setOutputFormatClass(LogOutputFormat.class); FileInputFormat.setInputPaths(job, new Path("D:\\11_input\\inputoutputformat")); //虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat //而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop text\\output999")); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } } 自定义OutputFormat案例实操 过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。 //编写编写LogMapper类 package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class LogMapper extends Mapper @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //不做任何处理,直接写出一行log数据 context.write(value,NullWritable.get()); } } //编写LogReducer类 package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class LogReducer extends Reducer @Override protected void reduce(Text key, Iterable // 防止有相同的数据,迭代写出 for (NullWritable value : values) { context.write(key,NullWritable.get()); } } } //自定义一个LogOutputFormat类 package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogOutputFormat extends FileOutputFormat @Override public RecordWriter //创建一个自定义的RecordWriter返回 LogRecordWriter logRecordWriter = new LogRecordWriter(job); return logRecordWriter; } } //编写LogRecordWriter类 package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; public class LogRecordWriter extends RecordWriter private FSDataOutputStream atguiguOut; private FSDataOutputStream otherOut; public LogRecordWriter(TaskAttemptContext job) { try { //获取文件系统对象 FileSystem fs = FileSystem.get(job.getConfiguration()); //用文件系统对象创建两个输出流对应不同的目录 atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log")); otherOut = fs.create(new Path("d:/hadoop/other.log")); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String log = key.toString(); //根据一行的log数据是否包含atguigu,判断两条输出流输出的内容 if (log.contains("atguigu")) { atguiguOut.writeBytes(log + "\n"); } else { otherOut.writeBytes(log + "\n"); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { //关流 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); } } //编写LogDriver类 package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class LogDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogDriver.class); job.setMapperClass(LogMapper.class); job.setReducerClass(LogReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //设置自定义的outputformat job.setOutputFormatClass(LogOutputFormat.class); FileInputFormat.setInputPaths(job, new Path("D:\\input")); //虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat //而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput")); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } } MapTask工作机制总结 (1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value。 (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。 (3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。 (4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。 (5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。 ReduceTask工作机制 (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。 (2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。 (3)Reduce阶段:reduce()函数将计算结果写到HDFS上。 ReduceTask并行度决定机制 设置ReduceTask并行度(个数) ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置: // 默认值是1,手动设置为4 job.setNumReduceTasks(4); (1) ReduceTask=0, 表示没有Reduce阶段,输出文件个数和Map个数一致。 (2) ReduceTask默认值就是1, 所以输出文件个数为一个。 (3)如果数据分布不均匀,就有可能在Reduce阶段产生数据倾余斜。 (4) ReduceTask数量并不是任意设置, 还要考虑业务逻辑需求,有些情况下,需要计算全 局汇总结果,就只能有1个ReduceTask。 (5)如果分区数不是1,但是ReduceTask为1, 也不执行分区过程。因为在MapTask的源码中 ,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。 Map Join 解决在Reduce端处理过多的表,容易产生的数据倾斜。 在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。 3)具体办法:采用DistributedCache (1)在Mapper的setup阶段,将文件读取到缓存集合中。 (2)在Driver驱动类中加载缓存。 案例实操:将产品信息配对输出 package com.wyc.mapreduce.mapjoin; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; public class MapJoinMapper extends Mapper private HashMap private Text outK =new Text(); @Override protected void setup(Mapper //获取缓存文件,并把文件内容封装到集合pd。txt URI[] cacheFiles = context.getCacheFiles(); FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(new Path(cacheFiles[0])); //从流中读取数据 BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8")); String line; while (StringUtils.isNotEmpty(line=reader.readLine())){ //切割 String[] fields = line.split("\t"); //赋值 pdMap.put(fields[0],fields[1]); } //关流 IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //处理order.txt String line = value.toString(); String[] fields = line.split("\t"); //获取pid String pname = pdMap.get(fields[1]); //获取订单id 和订单数量 //封装 outK.set(fields[0]+"\t"+pname+"\t"+fields[2]); context.write(outK,NullWritable.get()); } } 数据清洗(ETL) “ETL,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库,在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。 案例实操:去除日志中字段个数小于等于11的日志。 package com.wyc.mapreduce.ETL; import com.wyc.mapreduce.outputFormat.LogDriver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WebLogDriver { public static void main(String[] args) throws Exception { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "D:/11_input/inputlog", "D:/hadoop text/output9090" }; // 1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 加载jar包 job.setJarByClass(LogDriver.class); // 3 关联map job.setMapperClass(WebLogMapper.class); // 4 设置最终输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 设置reducetask个数为0 取消reduce阶段 job.setNumReduceTasks(0); // 5 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 提交 boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } } package com.wyc.mapreduce.ETL; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WebLogMapper extends Mapper @Override protected void map(LongWritable key, Text value, Mapper //1、获取一行 String line = value.toString(); //3、Etl清洗 boolean result = paresLog(line, context); if (!result) { return; } //4、符号要求的取出ju context.write(value, NullWritable.get()); } private boolean paresLog(String line, Context context) { //切割 String[] fields = line.split(" "); //判断日志长度是否大于11 if(fields.length > 11){ return true; }else { return false; } } } Hadoop数据压缩 压缩的优点:以减少磁盘IO、减少磁盘存储空间。 压缩的缺点:增加CPU开销。 压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 压缩方式选择 压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。 Gzip压缩 优点:压缩率比较高; 缺点:不支持Split;压缩/解压速度一般; Bzip2压缩 优点:压缩率高;支持Split; 缺点:压缩/解压速度慢。 Lzo压缩 优点:压缩/解压速度比较快;支持Split; 缺点:压缩率一般;想支持切片需要额外创建索引。 Snappy压缩 优点:压缩和解压缩速度快; 缺点:不支持Split;压缩率一般; 压缩位置选择 压缩可以在MapReduce作用的任意阶段启用。 在驱动中加入代码 // 开启map端输出压缩 conf.setBoolean("mapreduce.map.output.compress", true); // 设置map端输出压缩方式 conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class); ------------------------------------------------------------------------------------------- // 设置reduce端输出压缩开启 FileOutputFormat.setCompressOutput(job, true); // 设置压缩的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); 推荐阅读
发表评论