一、程序调优
### --- 预合并CombineMapper

package com.yanqi.mr.comment.step3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Pass-through mapper for the pre-merge job.
 *
 * <p>Each input line is written to the output unchanged, paired with a
 * {@link NullWritable} key; the incoming {@link LongWritable} key is discarded.
 */
public class CombineMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    /**
     * Emits {@code value} as-is under the {@link NullWritable} key.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   the input record, forwarded untouched
     * @param context task context the record is written to
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // No transformation is needed here: simply forward the record.
        final NullWritable emptyKey = NullWritable.get();
        context.write(emptyKey, value);
    }
}
### --- CombineDriver

package com.yanqi.mr.comment.step3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the pre-merge job: reads the input directory through
 * {@link CombineTextInputFormat} with a capped split size and writes every
 * record back out through {@link CombineMapper}.
 *
 * <p>Usage: {@code CombineDriver [inputPath [outputPath]]}. Any argument that is
 * omitted falls back to the original hard-coded local path, so running the
 * class without arguments behaves exactly as before.
 */
public class CombineDriver {

    /** Input directory used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "E:\\merge\\input";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "E:\\merge\\merge-out";

    /** Upper bound for a combined input split: 4 MiB. */
    private static final long MAX_SPLIT_BYTES = 4L * 1024 * 1024;

    /** Number of reduce tasks requested for the job. */
    private static final int REDUCE_TASKS = 3;

    /**
     * Configures the job, submits it, waits for completion and exits the JVM.
     *
     * @param args optional {@code [inputPath [outputPath]]}; missing entries use
     *             {@link #DEFAULT_INPUT_PATH} / {@link #DEFAULT_OUTPUT_PATH}
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a class configured on the job cannot be found
     * @throws InterruptedException   if waiting for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Resolve paths from the command line, falling back to the original defaults.
        final String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        final String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        // Create the configuration and the job instance.
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "CombineDriver");

        // Identify the jar that contains this driver.
        job.setJarByClass(CombineDriver.class);

        // Register the mapper; no custom reducer class is registered for this job.
        job.setMapperClass(CombineMapper.class);

        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final job output.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Read the input with CombineTextInputFormat and cap the split size.
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, MAX_SPLIT_BYTES);

        // Source directory of the raw data and destination directory of the result.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // NOTE(review): every map output record carries the same NullWritable key, so
        // presumably the default partitioner routes all records to a single reducer and
        // the remaining reduce outputs stay empty - confirm that 3 is intended.
        job.setNumReduceTasks(REDUCE_TASKS);

        // Submit the job and block until it finishes.
        final boolean succeeded = job.waitForCompletion(true);

        // JVM exit status: 0 on success, non-zero on failure.
        System.exit(succeeded ? 0 : 1);
    }
}
二、编译打印
### --- 编译打印

~~~     配置输入输出参数
### --- 输出压缩MergeDriver

package com.yanqi.mr.comment.step2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

/**
 * Driver for the merge job: reads the input directory with the custom
 * {@code MergeInputFormat}, maps it with {@code MergeMapper} and writes
 * {@code Text -> BytesWritable} pairs into a record-compressed SequenceFile.
 *
 * <p>Usage: {@code MergeDriver [inputPath [outputPath]]}. Any argument that is
 * omitted falls back to the original hard-coded local path, so running the
 * class without arguments behaves exactly as before.
 */
public class MergeDriver {

    /** Input directory used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "E:\\merge\\merge-out";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "E:\\merge\\merge-output";

    /**
     * Configures the job, submits it, waits for completion and exits the JVM.
     *
     * @param args optional {@code [inputPath [outputPath]]}; missing entries use
     *             {@link #DEFAULT_INPUT_PATH} / {@link #DEFAULT_OUTPUT_PATH}
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a class configured on the job cannot be found
     * @throws InterruptedException   if waiting for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Resolve paths from the command line, falling back to the original defaults.
        final String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        final String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        // Create the configuration and the job instance.
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "MergeDriver");

        // Identify the jar that contains this driver.
        job.setJarByClass(MergeDriver.class);

        // Register the mapper; no custom reducer class is registered for this job.
        job.setMapperClass(MergeMapper.class);

        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // Key/value types of the final job output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // Read the input through the custom InputFormat.
        job.setInputFormatClass(MergeInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(inputPath));

        // Write the result as a SequenceFile.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // Compress the SequenceFile to reduce the data volume: less disk space is
        // used and less time is spent moving the data over the network.
        SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

        // Compression is applied per record; SequenceFile.CompressionType.BLOCK is
        // the alternative setting.
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.RECORD);

        // Destination directory of the job result.
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Submit the job and block until it finishes.
        final boolean succeeded = job.waitForCompletion(true);

        // JVM exit status: 0 on success, non-zero on failure.
        System.exit(succeeded ? 0 : 1);
    }
}

 
 
 
 
 
 
 
 
 

Walter Savage Landor: I strove with none, for none was worth my strife. Nature I loved and, next to Nature, Art: I warm’d both hands before the fire of life. It sinks, and I am ready to depart.
                                                                                                                                                   ——W.S.Landor

 
版权声明:本文为yanqi_vip原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/yanqivip/p/16112058.html