首页>> 正文

【dbdao Hadoop 大数据学习】 Hadoop框架入门

来源:商群邮件营销时间:2016-02-21 16:54:54点击:1809

dbDao.com 引导式IT在线教育

Hadoop 技术学习QQ群号  : 134115150

本文固定链接:http://t.dbdao.com/archives/getting-started-with-the-hadoop-framework.html

Hadoop框架入门

前几章讨论了大数据的动机,接着深入介绍市场上最重要的大数据框架-Hadoop。本章你将实际使用Hadoop,指导你完成设置Hadoop开发环境的过程,并提供一些操作系统上安装Hadoop的操作指南。然后写一个Hadoop程序,并引导你了解Hadoop架构下更深层次的概念。

安装类型

虽然安装Hadoop往往是有经验的系统管理员的任务,并且Hadoop的安装细节可在Apache 网站找到,但对于在各种平台上安装Hadoop有2点需要知道:

  • 要启用Hadoop程序的单元测试,Hadoop需要以独立模式安装。该过程对Linux系统来说相对简单,但对于Windows系统来说更复杂。
  • 为了能够在一个真实集群中启用Hadoop程序模拟,Hadoop提供了一个伪分布式集群的操作模式(t.dbdao.com)。

本章涵盖了使用Hadoop的多种模式。Hadoop开发环境是虚拟机的背景下讨论的。我们以独立模式在Windows和Linux上展示Hadoop安装(还讨论了Linux的伪集群安装)。 Hadoop是一个不断发展的软件,它的安装过程非常复杂。

附录A介绍了Windows和Linux平台的安装步骤。这些步骤须视为一套通用的安装指南。具体情况可能会有所不同。如本章中所述,Hadoop 2.x平台的开发,建议使用VM方法来安装开发环境。

独立模式

单机是最简单的操作模式,且最适合调试。在这种模式下,Hadoop进程在单个JVM中运行。虽然从性能的角度看,这种模式显然是效率最低的,但对开发周期来说却是最有效的。

伪分布式集群

在该模式下,Hadoop以伪分布的方式在单节点上运行,且所有守护进程都在独立的Java进程中运行。该模式用于模拟集群环境(t.dbdao.com)。

多节点集群安装

在该模式下,Hadoop确实安装在一个机器集群上。其安装最为复杂,且往往是经验丰富的Linux系统管理员的任务。从逻辑的角度看,这与伪分布式集群相同。

使用Amazon Elastic MapReduce预安装

另一种让你可以在hadoop集群中,快速上手的方法是Amazon Elastic MapReduce(EMR)。现在该服务不仅支持Hadoop的1.x和2.x版本。它还支持Hadoop的各种发行版本,如Apache版本和MapR发行版。

EMR使用户只需在网页上简单点击几下鼠标就能运行一个Hadoop集群。EMR背后的主要思想如下:

1.用户加载数据到Amazon S3服务,一个简单的存储服务中。Amazon S3是Amazon网络服务所提供的分布式文件存储系统。它通过Web服务接口支持存储。Hadoop可配置为将S3作为一个分布式文件系统。在这种模式下,S3服务的作用类似于HDFS。

2.用户还加载应用程序库到Amazon S3服务。

3.用户通过指示库和输入文件以及任务写入其输出的S3中输出目录的位置,启动EMR任务。

4.Hadoop集群在Amazon云上启动,任务被执行,其输出持续放在前面步骤指定的输出目录中(t.dbdao.com)。

在其默认行为中,集群自动关闭,用户停止付款。但有一个选项(如今网页上有,可启动EMR)可使你表明你想让集群保持活动状态:Auto-terminate选项。当该选项为NO,则任务完成后集群不会关闭。

使用Secure Shell (SSH)客户端你可以选择进入任何节点。用户通过SSH客户端连接到一个物理模式后,可继续使用Hadoop作为一个功能完备的集群。即使HDFS也对用户可用。

用户可使用一个示例和小任务启动集群,从而执行并保持集群运行。用户可通过连接到其中一个节点运行多个任务。一个简单的双节点集群的成本约为每小时$ 1.00(取决于所选择的服务器类型,如果选用高端服务器价格涨幅高达每小时$ 14,00)。用户完成工作后,可以关闭集群,并停止付款。因此,用户花很小的代价就可体验到在一个真实生产级的Hadoop集群上运行实际任务。(第16章讨论了云中的Hadoop)

  • 注意   即使每小时$ 1.00,合起来也可能会超过一个月的时间。在amazon的云产品中要特别注意你运行的服务的状态。当你不使用时关闭它们,并确保你了解被计费的项目。我们知道的至少有一人,在不使用时还让服务器整月运行,而被划走高达数百美元。不要让这种情况发生在你身上。

用Cloudera虚拟机建立一个开发环境

本书主要关注的是Hadoop开发,Hadoop安装是个复杂的任务,通常使用由供应商提供的工具进行简化。例如,Cloudera提供了Cloudera管理器,它简化了Hadoop的安装。作为开发人员,你当然希望有一个可快速安装和设置的可靠的开发环境。Cloudera已发布了CDH 5.0同时支持VMware和VirtualBox。如果你没有安装这些VM,首先下载其最新版本。然后从这个链接下载Cloudera 5 QuickStart VM:

www.cloudera.com/content/support/en/downloads/download-components/download-products.html?productID=F6mO278Rvo

注意:Cloudera 5 VM需要8 GB的内存。确保你的机器有足够的内存来执行VM。或者,按照下一节中的步骤来安装你的开发环境(t.dbdao.com)。

当你启动VM,可以看到如图3-1所示的画面。箭头指向VM内桌面上的Eclipse图标。你只需打开Eclipse,并开始Hadoop代码开发,因为环境被配置为直接从Eclipse环境以本地模式运行任务。

3- 1 Cloudera 5 VM

这就是Hadoop 2.0入门你所需要的全部内容。该环境也使用户能够以伪分布式模式执行任务,以模拟在真实集群上测试。因此,它是一个集开发、单元测试和集成测试为一体的完善的环境。该环境还被配置为允许使用Cloudera管理器,一个用来监控和管理任务的用户友好型GUI工具。我们鼓励你熟悉这一工具,因为它大大简化了作业管理和跟踪任务。

我们强烈推荐这种方式,它能够快速完成Hadoop 2.0开发环境安装(t.dbdao.com)。

  • 注意  如果你打算使用本节提到的Cloudera虚拟机,不需要了解有关Hadoop的安装。不过我们在附录a中已经介绍了Windows和Linux上Hadoop的安装过程,你应该按照附录a中的步骤以伪集群模式安装Hadoop。

MapReduce程序的组件

本节介绍了Java中构成MapReduce程序的各个组件。以下列表描述了这些组件:

  • Client Java program : 集群中从客户端节点(也称为 edge node )启动的Java程序。此节点可以访问Hadoop集群,有时(不总是)也可以是集群中的数据节点之一。它是集群中有权访问Hadoop安装的一台机器。
  • Custom Mapper class : 包括一个Mapper类,往往是一个自定义类。该类的实例在远程任 务节点上执行,在伪集群中执行任务的情况除外。这些节点往往与客户端Java程序启动任务的节点不同。
  • Custom Reducer class : 包括一个Reducer类,往往是一个自定义类。类似于Mapper,该类的实例在远程任务节点上执行,在伪集群中执行任务的情况除外。这些节点往往与客户端Java程序启动任务的节点不同。
  • Client-side libraries : 在运行时执行客户端所需的独立于标准Hadoop库。客户端所需的Hadoop库早已安装,并通过Hadoop Client命令(不同于客户端程序)配置到CLASSPATH。它位于$HADOOP_HOME/bin/文件夹中,名为Hadoop。正如java命令用于执行Java程序,hadoop命令用于执行启动Hadoop任务的客户端程序。这些库通过设置环境变量HADOOP_CLASSPATH进行配置。与CLASSPATH变量类似,它是一个冒号分隔的库列表。
  • Remote libraries :执行自定义Mapper和Reducer类所需的库。他们排除了Hadoop库集,因为Hadoop库已经配置在DataNode上。例如,如果Mapper正使用一个专门的XML解析器,则包含解析器的库必须被传输到执行Mapper的远程DataNode上。
  • Java Application Archive (JAR) files :Java应用程序被打包成JAR文件,其中包含客户端Java类以及自定义Mapper和Reducer类。它还包括客户端和Mapper/Reducer类所使用的其他自定义从属类(t.dbdao.com)。

第一个Hadoop程序

本节中,你会完成第一个Hadoop程序。对于此应用程序你使用的环境是带Maven插件的Eclipse。如果你使用的是Cloudera VM,它会为你预先安装。在附录B中有在Eclipse中创建一个Maven项目的过程记录。

首先创建一个空的Maven项目和必要的附件。该项目对象模型(POM)如表3-1中所示。创建一个名为pom.xml的文件,把表3-1中的代码放进该文件。

3- 1 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>prohadoop</groupId>

<artifactId>prohadoop</artifactId>

<version>0.0.1-SNAPSHOT</version>

<name>ProHadoop</name>

<dependencies>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>2.2.0</version>

</dependency>

</dependencies>

</project>
<projectxmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
<modelVersion>4.0.0</modelVersion>
 
<groupId>prohadoop</groupId>
 
<artifactId>prohadoop</artifactId>
 
<version>0.0.1-SNAPSHOT</version>
 
<name>ProHadoop</name>
 
<dependencies>
 
<dependency>
 
<groupId>org.apache.hadoop</groupId>
 
<artifactId>hadoop-client</artifactId>
 
<version>2.2.0</version>
 
</dependency>
 
</dependencies>
 
</project>

现在你可以开发你的第一个MapReduce程序,并在本地运行。然而,令人困惑的是有两个MapReduce API:旧的API和新的API。旧的API已经被淘汰,但它仍然广泛使用。我们同时使用两个API运行任务,这样你就可以慢慢熟悉两个API(t.dbdao.com)。

  •  你不需要手动输入这些示例列表。在本书的apress.com目录页你会看到一个标签,从中下载包含所有列表的zip压缩包。

在本地模式下运行程序的先决条件

能够在本地模式下运行Hadoop程序是很重要的,这样能够快速的开发,读者可以单元测试程序。唯一的先决条件是,HADOOP_HOME环境变量应正确设置,且{HADOOP_HOME} / bin应包含在PATH变量中。对于Linux环境来说,确保这两个设置是足够的。

然而,在Windows环境中,需要应用整个构建过程以确保所有必需的动态链接库(DLL)都在机器上。它们并不包含在从Apache网站下载的tar文件中。在Windows上将Hadoop安装为伪集群或本地模式的过程在附录A中有描述。

如果你使用的是前面章节中描述的虚拟机,这些先决条件早已在VM中配置了。

假设先决条件都满足,则接下来的示例程序,现在可以像你所选择的集成开发环境(IDE)中的其他任何Java程序一样执行(t.dbdao.com)。

以本地模式运行本书中描述的所有任务的样本数据都被保存在${project.basedir}/src/main/resources/input文件夹中。${project.basedir}目录是该项目的基本目录。运行WordCount应用程序的示例文本文件被保存在前面所提文件夹的wordcount子目录中。

使用旧API统计字数

表3-2中的WordCountOldAPI.java程序展示了使用已被弃用的旧API统计字数的应用程序的代码。但旧API仍然广泛使用,且有大量的遗留代码。旧的API不会消失,而且知道如何使用它非常重要,因为维护代码时,读者可能会经常遇到它。

3- 2   WordCountOldAPI.java

package org.apress.prohadoop.c3;

import java.io.IOException; import java.util.Iterator;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.TextInputFormat; 
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCountOldAPI {



public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

public void map(LongWritable key, Text value,

OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

output.collect(new Text(value.toString()), new IntWritable(1));

}

}

public static class MyReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

int sum = 0;

while (values.hasNext()) {

sum += values.next().get();

}

output.collect(key, new IntWritable(sum));

}

}



public static void main(String[] args) throws Exception { 
JobConf conf = new JobConf(WordCountOldAPI.class); conf.setJobName("wordcount");



conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class);



conf.setMapperClass(MyMapper.class); 
conf.setCombinerClass(MyReducer.class);
 conf.setReducerClass(MyReducer.class);
 conf.setNumReduceTasks(1); 
conf.setInputFormat(TextInputFormat.class); 
conf.setOutputFormat(TextOutputFormat.class); 
FileInputFormat.setInputPaths(conf, new Path(args[0])); 
FileOutputFormat.setOutputPath(conf, new Path(args[1]));


JobClient.runJob(conf);

}

}
package org.apress.prohadoop.c3;
 
importjava.io.IOException; importjava.util.Iterator;
 
importorg.apache.hadoop.fs.Path;
 
importorg.apache.hadoop.io.IntWritable; 
importorg.apache.hadoop.io.LongWritable; 
importorg.apache.hadoop.io.Text;
 
importorg.apache.hadoop.mapred.FileInputFormat; 
importorg.apache.hadoop.mapred.FileOutputFormat; 
importorg.apache.hadoop.mapred.JobClient;
 
importorg.apache.hadoop.mapred.JobConf; 
importorg.apache.hadoop.mapred.MapReduceBase; 
importorg.apache.hadoop.mapred.Mapper;
 
importorg.apache.hadoop.mapred.OutputCollector; 
importorg.apache.hadoop.mapred.Reducer;
 
importorg.apache.hadoop.mapred.Reporter;
 
importorg.apache.hadoop.mapred.TextInputFormat; 
importorg.apache.hadoop.mapred.TextOutputFormat;
 
public class WordCountOldAPI {
 
 
 
public static class MyMapperextends MapReduceBaseimplements Mapper<LongWritable, Text, Text, IntWritable> {
 
public void map(LongWritablekey, Textvalue,
 
OutputCollector<Text, IntWritable> output, Reporterreporter) throws IOException {
 
output.collect(new Text(value.toString()), new IntWritable(1));
 
}
 
}
 
public static class MyReducerextends MapReduceBaseimplements Reducer<Text, IntWritable, Text, IntWritable> {
 
public void reduce(Textkey, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporterreporter) throws IOException {
 
int sum = 0;
 
while (values.hasNext()) {
 
sum += values.next().get();
 
}
 
output.collect(key, new IntWritable(sum));
 
}
 
}
 
 
 
public static void main(String[] args) throws Exception { 
JobConfconf = new JobConf(WordCountOldAPI.class); conf.setJobName("wordcount");
 
 
 
conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class);
 
 
 
conf.setMapperClass(MyMapper.class); 
conf.setCombinerClass(MyReducer.class);
 conf.setReducerClass(MyReducer.class);
 conf.setNumReduceTasks(1); 
conf.setInputFormat(TextInputFormat.class); 
conf.setOutputFormat(TextOutputFormat.class); 
FileInputFormat.setInputPaths(conf, new Path(args[0])); 
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
 
 
JobClient.runJob(conf);
 
}
 
}

首先分析main()方法。需要掌握的最重要的概念如以下列表所述:

  • JobConf是MapReduce任务,它是Hadoop框架的主要接口。框架执行JobConf对象中所描述的任务。
  • TextInputFormat向Hadoop框架声明其输入是文本格式。它是InputFormat的子类。 (第7章会详细讨论各种InputFormat类)到目前为止,知道TextInputFormat类将每一行输入文件作为单个记录读取就足够了。
  • TextOutputFormat指定任务的输出规格。例如,验证输出目录早已不存在。如果输出目录还存在,Hadoop不会允许任务继续进行。Hadoop任务处理大量数据,且往往需要几分钟到几个小时来执行任务,如果任务被意外中断重新运行,则将会损失所有过程。第7章将描述各种类型的专门OutputFormat类,目前知道TextOutputFormat类是用于从MapReduce程序生成文本输出文件就可以了。
  • FileInputFormat.setInputPaths(conf, new Path(args[0]))告诉Hadoop框架选取文件的输入目录。该目录可有一到多个文件,每个文件每行都有一个词。注意在setInputPaths中使用复数形式,这种方法可获取充当Hadoop程序输入路径的一组目录。所有这些路径中的文件形成程序的输入规格。
  • FileOutputFormat.setOutputPath(conf, new Path(args[1]))指定了程序的输出目录。最终的输出放在此目录中(t.dbdao.com)。
  • conf.setOutputKeyClass 和conf.setOutputValueClass指定输出键和值类。他们应该与Reducer类的规格相匹配。这似乎有些多余,并且也确实如此。但它需要与Reducer规格保持一致,否则执行时会出现RuntimeException。
  • Reducer数量的默认值是1。该值可以改变,且通常是为了提高程序的效率。对JobConf.class实例的调用能让你控制setNumReduceTasks(int n)方法。

Now you understand how the entire program works in the Hadoop Framework. When the program begins, the Mapper reads designated blocks from the input files in the input folders. The stream of bytes that represents a file is then converted to a record (Key/Value format) and supplied as input to the Mapper. The key is the byte offset of the current line (instance of LongWritable.class), and the value (instance of Text.class) is a line of text from the file. In this simplified example, using the sample file in the books source code, one line is one word.

The Mapper emits the word and the integer 1. Notice that Hadoop requires you to use its own version of Integer.class called IntWritable.class. The reason lies in the I/O concepts underlying Hadoop (they are discussed in upcoming chapters). For now, accept that the input and output of Hadoop Mappers and Reducers need to be instances of the type Writable.class.

现在你应该明白整个程序是如何在Hadoop框架中工作的。当程序开始时,Mapper从输入文件夹的输入文件中读取指定的块。然后代表一个文件的字节流被转换为一个记录(键/值格式),并作为输入提供给Mapper。键是当前行( LongWritable.class实例)的字节偏移量,值是( Text.class实例)是文件中的一行文本。在该简化示例中,使用本书源代码中的示例文件,一行是一个字。

Mapper输出整数1。注意,Hadoop要求使用它自己的名为IntWritable.class 的Integer.class版本。究其原因在于底层Hadoop 的I / O概念(这些在后面的章节中将会讨论到)。就目前而言,只需知道Hadoop Mappers 和Reducers的输入和输出须为类型Writable.class的实例。

接下来介绍的是Hadoop的Shuffle阶段,它在前面的列表中并不明显。所有的键(也就是我们例子中的字)都是由Shuffle/Sort阶段排序,并提交给Reducer。虽然Reducer看到一个IntWritable.class实例的迭代器,它仅仅是一个逻辑视图。在现实中,同样的IntWritable.class实例正在Reducer中使用。随着Reducer迭代到该键(例子中1s的列表)的值,IntWritable.class的同一实例被重用。在内部,Hadoop框架在对迭代器的values.next()调用上调用了IntWritable.set方法。当你收集值并作为引用重用时注意这一点。由于Hadoop的这个功能(优化以节省内存使用),重用values.next()实例的引用将会导致意想不到的后果。

最后,当Reducer输出值,它们将被写入由FileOutputFormat.setOutputPath(conf, new Path(args[1]))调用在客户端WordCountOldAPI程序指定的输出文件夹。Reducer将其输出键和值实例发送到输出文件。输出文件中的输出键和值实例的默认分隔符是一个TAB,可通过设置配置参数: mapreduce. textoutputformat.separator来修改。例如,用main(…)方法调用conf.set(“mapreduce.textoutputformat.separator”,”,”)会将输出中键/值实例之间的分隔符转化成一个“,”

在真实集群中,具有和配置的Reducer一样多的文件。在该例中,文件包含的输出为一个字,后跟计数(t.dbdao.com)。

集群中执行前述程序的命令行与下一节中使用的命令行类似。这将在下一节探讨新API时讨论。

构建应用程序

接下来,使用Maven构建应用程序。这个过程在附录B中有描述,所以如果你不熟悉Maven,应从那里开始学习。

构建过程产生prohadoop-0.0.1-SNAPSHOT.jar文件。应用程序的各种组件如表3-1中所描述。

3- 1 使用旧 API WordCount 的组件

组件                                     名称

Client 类                             WordCountOldAPI.java

Mapper 类                          WordCountOldAPI.MyMapper.java

Reducer 类                         WordCountOldAPI.MyReducer.java

应用程序 JAR                    prohadoop-0.0.1-SNAPSHOT.jar

Client-side库                     没有对应的程序文件

Remote libraries               没有对应的程序文件

在集群模式下运行WordCount

你可以在集群(或伪集群)模式下启动MapReduce任务。下面是在集群模式下执行时的两个区别(t.dbdao.com):

  • Map和Reduce任务各自在其自己的节点上运行,通常和客户端节点不一样。
  • Map和Reduce任务在自己的JVM中执行。

在集群模式下执行WordCount程序(包括前面章节中的WordCount程序)的第一步是将应用程序类文件打包成JAR文件。该程序然后可执行如下:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:<DEPENDENT_JARS_USED_BY_ CLIENT_CLASS>

hadoop jar prohadoop-0.0.1-SNAPSHOT.jar \ org.aspress.prohadoop.c3.WordCountOldAPI <INPUT_PATH> <OUTPUT_PATH>
exportHADOOP_CLASSPATH=$HADOOP_CLASSPATH:<DEPENDENT_JARS_USED_BY_CLIENT_CLASS>
 
hadoopjarprohadoop-0.0.1-SNAPSHOT.jar \ org.aspress.prohadoop.c3.WordCountOldAPI <INPUT_PATH> <OUTPUT_PATH>

虽然我们没有在客户端程序中使用专门的库,使他们可访问的正确方法是使用以下方法:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:<DEPENDENT_JARS_USED_BY_ CLIENT_CLASS>
exportHADOOP_CLASSPATH=$HADOOP_CLASSPATH:<DEPENDENT_JARS_USED_BY_CLIENT_CLASS>

在执行客户端程序之前,Hadoop命令使用HADOOP_CLASSPATH环境变量来配置client-side CLASSPATH。

hadoop命令还负责发送prohadoop-0.0.1-SNAPSHOT.jar文件到远程节点,为Mapper和Reducer实例在其上的执行做准备,要确保CLASSPATH被配置。这是将应用程序移动到第1章“大数据的动机”中提到的数据的一个例子。

<INPUT_PATH>和 <OUTPUT_PATH>文件夹是指HDFS中的文件夹。前者是文件被Mapper实例使用的输入文件夹。后者是Reducer输出被写入的输出文件夹。

使用新API的WordCount

本节将探讨新API,新API的设计意图是为程序员提供更好的控制。新API用Job.class替换了JobConf.class。它使用户能够配置作业,提交作业,控制其执行,并监测其进展情况。Job.class的set()方法与JobConf.class的set()方法类似,直到任务提交他们才能被调用。如果他们在任务提交后被调用,会抛出一个IllegalStateException实例。Job.class方法如getStatus ()使用户能够监测任务的进度。表3-3展示了使用新API的WordCount程序。注意:main()方法现在使用Job. class 代替了JobConf.class(t.dbdao.com)。

3- 3 WordCountNewAPI.java

package org.apress.prohadoop.c3; import java.io.IOException;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;



public class WordCountNewAPI {

public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String w = value.toString();

context.write(new Text(w), new IntWritable(1));

}

}


public static class MyReducer extends

Reducer<Text, IntWritable, Text, IntWritable> {


public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { 
int sum = 0;

for (IntWritable val : values) { 
sum += val.get();

}

context.write(key, new IntWritable(sum));

}

}



public static void main(String[] args) throws Exception { 
Job job = Job.getInstance(new Configuration()); 
job.setJarByClass(WordCountNewAPI.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(IntWritable.class); 
job.setMapperClass(MyMapper.class); 
job.setReducerClass(MyReducer.class); 
job.setInputFormatClass(TextInputFormat.class); 
job.setOutputFormatClass(TextOutputFormat.class); 
FileInputFormat.setInputPaths(job, new Path(args[0])); 
FileOutputFormat.setOutputPath(job, new Path(args[1])); 
boolean status = job.waitForCompletion(true);

if (status) {

System.exit(0);

} else {

System.exit(1);

}

}

}
package org.apress.prohadoop.c3; importjava.io.IOException;
 
importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path;
 
importorg.apache.hadoop.io.IntWritable;
 
importorg.apache.hadoop.io.LongWritable; importorg.apache.hadoop.io.Text;
 
importorg.apache.hadoop.mapreduce.Job; 
importorg.apache.hadoop.mapreduce.Mapper; 
importorg.apache.hadoop.mapreduce.Reducer;
 
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 
 
public class WordCountNewAPI {
 
public static class MyMapperextends Mapper<LongWritable, Text, Text, IntWritable> {
 
public void map(LongWritablekey, Textvalue, Contextcontext) throwsIOException, InterruptedException {
 
String w = value.toString();
 
context.write(new Text(w), new IntWritable(1));
 
}
 
}
 
 
public static class MyReducerextends
 
Reducer<Text, IntWritable, Text, IntWritable> {
 
 
public void reduce(Textkey, Iterable<IntWritable> values,Contextcontext) throwsIOException, InterruptedException { 
int sum = 0;
 
for (IntWritableval : values) { 
sum += val.get();
 
}
 
context.write(key, new IntWritable(sum));
 
}
 
}
 
 
 
public static void main(String[] args) throws Exception { 
Jobjob = Job.getInstance(new Configuration()); 
job.setJarByClass(WordCountNewAPI.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(IntWritable.class); 
job.setMapperClass(MyMapper.class); 
job.setReducerClass(MyReducer.class); 
job.setInputFormatClass(TextInputFormat.class); 
job.setOutputFormatClass(TextOutputFormat.class); 
FileInputFormat.setInputPaths(job, new Path(args[0])); 
FileOutputFormat.setOutputPath(job, new Path(args[1])); 
boolean status = job.waitForCompletion(true);
 
if (status) {
 
System.exit(0);
 
} else {
 
System.exit(1);
 
}
 
}
 
}

Building the Application

构建应用程序

应用程序可再次被构建以生成prohadoop-0.0.1-SNAPSHOT.jar文件。应用程序的各种组件如表3-2所示。

3- 2 使用新API的 WordCount 组件

组件                                     名称

Client 类                             WordCountNewAPI.java

Mapper 类                          WordCountNewAPI.MyMapper.java

Reducer 类                         WordCountNewAPI.MyReducer.java

应用程序 JAR                     prohadoop-0.0.1-SNAPSHOT.jar

Client-side库                      没有对应的程序文件

Remote libraries                没有对应的程序文件

在集群模式下运行WordCount

现在我们将执行MapReduce程序,正如我们在前面章节中执行基于旧API的MapReduce程序那样。在集群模式下运行MapReduce程序的命令如下所示:

hadoop jar prohadoop-0.0.1-SNAPSHOT.jar \ org.aspress.prohadoop.c3.WordCountNewAPI  <INPUT_PATH> <OUTPUT_PATH>
hadoopjarprohadoop-0.0.1-SNAPSHOT.jar \ org.aspress.prohadoop.c3.WordCountNewAPI  <INPUT_PATH> <OUTPUT_PATH>

因为HADOOP_CLASSPATH没有必要,在这里不包括它。如前所述,JAR文件被发送到所有的数据节点,且这些节点上的本地类路径被配置为包括这个JAR。输入和输出路径是指HDFS中的路径。hadoop命令负责执行这些预备步骤,最后在集群上以分布式方式执行这些任务(t.dbdao.com)。

Hadoop任务中的第三方库

到目前为止,在Mapper 和 Reducer类中只有标准Java和Hadoop库被使用。它们包括在Hadoop分布和标准Java类中发现的库(如String.class)。

但是,仅仅使用标准库中的类你不能开发复杂的Hadoop任务;有时会使用第三方库。如前所述,为执行Mapper 和 Reducer实例,Mapper 和 Reducer类所使用的这些库需要被发送和配置在节点上。

首先,你需要编写有点不同的程序来使用第三方库。表3-4展示了使用第三方库的WordCount程序的一个版本。它是一个人为例子,但可用来说明这一概念。

假设你有包含一个单词列表的文件,有些词包括Unicode字符和数字(0-9)的组合。剩下其他的仅仅是Unicode字符。数字和字母组合的单词往往在合同文档中使用,其中各种标识符是字母和单词的组合。为方便起见,有其它字符如“ – ”和“$”的单词将被排除(t.dbdao.com)。

假设你只对带字母和数字组合的单词感兴趣。要确定一个词是否是这样的组合,使用commons-lang库中的StringUtils.class。能使你做出适当决定的方法调用为:

if(StringUtils.isAlphanumeric(key.toString()) && !StringUtils.isAlphaSpace(key.toString()) )
if(StringUtils.isAlphanumeric(key.toString()) && !StringUtils.isAlphaSpace(key.toString()) )

完整代码如表3-4所示。

3- 4 WordCountUsingToolRunner.java

package org.aspress.prohadoop.c3;



import java.io.IOException; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.util.GenericOptionsParser; 
import org.apache.commons.lang.StringUtils;



public class WordCountUsingToolRunner extends Configured implements Tool{



public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 
private final static IntWritable one = new IntWritable(1);


public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String w = value.toString(); context.write(new Text(w), one);

} }



public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
public void reduce(Text key, Iterable<IntWritable> values, Context context)

throws IOException, InterruptedException {

if(StringUtils.isAlphanumeric(key.toString()) && !StringUtils.isAlphaSpace(key.toString()) )

{

int sum = 0;

for (IntWritable val : values) { 
sum += val.get();

}

context.write(key, new IntWritable(sum));

}

}

}


public  int run(String[] allArgs) throws Exception { 
Job job = Job.getInstance(getConf()); 
job.setJarByClass(WordCountUsingToolRunner.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MyMapper.class); 
job.setReducerClass(MyReducer.class);

job.setInputFormatClass(TextInputFormat.class); 
job.setOutputFormatClass(TextOutputFormat.class);

String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs(); 
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.submit(); return 0;

}



public static void main(String[] args) throws Exception { 
Configuration conf = new Configuration(); 
ToolRunner.run(new WordCountUsingToolRunner (), args);

}



}
package org.aspress.prohadoop.c3;
 
 
 
importjava.io.IOException; 
importorg.apache.hadoop.conf.*; 
importorg.apache.hadoop.fs.Path; 
importorg.apache.hadoop.io.*;
 
importorg.apache.hadoop.mapreduce.Job; 
importorg.apache.hadoop.mapreduce.Mapper; 
importorg.apache.hadoop.mapreduce.Reducer;
 
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
importorg.apache.hadoop.util.Tool;
 
importorg.apache.hadoop.util.ToolRunner;
 
importorg.apache.hadoop.util.GenericOptionsParser; 
importorg.apache.commons.lang.StringUtils;
 
 
 
public class WordCountUsingToolRunner extends Configured implements Tool{
 
 
 
public static class MyMapperextends Mapper<LongWritable, Text, Text, IntWritable> { 
private final static IntWritableone = new IntWritable(1);
 
 
public void map(LongWritablekey, Textvalue, Contextcontext) throwsIOException, InterruptedException {
 
String w = value.toString(); context.write(new Text(w), one);
 
} }
 
 
 
public static class MyReducerextends Reducer<Text, IntWritable, Text, IntWritable> { 
public void reduce(Textkey, Iterable<IntWritable> values, Contextcontext)
 
throwsIOException, InterruptedException {
 
if(StringUtils.isAlphanumeric(key.toString()) && !StringUtils.isAlphaSpace(key.toString()) )
 
{
 
int sum = 0;
 
for (IntWritableval : values) { 
sum += val.get();
 
}
 
context.write(key, new IntWritable(sum));
 
}
 
}
 
}
 
 
public  int run(String[] allArgs) throws Exception { 
Jobjob = Job.getInstance(getConf()); 
job.setJarByClass(WordCountUsingToolRunner.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MyMapper.class); 
job.setReducerClass(MyReducer.class);
 
job.setInputFormatClass(TextInputFormat.class); 
job.setOutputFormatClass(TextOutputFormat.class);
 
String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs(); 
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
job.submit(); return 0;
 
}
 
 
 
public static void main(String[] args) throws Exception { 
Configurationconf = new Configuration(); 
ToolRunner.run(new WordCountUsingToolRunner (), args);
 
}
 
 
 
}

表3-4将在稍后讨论。首先,你应该了解一些使用第三方库所需的POM文件变更。与POM文件相关的部分以粗体展示在表3-5中(t.dbdao.com)。

3- 5 有关POM文件的变更

<dependencies>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>2.2.0</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>commons-lang</groupId>

<artifactId>commons-lang</artifactId>

<version>2.3</version>

</dependency>

 

</dependencies>

<build>

<plugins>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-assembly-plugin</artifactId>

<version>2.4</version>

<configuration>

<descriptorRefs>

<descriptorRef>jar-with-dependencies</descriptorRef>

</descriptorRefs>

</configuration>

<executions>

<execution>

<id>assemble-all</id>



 

<phase>package</phase>

<goals>

<goal>single</goal>

</goals>

</execution>

</executions>

</plugin>

</plugins>

</build>
<dependencies>
 
<dependency>
 
<groupId>org.apache.hadoop</groupId>
 
<artifactId>hadoop-client</artifactId>
 
<version>2.2.0</version>
 
<scope>provided</scope>
 
</dependency>
 
<dependency>
 
<groupId>commons-lang</groupId>
 
<artifactId>commons-lang</artifactId>
 
<version>2.3</version>
 
</dependency>
 
 
 
</dependencies>
 
<build>
 
<plugins>
 
<plugin>
 
<groupId>org.apache.maven.plugins</groupId>
 
<artifactId>maven-assembly-plugin</artifactId>
 
<version>2.4</version>
 
<configuration>
 
<descriptorRefs>
 
<descriptorRef>jar-with-dependencies</descriptorRef>
 
</descriptorRefs>
 
</configuration>
 
<executions>
 
<execution>
 
<id>assemble-all</id>
 
 
 
 
 
<phase>package</phase>
 
<goals>
 
<goal>single</goal>
 
</goals>
 
</execution>
 
</executions>
 
</plugin>
 
</plugins>
 
</build>

在这个POM文件中,添加了2.3版本的commons-lang库。注意提供给hadoop-client库的<scope>标记。目前,所提供的值与<build>标记相互作用,以确保两个独立的应用程序JAR文件能在构建过程中被创建:

  • prohadoop-0.0.1-SNAPSHOT.jar只包括org.apress包及其子包中的自定义类。
  • prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar不仅包括自定义代码,还包括commons-lang-2.3.jar文件中的类文件。Hadoop客户端库中的类文件并不包括在该jar文件中,因为我们已经提供了scope值,正如在POM文件中所提供的。该设置包括编译时的库,但在生成的JAR文件中不会捆绑它。

为什么Hadoop客户端及其依赖库不包含在prohadoop-0.0.1-SNAPSHOT- jar-with-dependencies.jar文件中?记住,Maven不仅解决了该库,还有已声明库(版本号正确)依赖的其他任何库。包含hadoop-client库及其依赖库会导致一个臃肿的应用程序JAR文件。Hadoop客户端库已经配置在所有节点上,因此hadoop-client库的类不需要被包含在应用程序JAR中。这是一个重要的改进。由于排斥Hadoop客户端库,具有相关性的JAR文件的容量都相当小(例如,200 KB vs. 22 MB)。记住,此文件需要从客户端节点传输到远程DataNode。文件的大小会影响任务的启动时间,因为只有当文件被全部转移后,任务才能启动。可通过删除<scope>标记和重新编译尝试该操作(t.dbdao.com)。

我们介绍表3-4之前,先来讨论程序的执行。注意在程序执行中包含有一组额外的库。该库是commons-lang-2.3.jar,它在远程Mapper 和Reducer类中需要。现在WordCountNewAPIV2.class扩展并实现新的类,这些对于分配执行程序所需的库JAR文件都是必需的。

通过调用以下组命令执行程序:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:<DEPENDENT_JARS_USED_BY_ CLIENT_CLASS>

export LIBJARS=$MYLIB/commons-lang-2.3.jar,<OTHER_JARS_USED_BY_REMOTE_COMPONENTS>

hadoop jar prohadoop-0.0.1-SNAPSHOT.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner-libjars $LIBJARS  <INPUT_PATH> <OUTPUT_PATH>
exportHADOOP_CLASSPATH=$HADOOP_CLASSPATH:<DEPENDENT_JARS_USED_BY_CLIENT_CLASS>
 
export LIBJARS=$MYLIB/commons-lang-2.3.jar,<OTHER_JARS_USED_BY_REMOTE_COMPONENTS>
 
hadoopjarprohadoop-0.0.1-SNAPSHOT.jarorg.aspress.prohadoop.c3. WordCountUsingToolRunner-libjars $LIBJARS  <INPUT_PATH> <OUTPUT_PATH>

上面的命令、其参数和环境变量设置解释说明如下:

  • HADOOP_CLASSPATH更新确保了位于$HADOOP_ HOME/bin文件夹中的hadoop命令能够访问客户端程序启动MapReduce任务所使用的从属JAR文件。这种情况下不需要该变量,因为我们MapReduce程序的客户端部分没有利用任何第三方库。
  • $ LIBJARS变量是一个在DataNode上执行的Mapper和Reducer类所需的所有库文件的逗号分隔路径列表。当与HADOOP_CLASSPATH变量中指定的库进行比较时,注意分隔符的区别。
  • 应用程序JAR文件和包含在$ LIBJARS变量中JAR文件然后被发送到所有Map/Reduce任务将被执行的节点上。这是移动代码到数据的一个例子(参照第1章)。这些JAR文件用于配置远程节点上的CLASSPATH变量。main()函数中所使用的ToolRunner类负责这一点。
  • < INPUT_PATH >和< OUTPUT_PATH >是指相对于HDFS的路径(t.dbdao.com)。
  • 执行该程序确保了Reducer在运行中在jar文件中没发现类库不会抛出ClassNotFoundException。

还有另外一种方法无需使用–libjars选项就能运行该任务;可使用如下命令:

hadoop jar prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar \ org.aspress.prohadoop.c3. WordCountUsingToolRunner <INPUT_PATH> <OUTPUT_PATH>
hadoopjarprohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar \ org.aspress.prohadoop.c3. WordCountUsingToolRunner <INPUT_PATH> <OUTPUT_PATH>

注意关键区别。依赖库现在包含在应用程序JAR文件里,所以我们不需要提供libjars项。任务每次执行,应用程序JAR文件被发送和配置在远程节点上。事实上,用新旧API都可以使用该方法,无需强制使用。表3-4中所述的ToolRunner类也可以使用它。唯一的缺点是,当库的数量非常大时,在现实应用程序中通常会出现这样的情况,依赖库捆绑在内的JAR文件可能过大,从而增加了构建/测试周期的编译时间。如果你的测试环境远离开发环境(例如,如果你有一个测试集群配置在云上),当你不停地改动你的应用程序时,具有依赖关系的JAR文件越来越大,你可能需要更长的时间去部署。但请记住,这是执行应用程序最通用的方法。

本书其余部分,新的MapReduce API和ToolRunner类将被用于执行任务。

最后,是时候讨论表3-4了。它与表3-2和表3-3有什么不同?区别主要在于程序在命令行上被执行的方式和确保在远程节点的Map和Reduce进程中可访问新库:

  • 表3-4必须使用GenericOptionsParser.class提取程序所需的参数,如输入路径和输出路径。下面几行代码可执行提取(t.dbdao.com):

new GenericOptionsParser(getConf(),allArgs).getRemainingArgs()

  • 这些行删除-libjars和$ LIBJARS_PATH参数,仅返回应用程式,也就是正被执行的Hadoop任务,所需要的参数。
  • 表3-4中最重要的方面在于,main()方法中通过ToolRunner.run()调用传递的Configuration实例必须是在run()方法中配置任务时使用的同一Configuration实例。为了确保这一点,run()方法始终使用在可配置界面中定义,并在该类扩展的配置类中实现的getconf()方法。如果没有使用相同的Configuration实例,任务将不会被正确配置,第三方JAR文件将不能用于远程Mapper和Reducer任务。
  • ToolRunner.run( )方法负责解析-libjars参数。它将这个任务委托给GenericOptionsParser.class。-libjars参数的值被添加Configuration对象上,这是远程任务的配置方式。
  • 表3-2和3-3没有传递第三方库的能力。如果你想用表3-2和3-3来使用第三方库,你需要将它们打包到应用程序JAR文件。该文件为prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar,是由Maven构建生成的。(构建此JAR的过程和目的在本章前面已经讨论过)

表3-4中所描述的MapReduce程序的各种组件如表3-3中所示。

3- 3 使用新API和 ToolRunner WordCount 组件

组件                                                  名称

Client class                                      WordCountUsingToolRunner.java

Mapper class                                   WordCountUsingToolRunner.MyMapper.java

Reducer class                                  WordCountUsingToolRunner.MyReducer.java

Application JAR                              prohadoop-0.0.1-SNAPSHOT.jar

Alternative application JAR           prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Client-side libraries                        无

Remote libraries                             commons-lang-2.3.jar (只有当

将prohadoop-0.0.1-SNAPSHOT.jar作为应用程序JAR使用时)

Summary  小结

本章中,你开始对Hadoop平台有所了解。学习了如何使用虚拟机来安装Hadoop开发环境,首次编写了Hadoop程序。当然这些程序很简单,但通过这些简单的程序你了解了Hadoop平台上的一些复杂概念。

你了解了Hadoop中的各种配置项以及Hadoop任务的配置方式,熟悉了Hadoop中启用任务配置以及高度可定制化I / O格式的类数组。最后,通过一个例子展示了一组库文件从客户端被传输到Hadoop集群各个远程节点的方式。

dbDao.com 引导式IT在线教育

dbDao 百度贴吧:http://tieba.baidu.com/dbdao

扫码关注dbDao 微信公众号:

欢迎大家继续关注慧邮件邮件营销平台,也可以在我们的慧邮件官网了解更多邮件营销技巧,大数据知识,也可以通过电话:400-666-5494联系到我们,更多精彩知识、活动等着你。

  • *真实姓名:
  • *手机号码:
  • 公司名称:
  • 咨询内容:

CopyRight © 2009 - 2020 All Right Reserved 备案号:闽ICP备15004550号-275

厦门书生企友通科技有限公司 QYT.com 版权所有