初识MapReduce

输入给map的时候每条数据加上行号构成key,value对,经过map的处理转换为另一种k,v对,然后shuffle,使reduce更容易处理

mr.png

编写Mapper类

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    //缺失
    private static final int MISSING = 9999;

    //hadoop给输入的编号的类型就是LongWritable类型
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {

        //转换Text类型到String类型
        String line = value.toString();

        //提取年份数据
        String year = line.substring(15,19);

        //气温变量
        int airTemperature;

        //判断符号
        if(line.charAt(87) == '+'){
            airTemperature = Integer.parseInt(line.substring(88,92));
        }else {
            airTemperature = Integer.parseInt(line.substring(87,92));
        }
        String quality = line.substring(92,93);

        //将有效数据写入到map的context中,注意类型务必要和泛型声明一致
        if(airTemperature!=MISSING && quality.matches("[01459]")){
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }

}

编写Reducer类

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//reduce的输入与map的输出一样
public class MaxTemperatureReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context contex)
            throws IOException, InterruptedException {

        int maxValue = Integer.MIN_VALUE;

        for(IntWritable value : values){
            maxValue = Math.max(maxValue, value.get());
        }
        contex.write(key, new IntWritable(maxValue));

    }

}

编写App类

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * App入口
 * @author Matrix42
 *
 */
public class MaxTemperature {

    public static void main(String[] args) throws Exception {
        //mapred为旧版本API
        //每一次map reduce的过程就是一个作业
        //job 作业
        //task 任务
        //  map task
        //  reduce task
        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);

        //设置作业名称便于调试
        job.setJobName("Max temperature");

        //添加输入路径,可以添加多个路径(add和set不同)
        //输入路径不仅可以是具体文件,还可以是文件夹(目录,不会递归,自动解压)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //设置输出路径,输出只有一个,而且不能存在
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //设置mapper类
        job.setMapperClass(MaxTemperatureMapper.class);
        //设置reducer类
        job.setReducerClass(MaxTemperatureReducer.class);

        //设置输出的key类型
        job.setOutputKeyClass(Text.class);
        //设置输出的value类型
        job.setOutputValueClass(IntWritable.class);

        //等待作业完成
        boolean status = job.waitForCompletion(true);

        System.out.println(status);
    }

}

打包运行

ncdc数据下载方法

选择主类后到处jar包

使用hadoop jar 包jar 输入路径 输出路径(不要自己创建)

set hadoop_classpath=jar路径(win)

export hadoop_classpath=jar路径(Linux)

hadoop 主类名 输入路径 输出路径(不要自己创建)

如果使用本地模式路径前加file:///

集群模式需要把文件放到HDFS上,jar不用放到HDFS上

可以指定-Xmx1000m

如果磁盘空间不足会出现log-dirs are bad ...

如果在Eclipse上调试程序,需要指定HADOOP_HOME

修改副本数量和块大小

  • 所有默认配置

    默认配置在:hadoop/share/hadoop/xxx/xxx-xxx.jar里的xxx-default.xml

  • 修改配置

    块大小:dfs.blocksize(可以使用k,M,G等)

    副本数:dfs.replicattion,不能小于最小副本数

    最小副本数:dfs.namenode.replication.min

    删除结点临时目录

    格式化文件系统

    start-dfs.sh

    start-yarn.sh

Web端口

  • HDFS系列(500开头)

    • namenode 50070

    • datanode 50075

    • 2nd namenode 50090

  • MapReduce(yarn)

    • resourcemanager 8088

    • nodemanager 8042

setup和cleanup

import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    //主机地址
    private InetAddress addr;
    private String hostName;
    private String ip;

    private Long startTime;
    private Long endTime;

    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //缺失
    private static final int MISSING = 9999;

    //hadoop给输入的编号的类型就是LongWritable类型
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {

        //转换Text类型到String类型
        String line = value.toString();

        //提取年份数据
        String year = line.substring(15,19);

        //气温变量
        int airTemperature;

        //判断符号
        if(line.charAt(87) == '+'){
            airTemperature = Integer.parseInt(line.substring(88,92));
        }else {
            airTemperature = Integer.parseInt(line.substring(87,92));
        }
        String quality = line.substring(92,93);

        //将有效数据写入到map的context中,注意类型务必要和泛型声明一致
        if(airTemperature!=MISSING && quality.matches("[01459]")){
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }

    @Override
    protected void cleanup(
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        endTime = System.currentTimeMillis();
        System.out.println("任务执行时间为:"+sdf.format(endTime-startTime));
    }

    @Override
    public void run(
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        super.run(context);

    }

    @Override
    protected void setup(
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        //获得输入切割数据
        InputSplit split = context.getInputSplit();
        //取得切割的长度
        long length = split.getLength();
        System.out.println("slipt number:"+length);
        //得到本地地址
        addr = InetAddress.getLocalHost();
        //得到ip地址
        ip = addr.getHostAddress();
        //得到主机名
        hostName = addr.getHostName();
        startTime = System.currentTimeMillis();
        //输出主机信息
        System.out.println(hostName+ ":" + ip);

    }

}

split研究

map()的并发数取决于split块的数量,gz文件不能split

实际的任务数可能少于split出的块数,map会把任务合并

没有存放数据节点也可能会执行任务,如果数据所在节点资源满了,会把任务分配到其他空闲节点(充分利用资源)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * App入口
 * @author Matrix42
 *
 */
public class MaxTemperature {

    public static void main(String[] args) throws Exception {
        //mapred为旧版本API
        //每一次map reduce的过程就是一个作业
        //job 作业
        //task 任务
        //  map task
        //  reduce task
        Job job = Job.getInstance();
        //获得配置信息
        Configuration conf = job.getConfiguration();
        //配置信息常量存在FileInputFormat类中
        //org.apache.hadoop.mapreduce为新类
        //org.apache.hadoop.mapred为旧类

        //设置最大切割尺寸
        conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 1024 * 15);

        //如果块最大尺寸为20k,最小为10k,split设置为50k
        //会按照最大块数对待,如果小于最小块尺寸
        //会按照split大小切割

        job.setJarByClass(MaxTemperature.class);

        //设置作业名称便于调试
        job.setJobName("Max temperature");

        //添加输入路径,可以添加多个路径(add和set不同)
        //输入路径不仅可以是具体文件,还可以是文件夹(目录,不会递归,自动解压)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //设置输出路径,输出只有一个,而且不能存在
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //设置mapper类
        job.setMapperClass(MaxTemperatureMapper.class);
        //设置reducer类
        job.setReducerClass(MaxTemperatureReducer.class);

        //设置输出的key类型
        job.setOutputKeyClass(Text.class);
        //设置输出的value类型
        job.setOutputValueClass(IntWritable.class);

        //等待作业完成
        boolean status = job.waitForCompletion(true);

        System.out.println(status);
    }
}