Hadoop MapReduce(五)
初识MapReduce
输入给map的时候,每行数据以它在文件中的字节偏移量(不是行号)作为key、行内容作为value构成key,value对,经过map的处理转换为另一种k,v对,然后经过shuffle(按key排序并分组),使reduce更容易处理
编写Mapper类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for fixed-width weather records: emits (year, air temperature) for every
 * record whose temperature is present and whose quality code is acceptable.
 *
 * <p>The input key is a LongWritable; with the default TextInputFormat it is the
 * byte offset of the line within the file (not a line number). The input value is
 * the raw record text.
 */
public class MaxTemperatureMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    // Sentinel value the dataset uses for a missing temperature reading.
    private static final int MISSING = 9999;
    // Accepted quality codes; same set as the regex "[01459]" but without
    // compiling a regex for every record.
    private static final String VALID_QUALITY_CODES = "01459";
    // A record must reach column 92 (the quality code) to be parsed at all.
    private static final int MIN_RECORD_LENGTH = 93;

    // Output objects reused across map() calls: the framework serializes each
    // pair when it is written, so recycling them avoids two allocations per record.
    private final Text yearKey = new Text();
    private final IntWritable temperatureValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Convert the Text value into a Java String.
        String line = value.toString();
        // Skip truncated lines instead of failing the whole task with
        // StringIndexOutOfBoundsException.
        if (line.length() < MIN_RECORD_LENGTH) {
            return;
        }
        // Year occupies columns 15-18.
        String year = line.substring(15, 19);
        // Temperature occupies columns 87-91; column 87 holds the sign.
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        // Single-character quality code at column 92.
        char quality = line.charAt(92);
        // Emit only valid readings; the written types must match the generic
        // output types declared on the class (Text, IntWritable).
        if (airTemperature != MISSING && VALID_QUALITY_CODES.indexOf(quality) >= 0) {
            yearKey.set(year);
            temperatureValue.set(airTemperature);
            context.write(yearKey, temperatureValue);
        }
    }
}
编写Reducer类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer that keeps the highest temperature seen for each year.
 * Its input key/value types are the same as the mapper's output types.
 */
public class MaxTemperatureReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Start below every possible reading so the first value always wins.
        int highest = Integer.MIN_VALUE;
        for (IntWritable reading : values) {
            int current = reading.get();
            if (current > highest) {
                highest = current;
            }
        }
        context.write(key, new IntWritable(highest));
    }
}
编写App类
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Application entry point: configures and submits the max-temperature job.
 *
 * <p>Usage: {@code MaxTemperature <input path> <output path>}. The process
 * exits with status 0 when the job succeeds and 1 when it fails.
 *
 * @author Matrix42
 */
public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // "mapred" is the old API; this program uses the new
        // org.apache.hadoop.mapreduce API.
        // One map-reduce pass is one job, executed as map tasks and reduce tasks.
        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);
        // A job name makes the job easier to find while debugging.
        job.setJobName("Max temperature");
        // Input: addInputPath may be called repeatedly to add several paths
        // (unlike a set* method, it does not replace earlier ones). A path may be
        // a file or a directory (directories are not traversed recursively;
        // compressed files are decompressed automatically).
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Output: exactly one output path, and it must not exist yet.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        // Output key/value types of the job.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Submit the job and block until it finishes, printing progress.
        boolean status = job.waitForCompletion(true);
        System.out.println(status);
        // Propagate the result as the exit code so shell scripts can detect failure.
        System.exit(status ? 0 : 1);
    }
}
打包运行
选择主类后导出jar包
使用 hadoop jar <jar包> 输入路径 输出路径(输出目录不要自己创建)
或
set HADOOP_CLASSPATH=jar路径(Windows)
export HADOOP_CLASSPATH=jar路径(Linux,环境变量名区分大小写)
hadoop 主类名 输入路径 输出路径(不要自己创建)
如果使用本地模式路径前加file:///
集群模式需要把文件放到HDFS上,jar不用放到HDFS上
可以指定JVM最大堆内存,例如-Xmx1000m
如果磁盘空间不足会出现log-dirs are bad ...
如果在Eclipse上调试程序,需要指定HADOOP_HOME
修改副本数量和块大小
-
所有默认配置
默认配置在:hadoop/share/hadoop/xxx/xxx-xxx.jar里的xxx-default.xml
-
修改配置
块大小:dfs.blocksize(可以使用k,M,G等)
副本数:dfs.replication,不能小于最小副本数
最小副本数:dfs.namenode.replication.min
删除结点临时目录
格式化文件系统
start-dfs.sh
start-yarn.sh
Web端口
-
HDFS系列(500开头)
-
namenode 50070
-
datanode 50075
-
2nd namenode 50090
-
-
MapReduce(yarn)
-
resourcemanager 8088
-
nodemanager 8042
-
setup和cleanup
import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Variant of the max-temperature mapper that also demonstrates the task
 * lifecycle hooks: setup() prints the split length and host, cleanup() prints
 * how long this mapper ran.
 */
public class MaxTemperatureMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    // Sentinel value the dataset uses for a missing temperature reading.
    private static final int MISSING = 9999;
    // Accepted quality codes; same set as the regex "[01459]".
    private static final String VALID_QUALITY_CODES = "01459";
    // A record must reach column 92 (the quality code) to be parsed at all.
    private static final int MIN_RECORD_LENGTH = 93;

    // Local host information, captured in setup().
    private InetAddress addr;
    private String hostName;
    private String ip;
    // Monotonic timestamp (nanoseconds) taken in setup() and read in cleanup();
    // a primitive avoids boxing.
    private long startNanos;

    // Output objects reused across map() calls (the pair is serialized on write).
    private final Text yearKey = new Text();
    private final IntWritable temperatureValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Skip truncated lines instead of failing the task.
        if (line.length() < MIN_RECORD_LENGTH) {
            return;
        }
        // Year occupies columns 15-18.
        String year = line.substring(15, 19);
        // Temperature occupies columns 87-91; column 87 holds the sign.
        int airTemperature;
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        // Single-character quality code at column 92.
        char quality = line.charAt(92);
        // Emit only valid readings; written types must match the declared output types.
        if (airTemperature != MISSING && VALID_QUALITY_CODES.indexOf(quality) >= 0) {
            yearKey.set(year);
            temperatureValue.set(airTemperature);
            context.write(yearKey, temperatureValue);
        }
    }

    /**
     * Called once at the end of the task; prints the elapsed time since setup().
     * The duration is printed as a plain millisecond count: formatting it with a
     * date formatter would render a calendar timestamp near 1970.
     */
    @Override
    protected void cleanup(
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
        System.out.println("任务执行时间为:" + elapsedMillis + " ms");
    }

    /**
     * Delegates to Mapper.run, which calls setup(), then map() for each input
     * record, then cleanup(). Overridden only to show where this hook lives.
     */
    @Override
    public void run(
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        super.run(context);
    }

    /**
     * Called once at the start of the task; prints the input split length and
     * the local host name/IP, and records the start time.
     */
    @Override
    protected void setup(
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Size of the input split assigned to this task.
        InputSplit split = context.getInputSplit();
        long length = split.getLength();
        System.out.println("split length:" + length);
        // Local host address, IP and host name.
        addr = InetAddress.getLocalHost();
        ip = addr.getHostAddress();
        hostName = addr.getHostName();
        startNanos = System.nanoTime();
        System.out.println(hostName + ":" + ip);
    }
}
split研究
map()的并发数取决于split块的数量,gz文件不能split
实际的map任务数可能少于按"文件大小÷split大小"估算出的块数。原因是FileInputFormat切分时,若剩余数据不超过split大小的1.1倍(SPLIT_SLOP),会把它并入最后一个split,而不再单独切出一块。
没有存放该数据的节点也可能执行任务:如果数据所在节点的资源已满,任务会被分配到其他空闲节点(充分利用资源,代价是失去数据本地性)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Application entry point: same job as the basic driver, but caps the input
 * split size so that even a small input is cut into several splits (and hence
 * several map tasks).
 *
 * <p>Usage: {@code MaxTemperature <input path> <output path>}. The process
 * exits with status 0 when the job succeeds and 1 when it fails.
 *
 * @author Matrix42
 */
public class MaxTemperature {
    // Upper bound for one input split, in bytes (15 KB).
    private static final long MAX_SPLIT_BYTES = 1024L * 15;

    public static void main(String[] args) throws Exception {
        // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // "mapred" is the old API; this program uses the new
        // org.apache.hadoop.mapreduce API.
        // One map-reduce pass is one job, executed as map tasks and reduce tasks.
        Job job = Job.getInstance();
        // The job's own configuration object; settings made here apply to this job.
        Configuration conf = job.getConfiguration();
        // Split-size keys are constants on FileInputFormat (new-API class
        // org.apache.hadoop.mapreduce.lib.input.FileInputFormat, not the old
        // org.apache.hadoop.mapred one).
        // FileInputFormat uses splitSize = max(minSize, min(maxSize, blockSize)):
        // a max size below the block size shrinks the splits, and a min size above
        // the block size enlarges them.
        conf.setLong(FileInputFormat.SPLIT_MAXSIZE, MAX_SPLIT_BYTES);
        job.setJarByClass(MaxTemperature.class);
        // A job name makes the job easier to find while debugging.
        job.setJobName("Max temperature");
        // Input: addInputPath may be called repeatedly to add several paths.
        // A path may be a file or a directory (not traversed recursively;
        // compressed files are decompressed automatically).
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Output: exactly one output path, and it must not exist yet.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        // Output key/value types of the job.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Submit the job and block until it finishes, printing progress.
        boolean status = job.waitForCompletion(true);
        System.out.println(status);
        // Propagate the result as the exit code so shell scripts can detect failure.
        System.exit(status ? 0 : 1);
    }
}