Hadoop架构预览
Apache Hadoop是一个开源软件框架,用于在廉价硬件上大规模存储和计算数据集。以下是5个组成Hadoop的模块。
cluster是一个集合的主机(被称为nodes)。Nodes可以再被分成racks。这些是基础设施的硬件部分。
YARN Infrastructure (Yet Another Resource Negotiator) 是一个框架,用于提供计算资源(CPU、内存等)给application的执行。两个重要的元素:- Resource Manager是master,每个集群只有一个active状态的。它知道slaves所在的位置(Rack Awareness机架感应)和slaves拥有的资源。其中最重要的是Resource Scheduler ,决定如何分配这些资源。
- Node Manager是slave,每个集群里好多个。它启动时会告诉Resource Manager,而且定期的发送心跳包到Resource Manager。每个Node Manager提供一些资源给集群。这些资源容量(resource capacity)就是内存的大小和vcore的数量。在运行的时候,由Resource Scheduler决定如何分配这些资源:一个Container是Node Manager容量的a fraction(一部分或者说是将资源抽象成的最基本单元),用于client执行程序。
HDFS Federation提供永久、可靠和分布式的存储。
可选的存储方案如Amazon使用的Simple Storage Service (S3)。 MapReduce Framework是属于软件层,用于实现MapReduce paradigm。YARN Infrastructure和HDFS Federation是完全的解耦和独立的:前者提供资源运行,后者提供存储。MapReduce Framework是众多可运行在YARN上的框架之一(虽然暂时还是只有这一个框架)。
YARN: application启动流程
在YARN至少有3个角色:
- Job Submitter(客户端client)
- Resource Manager (master)
- Node Manager (slave)
一个application(在这里会有混淆,此application是指的是Application Master)的流程是这样:
1.客户端client提交一个application到Resource Manager 2.Resource Manager分配container 3.Resource Manager联系相关的Node Manager 4.Node Manager启动container 5.Container执行Application Master
Application Master负责单个application(在这里会有混淆,此application是指下面的MR的job)的执行过程:AM向 Resource Scheduler请求containers和指定程序(java的main类)在获得的containers执行。Application Master必须知道application的逻辑才能进行请求和执行application,所以Application Master是特定的框架(framework-specific)。MapReduce框架有实现自己的Application Master。
解剖MapReduce Job
在MapReduce里,一个YARN application被称为job。由MapReduce框架实现的Application Master被称为MRAppMaster。
MapReduce时间轴
一个MapReduce Job的执行时间轴:
Map阶段:多个Map Tasks被执行Reduce阶段:多个Reduce Tasks被执行注意:Reduce阶段可能在Map阶段结束之前就启动了,所以它们是交叉运行的。
Map阶段
接下来我们集中讨论Map阶段。一个关键的讨论是:Application Master运行当前的job需要多少MapTasks。
用户提供了什么信息?
我们往回走一点,在一个客户端client提交一个application时,有以下信息提供了给YARN Infrastructure,分别是:
- 配置:可能只有一部分参数是用户指定,或使用默认值。注意:默认参数也是受Hadoop的提供商不同而不同,如Amazon。
-
一个jar包,包含:
- 一个map()的实现
- 一个conbiner的实现
- 一个reduce()的实现
-
input和output的信息:
- input目录:是否是HDFS的目录?还是S3?有多少个文件?
- output目录:是否存储output?存在HDFS?存在S3?
input目录文件的数据量决定了一个job的MapTasks的数量。
多少个MapTasks?
Application Master会为每一个map分片(map split)启动一个MapTask。一般来说,每一个input文件就是一个map分片。如果input文件太大(大于HDFS块大小),就会将它分为两个或以上的map分片。以下是FileInputFormat类的getSplits()方法的伪代码:
num_splits = 0for each input file f: remaining = f.length while remaining / split_size > split_slope: num_splits += 1 remaining -= split_size
其中:
split_slope = 1.1split_size =~ dfs.blocksize
注意参数mapreduce.job.maps在MRv2里已经被废弃。
启动MapTask
MapReduce Application Master 向Resource Manager请求Container来运行job:每个MapTask(Map分片)一个Container。
MapReduce Application Master 向Resource Manager的Container请求会尽量考虑本地处理map分片。请求如下:- 请求一个container和map分片是在同一个Node Manager(map分片可能存储在多个结点,具体由HDFS的replication factor来决定)。
- 否则,请求一个container,而这个container是和map分片同一个rack里。
- 否则,请求集群里的其他Node Manager的一个container。
以上仅仅是一个提示给Resource Scheduler。如果与Resource Scheduler的目标有冲突,Resource Scheduler完全有自由的无视本地数据的请求的要求。
一单分配了container,MapTask就会被启动。Map阶段:一个执行场景的例子
在Map阶段,以下是可能的执行场景:
- 有两个Node Manager:每个Node Manager有2G内存(NM容量),每个MapTask需要1G,则我们可以在每个Node Manager运行两个container(这是理想场景,实际上Resource Scheduler执行上可能会有点不同)。
- 现在没有其他YARN applications在这个集群上运行着。
- 我们job有8个map分片(举个例子:在input目录有7个文件,其中一个文件大于HDFS的块大小,所以我们把它分为两个map分片):我们需要8个MapTask。
MapTask执行的时间轴
现在我们专注于一个MapTask,这是一个MapTask的时间轴:
- INIT阶段:启动MapTask
- EXECUTION阶段:以map方法遍历map分片中的每个元组(tuple,(key,value)形式)。
- SPILLING阶段:每次map方法的输出都会保存在内存缓存中;当内存快满的时候,我们(并行)开始spilling阶段将缓存中的数据挪走。
- SHUFFLE阶段:在spilling阶段结束后,我们会合并所有的map输出并打包给reduce阶段。
MapTask: INIT
在INIT阶段,我们:
1.创建一个 context(TaskAttemptContext.class) 2.创建一个 Mapper.class的实例。3.设置input(InputFormat.class, InputSplit.class, RecordReader.class) 4.设置output(NewOutputCollector.class) 5.创建一个mapper context(MapContext.class, Mapper.Context.class) 6.初始化input 7.创建一个SplitLineReader.class的对象8.创建一个HdfsDataInputStream.class的对象MapTask: EXECUTION
EXECUTION阶段执行Mapper类的run方法。用户可以重写run方法,默认它会被setup方法调用:run方法默认是不做任何的事情,但用户可以重写它,如设置一些初始化参数。setup方法后,map分配的每个<key,value>元组都会调用map方法。因此,map()收到的是:一个key、一个value和一个mapper context。使用这个context,map可以保存它的输出到缓存中。
注意map分配是一块一块的被提取(is fetched chuck by chunk,如64KB),然后每一块都会被(SplitLineReader.class)分成多个元组(key, value) 。此过程在Mapper.Context.nextKeyValue 方法里完成。 当map分片被完成处理后,run方法会调用clean方法:默认的,clean方法不作任何事情,除非用户重写它。MapTask: SPILLING
在EXECUTION阶段的map输出会通过Mapper.Context.write()方法写到一个环形缓存里(MapTask.MapOutputBuffer)。缓存的大小是固定的,是根据参数mapreduce.task.io.sort.mb (默认是 100MB)决定的。
当环形缓存快慢的时候(默认是mapreduce.map. sort.spill.percent: 80% ),SPILLING阶段会被启动(另外个线程平行启动)。如果splilling线程太慢而缓存已经到达100%,则map()会暂停执行,直到缓存足够。
SPILLING线程执行以下动作: 1.创建一个SpillRecord和FSOutputStream(本地文件系统)。 2.在内存中将已使用的内存块进行排序:输出的元组会被排序(根据 (partitionIdx, key),先partitionIdx,后key),使用的排序方法是快速排序。 3.被排序后的输出会被分割为partition:每个partition对应一个ReduceTask。4.所有partitions都会被顺序写入本地文件。多少个ReduceTasks?
一个Job的ReduceTask的数量是由参数mapreduce.job.reduces决定的。
关联输出元组(output tuple)的paritionIdx是什么?
一个输出元组的paritionIdx是一个partition的索引(index)。这过程在 Mapper.Context.write()方法里:
partitionIdx = (key.hashCode() & Integer.MAX_VALUE) % numReducers
partitionIdx作为元属于与输出元组保存在环形缓存中里。用户可以自定义分割器(partitioner),通过指定参数mapreduce.job.partitioner.class。
什么时候使用上combiner?
如果用户指定combiner,则在SPILLING thread里,在SPILLING阶段的第4步写入本地文件之前,会执行combiner处理每个partition里的元组。基本上,我们:
- 创建Reducer.class实例(被指定为combiner的Reducer.class)。
- 创建一个Reducer.Context:其输出会被排序后放在本地文件系统中。
- 执行Reduce.run():看 ReduceTask的描述。
combiner一般是标准的reduce()函数实现,所以它可以被看作为本地的reducer。
MapTask: end of EXECUTION
在EXECUTION阶段的最后,最后一次触发SPILLING线程时,有以下细节:
- 排序和spill还未spill的元组
- 开始阶段
注意每次缓存快要满的时候,我们都会得到个spill文件(SpillReciord + output文件)。每个spill文件包含多个partitions(segments)。
Shuffle与Reduce阶段
Copy阶段
1.由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。所以,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据,因此map和reduce是交叉进行的,其实就是shuffle。Reduce任务通过HTTP向各个Map任务拖取(下载)它所需要的数据(网络传输)。
Reducer是如何知道要去哪些机器取数据呢?一旦map任务完成之后,就会通过常规心跳通知应用程序的Application Master。reduce的一个线程会周期性地向master询问,直到提取完所有数据(如何知道提取完?)数据被reduce提走之后,map机器不会立刻删除数据,这是为了预防reduce任务失败需要重做。因此map输出数据是在整个作业完成之后才被删除掉的。
2.reduce进程启动数据copy线程(Fetcher),通过HTTP方式请求maptask所在的TaskTracker获取maptask的输出文件。由于map通常有许多个,所以对一个reduce来说,下载也可以是并行的从多个map下载,那到底同时到多少个Mapper下载数据??这个并行度是可以通过mapreduce.reduce.shuffle.parallelcopies(default5)调整。默认情况下,每个Reducer只会有5个map端并行的下载线程在从map下数据,如果一个时间段内job完成的map有100个或者更多,那么reduce也最多只能同时下载5个map的数据,所以这个参数比较适合map很多并且完成的比较快的job的情况下调大,有利于reduce更快的获取属于自己部分的数据。 在Reducer内存和网络都比较好的情况下,可以调大该参数;3.reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断等等情况,这样reduce的下载就有可能失败,所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。reduce下载线程的这个最大的下载时间段是可以通过mapreduce.reduce.shuffle.read.timeout(default180000秒)调整的。如果集群环境的网络本身是瓶颈,那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。一般情况下都会调大这个参数,这是企业级最佳实战。MergeSort阶段
copy过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置。这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f 源码里面写死了) 来设置,这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。JVM的heapsize的70%。
根据copy过来数据的大小,可能将其复制到内存或磁盘。mapreduce.reduce.shuffle.input.buffer.percent属性配置了这个任务占用的缓存空间在堆栈空间中的占用比例(默认0.70)。mapreduce.reduce.shuffle.merge.percent决定缓存溢出到磁盘的阈值(默认0.66),mapreduce.reduce.merge.inmem.threshold设置了Map任务在缓存溢出前能够保留在内存中的输出个数的阈值(默认1000),只要一个满足,输出数据都将会写到磁盘。在收到Map任务输出数据后,Reduce任务进入合并(merge)或排序(sort)阶段。同时合并的文件流的数量由mapreduce.task.io.sort.factor属性决定(默认10)。Map任务输出数据的所有压缩操作,在合并时都会在内存中进行解压缩操作。这种merge方式一直在运行,直到没有map端的数据时才结束,然后生成最终的那个文件。reduce()阶段
1.当reduce将所有的map上对应自己partition的数据下载完成后,就会开始真正的reduce计算阶段。
在这个阶段,需要读取数据,默认mapreduce.reduce.input.buffer.percent(default 0.0)(源代码MergeManagerImpl.java:674行)来设置reduce的缓存。如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,当reduce计算逻辑消耗内存很小时,可以分一部分内存用来缓存数据,可以提升计算的速度。所以默认情况下都是从磁盘读取数据,如果内存足够大的话,务必设置该参数让reduce直接从缓存读数据,这样做就有点Spark Cache的感觉。2.Reduce在这个阶段,框架为已分组的输入数据中的每个 <key, (list of values)>对调用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任务的输出通常是通过调用 OutputCollector.collect(WritableComparable,Writable)写入文件系统的。Reducer的输出是没有排序的。
Reduce相关参数调优
选项 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapred.reduce.parallel.copies | int | 5 | 每个reduce并行下载map结果的最大线程数 |
mapred.reduce.copy.backoff | int | 300 | reduce下载线程最大等待时间(in sec) |
io.sort.factor | int | 10 | 同上 |
mapred.job.shuffle.input.buffer.percent | float | 0.7 | 用来缓存shuffle数据的reduce task heap百分比 |
mapred.job.shuffle.merge.percent | float | 0.66 | 缓存的内存中多少百分比后开始做merge操作 |
mapred.job.reduce.input.buffer.percent | float | 0.0 | sort完成后reduce计算阶段用来缓存数据的百分比 |
YARN和MapReduce的交互图
从这图就可以看清楚概念:
Container是运行程序的基本容器。启动一个MapReduce作业(Job),都会先启动一个Application Master,MapReduce有其实现的AM,叫MRAppMaster。
启动AM后,一个MapReduce作业(Job)会被拆成多个Task。Task会变为Task Attempt,最后在Container里被执行时,就是具体的MapTask和ReduceTask。
参考: