`
风过无声
  • 浏览: 87737 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Hadoop MapReduce应用开发

阅读更多

1.开发流程

1)编写map函数和reduce函数,最好使用单元测试来确保函数的运行符合预期

2)写一个驱动程序来运行作业

3)通过在一个小的数据集上运行这个驱动程序来进行测试

2.配置API

1)Configuration

一个Configuration类的实例代表配置属性及其取值的一个集合。

每个属性由一个String来命名,而值类型可以是多种。

Configuration从XML文件中读取属性内容,常见的有core-site.xml,hdfs-site.xml,mapred-site.xml。e.g.

configuration-1.xml

<?xml version="1.0"?>
<configuration>
    <property>
        <name>color</name>
        <value>yellow</value>
        <description>Color</description>
    </property>
    <property>
        <name>size</name>
        <value>10</value>
        <description>Size</description>
    </property>
    <property>
        <name>sizew</name>
        <value>10w</value>
        <description>Size</description>
    </property>
    <property>
        <name>weight</name>
        <value>heavy</value>
        <final>true</final>
        <description>Weight</description>
    </property>
    <property>
        <name>size-weight</name>
        <value>${size},${weight}</value>
        <description>Size and weight</description>
    </property>
</configuration>

访问

public void testLoad() {
	Configuration conf = new Configuration();
	conf.addResource("configuration-1.xml");
	assertEquals(conf.get("color"), "yellow");
	assertEquals(conf.getInt("size", 0), 10);
	assertEquals(conf.getInt("sizew", 0), 0);
	assertEquals(conf.get("weight"), "heavy");
	assertEquals(conf.get("size-weight"), "10,heavy");
}

--属性的类型通过访问时的方法确定

--属性可以通过其他属性进行扩展,如size-weight

合并多个配置文件

configuration-2.xml

<?xml version="1.0"?>
<configuration>
  <property>
    <name>size</name>
    <value>12</value>
  </property>
  
  <property>
    <name>weight</name>
    <value>light</value>
  </property>
</configuration>

访问

public void testMerge() {
	Configuration conf = new Configuration();
	conf.addResource("configuration-1.xml");
	conf.addResource("configuration-2.xml");
	assertEquals(conf.getInt("size", 0), 12);
	assertEquals(conf.get("weight"), "heavy");
}

--后来添加到源文件的属性会覆盖之前定义的属性

--被标记为final的属性不能被后面的定义覆盖,并且会WARN提示

2)系统属性

系统属性System.setProperty(...)或者使用JVM参数-DXXX=XXX设置

配置属性可以通过系统属性来扩展,系统属性优先级高于配置文件中定义的属性。前提是配置属性中存在该属性。

public void testMerge() {
	Configuration conf = new Configuration();
	conf.addResource("configuration-1.xml");
	conf.addResource("configuration-2.xml");
	assertEquals(conf.getInt("size", 0), 12);
	assertEquals(conf.get("weight"), "heavy");
	System.setProperty("size", "14");
	assertEquals(conf.get("size-weight"), "14,heavy");
	//fail
	assertEquals(conf.getInt("size", 0), 14); 
}

3.辅助类GenericOptionsParser,Tool和ToolRunner

GenericOptionsParser一个用来解释常用的Hadoop命令行选项的类,但更方便的方式是实现Tool接口,通过ToolRunner来运行程序,ToolRunner内部调用GenericOptionsParser。e.g.

package com.siyuan.hadoop.test.dev;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfigurationPrinter extends Configured implements Tool {
	
	@Override
	public int run(String[] args) throws Exception {
		for (String arg : args) {
			System.out.println(arg);
		}
		return 0;
	}
	
	public static void main(String[] args) throws Exception {
		ConfigurationPrinter cfgPrinter = new ConfigurationPrinter();
		Configuration loaded = new Configuration(false);
		loaded.addResource("configuration-1.xml");
		cfgPrinter.setConf(loaded);
		int exitCode = ToolRunner.run(cfgPrinter , args);
		Configuration cfg = cfgPrinter.getConf();
		for (Map.Entry<String,String> property : cfg) {
			System.out.printf("Property: %s=%s\n", property.getKey(), property.getValue());
		}
		System.exit(exitCode);
	}
	
}

执行:

hadoop jar /home/hadoop/task/hadooptest.jar com.siyuan.hadoop.test.dev.ConfigurationPrinter -conf configuration-2.xml arg1 arg2

输出结果:

arg1
arg2
Property: weight=heavy
Property: sizew=10w
Property: color=yellow
Property: size-weight=${size},${weight}
Property: mapred.used.genericoptionsparser=true
Property: size=12

在程序中ToolRunner的静态run方法使用GenericOptionsParser来获取在hadoop命令行中指定的标准选项,然后在Configuration实例上进行设置,将非标准选项传递给Tool接口的run方法。


1)-D选项和系统属性不一样

2)-D选项的优先级要高于配置文件中的其他属性

3)并不是所有的属性都能通过-D改变

4)选项必须位于程序参数之前,如之前的-conf configuration-2.xml必须位于arg1,arg2,否则将被视为程序参数

4.程序编写

1)Mapper

MaxTemperatureMapper

package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {
	
	private static final int MISSING = 9999;  
	
	@Override
	protected void map(LongWritable key, Text value,
			Context context)
			throws IOException, InterruptedException {
		String line = value.toString();  
        
		String year = line.substring(15, 19);  
		  
		int airTemperature;  
		if (line.charAt(87) == '+') {  
		    airTemperature = Integer.parseInt(line.substring(88, 92));  
		} else {  
		    airTemperature = Integer.parseInt(line.substring(87, 92));  
		}  
		  
		String quaility = line.substring(92, 93);  
		if (airTemperature != MISSING && quaility.matches("[01459]")) {  
		    context.write(new Text(year), new IntWritable(airTemperature));  
		}
		
	}
	
}

MaxTemperatureMapperTest

package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import junit.framework.TestCase;
import static org.mockito.Mockito.*;

public class MaxTemperatureMapperTest extends TestCase {
	
	private MaxTemperatureMapper mapper;
	private Mapper<LongWritable, Text, Text, IntWritable>.Context ctxt;
	
	@Override
	protected void setUp() throws Exception {
		mapper = new MaxTemperatureMapper();
		ctxt = mock(Mapper.Context.class);
	}

	public void testMap() throws IOException, InterruptedException {
		Text value = new Text("0029029070999991901010106004+64333+023450FM-12+000599999V0"
				+ "202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999");
		mapper.map(null, value, ctxt);
		verify(ctxt).write(new Text("1901"), new IntWritable(-78));
	}
	
	public void testMapMissing() throws IOException, InterruptedException {
		Text value = new Text("0029029070999991901010106004+64333+023450FM-12+000599999V0"
				+ "202701N015919999999N0000001N9+99991+99999102001ADDGF108991999999999999999999");
		mapper.map(null, value, ctxt);
		verify(ctxt, never()).write(any(Text.class), any(IntWritable.class));
	}
	
}

2)Reducer

MaxTemperatureReducer

package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context ctxt)
			throws IOException, InterruptedException {
		int maxAirTemperature = Integer.MIN_VALUE;  
        
        for (IntWritable airTemperature : values) {  
            maxAirTemperature = Math.max(maxAirTemperature, airTemperature.get());  
        }  
          
        ctxt.write(new Text(key), new IntWritable(maxAirTemperature));
	}
	
}

MaxTemperatureReducerTest

package com.siyuan.hadoop.test.dev;

import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import junit.framework.TestCase;
import static org.mockito.Mockito.*;

public class MaxTemperatureReducerTest extends TestCase {
	
	private MaxTemperatureReducer reducer;
	private Reducer<Text, IntWritable, Text, IntWritable>.Context ctxt;
	
	@Override
	protected void setUp() throws Exception {
		reducer = new MaxTemperatureReducer();
		ctxt = mock(Reducer.Context.class);
	}
	
	public void testReduce() throws IOException, InterruptedException {
		Text key = new Text("1901");
		List<IntWritable> values = new ArrayList<IntWritable>();
		values.add(new IntWritable(100));
		values.add(new IntWritable(50));
		values.add(new IntWritable(0));
		reducer.reduce(key, values, ctxt);
		verify(ctxt).write(new Text("1901"), new IntWritable(100));
	}
	
}

3)job driver

package com.siyuan.hadoop.test.dev;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureJob extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 2) {  
            System.err.println("Usage: MaxTemperatureJob <input path> <output path>");  
            System.exit(-1);  
        }
		
		Job job = new Job(getConf(), "Max Temperature Job");
		job.setJarByClass(getClass());
		
		FileInputFormat.addInputPath(job, new Path(args[0]));  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
        
        // default: job.setInputFormatClass(TextInputFormat.class);
        
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        
        return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new MaxTemperatureJob(), args));
	}
	
}

运行结果:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.Text

修复:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

5.MapReduce的WEB页面

可以通过Hadoop提供的Web页面来浏览作业信息,对于跟踪作业进度,查找作业完成后的统计信息和日志非常有用。默认的端口号为50030。

mapred-site.xml

<property>
  <name>mapred.job.tracker.http.address</name>
  <value>0.0.0.0:50030</value>
  <description>
    The job tracker http server address and port the server will listen on.
    If the port is 0 then the server will start on a free port.
  </description>
</property>
1)作业,任务和task attempt ID

 

--作业ID:jobtracker(不是作业)开始的时间和唯一标识此作业的由jobtracker维护的增量计数器。

e.g. job_201403281304_0006

注:计数器从0001开始,达到10000时,不能重新设置。

--任务ID:在初始化时产生,不必是任务执行的顺序。格式为 作业ID_[mr]_000X,mr为任务类型,000X为任务计数器,从0000开始。

e.g. task_201403281304_0006_m_000000,task_201403281304_0006_r_000000

--task attempt ID

由于失败或者推测执行,任务可能会执行多次。为了标识任务执行的不同实例,会通过task attempt ID进行区分。格式为 任务ID_index,index从0开始。task attempt在作业运行时根据需要分配,所以,它们的顺序代表tasktracker产生并运行的先后顺序。

e.g.task_201403281304_0006_m_000000_0,task_201403281304_0006_r_000000_0

注:如果在jobtracker重启并恢复运行作业后,作业被重启,那么task attempt ID中的计数器将从1000开始。

2)WEB页面组成

--jobtracker页面

在作业存储到历史信息页之前,主页上只显示100个作业,作业历史是永久存储的。

mapred-site.xml

<property>
  <name>mapred.jobtracker.completeuserjobs.maximum</name>
  <value>100</value>
  <description>The maximum number of complete jobs per user to keep around 
  before delegating them to the job history.</description>
</property>

 

作业历史的保存路径,系统会保存30天,然后自动删除。

mapred-site.xml

<property>
  <name>hadoop.job.history.location</name>
  <value></value>
  <description> The location where jobtracker history files are stored.
  The value for this key is treated as a URI, meaning that the files 
  can be stored either on HDFS or the local file system.  If no value is 
  set here, the location defaults to the local file system, at 
  file:///${hadoop.log.dir}/history.  If the URI is missing a scheme,
  fs.default.name is used for the file system.
  </description>
</property>
作业输出目录的_logs/history子目录为用户存放第二个备份,该文件不会被系统删除

 

mapred-site.xml

<property>
  <name>hadoop.job.history.user.location</name>
  <value></value>
  <description> User can specify a location to store the history files of 
  a particular job. If nothing is specified, the logs are stored in 
  output directory. The files are stored in "_logs/history/" in the directory.
  User can stop logging by giving the value "none". 
  </description>
</property>

 

--job页面

--task页面

Actions列包括终止task attempt的连接,默认情况下为禁用的。

core-site.xml

<property>
  <name>webinterface.private.actions</name>
  <value>false</value>
  <description> If set to true, the web interfaces of JT and NN may contain 
                actions, such as kill job, delete file, etc., that should 
                not be exposed to public. Enable this option if the interfaces 
                are only reachable by those who have the right authorization.
  </description>
</property>
6.其它

 

1)获取结果

每个reducer会产生一个输出文件到输出目录。

--hadoop fs 命令中的-getmerge,可以得到源模式目录中的所有文件,并在本地系统上将它们合并成一个文件。e.g.

hadoop fs -getmerge output output-local.txt
2)作业调试

--将调试信息打印到标准错误中

--发送一个信息来更新任务的状态信息以提示我们查看错误日志

--创建一个自定义的计数器来统计错误总数

以上信息均可在WEB页面中查看

package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {
	
	private static final int MISSING = 9999;  
	
	enum RECORD_FORMAT {
		ERROR
	}
	
	@Override
	protected void map(LongWritable key, Text value,
			Context context)
			throws IOException, InterruptedException {
		String line = value.toString();  
        
		String year = line.substring(15, 19);  
		  
		int airTemperature;  
		if (line.charAt(87) == '+') {  
		    airTemperature = Integer.parseInt(line.substring(88, 92));  
		} else {  
		    airTemperature = Integer.parseInt(line.substring(87, 92));  
		}  
		  
		String quaility = line.substring(92, 93);  
		if (airTemperature != MISSING && quaility.matches("[01459]")) {  
		    context.write(new Text(year), new IntWritable(airTemperature));  
		} else {
			//将调试信息打印到标准错误中
			System.err.println("Wrong format record has been found:" + line);
			//更改任务状态
			context.setStatus("Wrong format record has been found.");
			//创建一个自定义的计数器来统计错误总数
			context.getCounter(RECORD_FORMAT.ERROR).increment(1);
		}
		
	}
	
}

3)Hadoop用户日志

针对不同的用户,hadoop在不同的地方生成日志,如下表:

4)作业调优


5)MapReduce工作流

--JobControl类

--Oozie

  • 大小: 138.4 KB
  • 大小: 24.9 KB
  • 大小: 89.2 KB
  • 大小: 41.6 KB
分享到:
评论

相关推荐

    Hadoop原理与技术MapReduce实验

    (1)熟悉Hadoop开发包 (2)编写MepReduce程序 (3)调试和运行MepReduce程序 (4)完成上课老师演示的内容 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 二、实验内容 1.单词计数实验...

    hadoop基本流程与mapReduce应用开发.pdf

    hadoop基本流程与mapReduce应用开发.pdf

    Hadoop应用开发技术详解(中文版)

    《大数据技术丛书:Hadoop应用开发技术详解》共12章。第1~2章详细地介绍了Hadoop的生态系统、关键技术以及安装和配置;第3章是MapReduce的使用入门,让读者了解整个开发过程;第4~5章详细讲解了分布式文件系统HDFS...

    Windows7 x64+Eclipse+Hadoop 2.5.2搭建MapReduce开发集群相关工具下载

    在Windows7 x64 + Eclipse + Hadoop 2.5.2搭建MapReduce开发环境,下载的文件中包括下载的文件包括:hadoop 2.5.2.tar.gz,hadoop-common-2.2.0-bin-master.zip,hadoop-eclipse-plugin-2.5.2.jar。应用这些软件的...

    Hadoop应用开发技术详解

    《大数据技术丛书:Hadoop应用开发技术详解》共12章。第1~2章详细地介绍了Hadoop的生态系统、关键技术以及安装和配置;第3章是MapReduce的使用入门,让读者了解整个开发过程;第4~5章详细讲解了分布式文件系统HDFS...

    实验项目 MapReduce 编程

    2. 在 Hadoop 集群主节点上搭建 MapReduce 开发环境 Eclipse。 3. 查看 Hadoop 自带的 MR-App 单词计数源代码 WordCount.java,在 Eclipse 项目 MapReduceExample 下建立新包 com.xijing.mapreduce,模仿内置的 ...

    hadoop技术内幕 深入解析mapreduce架构设计与实现原理

    本书适合Hadoop的二次开发人员、应用开发工程师、运维工程师阅读。 hadoop技术内幕 深入解析mapreduce架构设计与实现原理 内容简介: 前 言 第一部分 基础篇 第1章 阅读源代码前的准备/ 2 1.1 准备源代码学习...

    深入云计算:Hadoop应用开发实战详解

    本书由浅入深,全面、系统地介绍...本书的内容主要包括hdfs、mapreduce、hive、hbase、mahout、pig、zookeeper、avro、chukwa等与hadoop相关的子项目,各个知识点都精心设计了大量经典的小案例,实战型强,可操作性强。

    【大数据入门笔记系列】第五节 SpringBoot集成hadoop开发环境(复杂版的WordCount)

    SpringBoot集成hadoop开发环境(复杂版的WordCount)前言环境清单创建SpringBoot项目创建包创建yml添加集群主机名映射hadoop配置文件环境变量HADOOP_HOME编写代码添加hadoop依赖jar包编译项目造数据IDEA远程提交...

    Hadoop专业解决方案-第5章开发可靠的MapReduce应用.docx

    Hadoop专业解决方案-第5章开发可靠的MapReduce应用.docx

    hadoop 1.2.1 api 最新chm 伪中文版

    Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。 一个Map/Reduce 作业(job) 通常会把输入的...

    UDA:Hadoop MapReduce 的非结构化数据加速器 (RDMA)

    UDA Hadoop MapReduce 的非结构化数据加速器 (RDMA) UDA 是一种软件插件,可加速 Hadoop 网络并提高执行数据分析密集型应用程序的 Hadoop 集群的扩展性。 一种新颖的数据移动协议将 RDMA 与高效的归并排序算法结合...

    Hadoop应用开发技术详解书籍

    《Hadoop应用开发技术详解》一书由资深Hadoop技术专家撰写,系统、全面、深入地讲解了Hadoop开发者需要掌握的技术和知识,包括HDFS的原理和应用、Hadoop文件I/O的原理和应用、MapReduce的原理和高级应用、MapReduce...

    hadoop-3.3.4 版本(最新版)

    够让用户轻松地在 Hadoop 上开发和运行处理海量数据的应用程序。 Hadoop 架构有两个主要的组件:分布式文件系统 HDFS 和 MapReduce 引擎。 在 Hadoop 中,MapReduce 底层的分布式文件系统是独文模块,用户可按照约定...

    精品课程推荐 大数据与云计算教程课件 优质大数据课程 13.深入MapReduce应用开发(共21页).pptx

    深入MapReduce应用开发(共21页).pptx 大数据与云计算教程课件 优质大数据课程 14.Hadoop集群配置(共6页).pptx 大数据与云计算教程课件 优质大数据课程 15.Hive(共46页).pptx 大数据与云计算教程课件 优质...

    Hadoop应用开发技术详解.part1.rar

    资深Hadoop技术专家撰写,从开发者角度对Hadoop分布式文件系统、Hadoop文件I/O、Hive、HBase、Mahout,以及...Hadoop应用开发技术详解.part1.rar Hadoop应用开发技术详解.part2.rar Hadoop应用开发技术详解.part3.rar

    hadoop技术内幕 深入解析mapreduce架构设计与实现原理.(董西成).全本

    “Hadoop技术内幕”共两册,分别从源代码的角度对“Common+HDFS”和“MapReduce的架构设计和实现原理”进行了极为详细的分析。...本书适合Hadoop的二次开发人员、应用开发工程师、运维工程师阅读。

    Hadoop大数据开发实战-教学大纲.pdf

    二、 课程的任务 通过本课程的学习,使学生学会搭建Hadoop完全分布式集群,掌握HDFS的原理和基础操作,掌握MapReduce原理架构、MapReduce程序的编写。为将来从事大数据挖掘研究工作以及后续课程的学习奠定基础。

    hadoop技术内幕 深入解析mapreduce架构设计与实现原理.(董西成).全本1

    “Hadoop技术内幕”共两册,分别从源代码的角度对“Common+HDFS”和“MapReduce的架构设计和实现原理”进行了极为详细的分析。...本书适合Hadoop的二次开发人员、应用开发工程师、运维工程师阅读。

    Hadoop从入门到上手企业开发

    近百节课视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 课程目录 000 上课方式和课程大纲介绍 001 Linux系统基本知识说明和启动Linux虚拟机 ...066 Hadoop MapReduce框架数据类型讲解 067

Global site tag (gtag.js) - Google Analytics