04月03, 2019

《MapReduce:大型集群上的简化数据处理》 论文详解(下)

接上篇《MapReduce:大型集群上的简化数据处理》 论文详解(下)

4.6、跳过错误记录

       有的时候因为用户的代码里有bug,导致在某一个记录上map或reduce函数突然crash掉。这样的bug使得MapReduce操作不能完成。虽然一般处理方法是修复这个bug,但是有时候这是不现实的,也许这个bug是在源代码不可得到的第三方库里。有的时候也可以忽略一些记录,例如,当在一个大的数据集上进行统计分析,我们提供一个可选的执行模式,在这个模式下,MapReduce库检测那些记录引起的crash,然后跳过那些记录继续执行程序。

       每个worker进程安装了一个信号处理器来捕获内存段异常和总线错误信息。在调用用户自定义的map或reduce操作之前,MapReduce库把记录的序列号存储在一个全局变量里。如果用户代码生成一个信号,信号处理器就会用“最后一口气(last gasp)”发送一个包含序号的UDP包给MapReduce的master。当master看到一个特定记录失败不止一次的时候,Master就会标记这条记录,当相关的map或reduce任务再次执行的时候,这个记录就会被跳过。

4.7、本地执行

       调试Map和Reduce函数的bug是非常困难的,因为实际的计算程序是部署在分布式的系统中,而且通常是在好几千台计算机上执行,具体的执行位置是由master进行动态调度的,这又大大增加了调试的难度。为了简化调试、profile和小规模测试,我们开发了一套MapReduce库的本地实现版本,通过使用本地版本的MapReduce库,MapReduce操作能够在本地计算机上顺序的执行。用户可以控制MapReduce操作的执行,把操作限制到特定的Map任务上。用户通过设定特别的标志来在本地执行他们的程序,之后就可以很容易的使用本地调试和测试工具(比如gdb)。

4.8、状态信息

       master使用嵌入式的HTTP服务器(如Jetty)显示一组状态信息页面,用户可以监控各种执行状态。状态信息页面显示了包括计算执行的进度,比如已经完成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等。页面还包含了指向每个任务的stderr和stdout文件的链接。用户根据这些数据预测计算需要执行大约多长时间、是否需要增加额外的计算资源。这些页面也可以用来分析什么时候计算执行的比预期的要慢。

       此外,处于最顶层的状态页面显示了哪些worker失效了,以及他们失效的时候正在运行的Map和Reduce任务。这些信息对于调试用户代码中的bug很有帮助。

4.9计数器

       MapReduce库提供一个计数器工具,来计算各种事件的发生次数。比如,用户可能想统计已经处理了多少个单词、已经索引的多少篇German文档等等。

       为了使用这个特性,用户在程序中创建一个命名的计数器对象,在Map和Reduce函数中相应的增加计数器的值。例如:

Counter * uppercase;
uppercase=GetCounter("uppercase");
map(String name,String contents):
 for each word w in contents:
    if(IsCapitalized(w)):
      uppercase->Increment();
    EmitIntermediate(w"1");

      &nbsp这些计数器的值周期性的从各个单独的worker机器上传递给master(附在ping的应答包中传递)。master把执行成功的Map和Reduce任务的计数器值进行累计,当MapReduce操作完成之后,返回给用户代码。计数器当前的值也会显示在master的状态页面上,这样用户就可以看到当前计算的进度。当计数器的值累加的时候,master要检查重复运行的Map或者Reduce任务,避免重复累加(之前提到的备用任务和失效后重新执行任务这两种情况会导致相同的任务被多次执行)。

       有些计数器的值是由MapReduce库自动维护的,比如已经处理的输入的key/value 键值对的数量、输出的key/value 键值对的数量等等。

       计数器机制对于MapReduce操作的完整性检查非常有用。比如,在某些MapReduce操作中,用户需要确保输出的key value 键值对精确的等于输入的key value 键值对,或者处理的German文档数量在处理的整个文档数量中属于合理范围。

5、性能

       在本节,我们用在一个大型集群上运行的两个计算来衡量MapReduce的性能。一个计算用来在一个大概1TB的数据中查找特定的匹配串。另一个计算对大概1TB的数据进行排序。

       如此大量使用MapReduce的程序在实际应用中是非常典型的 :一类是对数据格式进行转换,从一种表现形式转换为另外一种表现形式;另一类是从海量数据中抽取少部分的用户感兴趣的数据。

5.1、集群配置

       所有这些程序都运行在一个大约由1800台机器构成的集群上。机器的配置是:2个2G的Intel Xeon超线程处理器(CPU),4G内存,两个160G IDE磁盘,一个千兆以太网卡。这些机器部署在一个两层的树形交换网络中,在root节点大概有100-200GBPS的传输带宽。所有这些机器都采用相同的部署(对等部署),因此任意两点之间的网络来回时间小于1毫秒。

       在4GB内存里,大概有1~1.5G用于运行在集群上的其他任务。测试程序在周末下午开始执行,这时主机的CPU、磁盘和网络基本上处于空闲状态。

5.2、Grep

       这个分布式的grep程序需要扫描大概10的10次方个由100个字节组成的记录,查找出现概率较小的3个字符的匹配串(pattern)(这个pattern在92337个记录中出现)。输入数据被拆分成大约64M的Block(M=15000),整个输出数据存放在一个文件中(R=1)。

alt

                                                         图2

       图2显示了这个计算随时间的处理过程。其中Y轴表示输入数据的处理速度。处理速度随着参与MapReduce计算的机器数量的增加而增加,当1764台worker参与计算时,处理速度达到了30GB/s。当Map任务结束的时候,即在计算开始后80秒,输入的处理速度降到0。整个计算过程从开始到结束一共花了大概150秒。这包括了大约一分钟的初始启动阶段。初始启动阶段消耗的时间包括了把这个程序传送到各个worker机器上的时间、等待GFS文件系统打开1000个输入文件集合的时间、获取相关的文件本地位置优化信息的时间

5.3、排序

       排序程序处理10的10次方个100个字节组成的记录(大概1TB的数据)。这个程序模仿TeraSort benchmark。

       排序程序由不到50行代码组成。只有三行的Map函数从文本行中解析出10个字节的key值作为排序的key,并且把这个key和原始文本行作为中间的key/value pair值输出。我们使用了一个内置的恒等函数作为Reduce操作函数。这个函数把中间的key/value pair值不作任何改变输出。最终排序结果输出到两路复制的GFS文件系统(也就是说,程序输出2TB的数据)。

       如前所述,输入数据被分成64MB的Block(M=15000)。我们把排序后的输出结果分区后存储到4000个文件(R=4000)。分区函数使用key的原始字节来把数据分区到R个片段中。

       在这个benchmark测试中,我们使用的分区函数知道key的分区情况。通常对于排序程序来说,我们会增加一个预处理的MapReduce操作用于采样key值的分布情况,通过采样的数据来计算对最终排序处理的分区点。

alt

                                                         图3

       图三(a)显示了这个排序程序的正常执行过程。左上的图显示了输入数据读取的速度。数据读取速度峰值会达到13GB/s,并且所有Map任务完成之后,即大约200秒之后迅速滑落到0。值得注意的是,排序程序输入数据读取速度小于分布式grep程序。这是因为排序程序的Map任务花了大约一半的处理时间和I/O带宽把中间输出结果写到本地硬盘。相应的分布式grep程序的中间结果输出几乎可以忽略不计。

       左边中间的图显示了中间数据从Map任务发送到Reduce任务的网络速度。这个过程从第一个Map任务完成之后就开始缓慢启动了。图示的第一个高峰是启动了第一批大概1700个Reduce任务(整个MapReduce分布到大概1700台机器上,每台机器1次最多执行1个Reduce任务)。排序程序运行大约300秒后,第一批启动的Reduce任务有些完成了,我们开始执行剩下的Reduce任务。所有的处理在大约600秒后结束。

       左下图表示Reduce任务把排序后的数据写到最终的输出文件的速度。在第一个排序阶段结束和数据开始写入磁盘之间有一个小的延时,这是因为worker机器正在忙于排序中间数据。磁盘写入速度在2-4GB/s持续一段时间。输出数据写入磁盘大约持续850秒。计入初始启动部分的时间,整个运算消耗了891秒。这个速度和TeraSort benchmark[18]的最高纪录1057秒相差不多。

       还有一些值得注意的现象:输入数据的读取速度比排序速度和输出数据写入磁盘速度要高不少,这是因为我们的输入数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘读取的,从而节省了网络带宽。排序速度比输出数据写入到磁盘的速度快,这是因为输出数据写了两份(我们使用了2路的GFS文件系统,写入复制节点的原因是为了保证数据可靠性和可用性)。我们把输出数据写入到两个复制节点的原因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制。如果底层文件系统使用类似容错编码14的方式而不是复制的方式保证数据的可靠性和可用性,那么在输出数据写入磁盘的时候,就可以降低网络带宽的使用。

5.4、备用任务的影响

       图三(b)显示了关闭了备用任务后排序程序执行情况。执行的过程和图3(a)很相似,除了输出数据写磁盘的动作在时间上拖了一个很长的尾巴,而且在这段时间里,几乎没有什么写入动作。在960秒后,只有5个Reduce任务没有完成。这些拖后腿的任务又执行了300秒才完成。整个计算消耗了1283秒,多了44%的执行时间。

5.5、机器失效

       在图三(c)中演示的排序程序执行的过程中,我们在程序开始后几分钟有意的kill了1746个worker中的200个。集群底层的调度立刻在这些机器上重新开始新的worker处理进程(因为只是worker机器上的处理进程被kill了,机器本身还在工作)。

       图三(c)显示出了一个“负”的输入数据读取速度,这是因为一些已经完成的Map任务丢失了(由于相应的执行Map任务的worker进程被kill了),需要重新执行这些任务。相关Map任务很快就被重新执行了。整个运算在933秒内完成,包括了初始启动时间(只比正常执行多消耗了5%的时间)。

6、经验

       我们在2003年1月完成了第一个版本的MapReduce库,在2003年8月的版本有了显著的增强,这包括了输入数据本地优化、worker机器之间的动态负载均衡等等。从那以后,我们惊喜的发现,MapReduce库能广泛应用于我们日常工作中遇到的各类问题。它现在在Google内部各个领域得到广泛应用,包括:

  • 大规模机器学习问题
  • Google News和Froogle产品的集群问题
  • 从公众查询产品(比如Google的Zeitgeist)的报告中抽取数据。
  • 从大量的新应用和新产品的网页中提取有用信息(比如,从大量的位置搜索网页中抽取地理位置信息)。
  • 大规模的图形计算。

alt

                                                         图4

alt

                                                         表1

       图4显示了在我们的源代码管理系统中,随着时间推移,独立的MapReduce程序数量的显著增加。从2003年早些时候的0个增长到2004年9月份的差不多900个不同的程序。MapReduce的成功取决于采用MapReduce库能够在不到半个小时时间内写出一个简单的程序,这个简单的程序能够在上千台机器的组成的集群上做大规模并发处理,这极大的加快了开发和原形设计的周期。另外,采用MapReduce库,可以让完全没有分布式和/或并行系统开发经验的程序员很容易的利用大量的资源,开发出分布式和/或并行处理的应用。

       在每个任务结束的时候,MapReduce库统计计算资源的使用状况。在表1,我们列出了2004年8月份MapReduce运行的任务所占用的相关资源。

6.1、大规模索引

       到目前为止,MapReduce最成功的应用就是重写了Google网络搜索服务所使用到的index系统。索引系统的输入数据是网络爬虫抓取回来的海量的文档,这些文档数据都保存在GFS文件系统里。这些文档原始内容(alex注:raw contents,我认为就是网页中的剔除html标记后的内容、pdf和word等有格式文档中提取的文本内容等)的大小超过了20TB。索引程序是通过一系列的MapReduce操作(大约5到10次)来建立索引。使用MapReduce(替换上一个特别设计的、分布式处理的索引程序)带来这些好处:

  • 实现索引部分的代码简单、小巧、容易理解,因为对于容错、分布式以及并行计算的处理都是MapReduce库提供的。比如,使用MapReduce库,计算的代码行数从原来的3800行C++代码减少到大概700行代码。
  • MapReduce库的性能已经足够好了,因此我们可以把在概念上不相关的计算步骤分开处理,而不是混在一起以期减少数据传递的额外消耗。概念上不相关的计算步骤的隔离也使得我们可以很容易改变索引处理方式。比如,对之前的索引系统的一个小更改可能要耗费好几个月的时间,但是在使用MapReduce的新系统上,这样的更改只需要花几天时间就可以了。
  • 索引系统的操作管理更容易了。因为由机器失效、机器处理速度缓慢、以及网络的瞬间阻塞等引起的绝大部分问题都已经由MapReduce库解决了,不再需要操作人员的介入了。另外,我们可以通过在索引系统集群中增加机器的简单方法提高整体处理性能。

7、相关工作

       很多系统都提供了严格的编程模式,并且通过对编程的严格限制来实现并行计算。例如,一个结合函数可以通过把N个元素的数组的前缀在N个处理器上使用并行前缀算法,在log N的时间内计算完。MapReduce可以看作是我们结合在真实环境下处理海量数据的经验,对这些经典模型进行简化和萃取的成果。更加值得骄傲的是,我们还实现了基于上千台处理器的集群的容错处理。相比而言,大部分并发处理系统都只在小规模的集群上实现,并且把容错处理交给了程序员。

       Bulk Synchronous Programming和一些MPI原语提供了更高级别的并行处理抽象,可以更容易写出并行处理的程序。MapReduce和这些系统的关键不同之处在于,MapReduce利用限制性编程模式实现了用户程序的自动并发处理,并且提供了透明的容错处理。

       我们数据本地优化策略的灵感来源于active disks等技术,在active disks中,计算任务是尽量推送到数据存储的节点处理,这样就减少了网络和IO子系统的吞吐量。我们在挂载几个硬盘的普通机器上执行我们的运算,而不是在磁盘处理器上执行我们的工作,但是达到的目的一样的。

       我们的备用任务机制和Charlotte System提出的eager调度机制比较类似。Eager调度机制的一个缺点是如果一个任务反复失效,那么整个计算就不能完成。我们通过忽略引起故障的记录的方式在某种程度上解决了这个问题。

       MapReduce的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一个超大的、共享机器的集群上分布和运行用户任务。虽然这个不是本论文的重点,但是有必要提一下,这个集群管理系统在理念上和其它系统,如Condor是一样。

       MapReduce库的排序机制和NOW-Sort[1]的操作上很类似。读取输入源的机器(map workers)把待排序的数据进行分区后,发送到R个Reduce worker中的一个进行处理。每个Reduce worker在本地对数据进行排序(尽可能在内存中排序)。当然,NOW-Sort没有给用户自定义的Map和Reduce函数的机会,因此不具备MapReduce库广泛的实用性。

       River提供了一个编程模型:处理进程通过分布式队列传送数据的方式进行互相通讯。和MapReduce类似,River系统尝试在不对等的硬件环境下,或者在系统颠簸的情况下也能提供近似平均的性能。River是通过精心调度硬盘和网络的通讯来平衡任务的完成时间。MapReduce库采用了其它的方法。通过对编程模型进行限制,MapReduce框架把问题分解成为大量的“小”任务。这些任务在可用的worker集群上动态的调度,这样快速的worker就可以执行更多的任务。通过对编程模型进行限制,我们可用在工作接近完成的时候调度备用任务,缩短在硬件配置不均衡的情况下缩小整个操作完成的时间(比如有的机器性能差、或者机器被某些操作阻塞了)。

       BAD-FS采用了和MapReduce完全不同的编程模式,它是面向广域网(alex注:wide-area network)的。不过,这两个系统有两个基础功能很类似。(1)两个系统采用重新执行的方式来防止由于失效导致的数据丢失。(2)两个都使用数据本地化调度策略,减少网络通讯的数据量。

       TACC是一个用于简化构造高可用性网络服务的系统。和MapReduce一样,它也依靠重新执行机制来实现的容错处理。

8、结束语

       MapReduce编程模型在Google内部成功应用于多个领域。我们把这种成功归结为几个方面:首先,由于MapReduce封装了并行处理、容错处理、数据本地化优化、负载均衡等等技术难点的细节,这使得MapReduce库易于使用。即便对于完全没有并行或者分布式系统开发经验的程序员而言;其次,大量不同类型的问题都可以通过MapReduce简单的解决。比如,MapReduce用于生成Google的网络搜索服务所需要的数据、用来排序、用来数据挖掘、用于机器学习,以及很多其它的系统;第三,我们实现了一个在数千台计算机组成的大型集群上灵活部署运行的MapReduce。这个实现使得有效利用这些丰富的计算资源变得非常简单,因此也适合用来解决Google遇到的其他很多需要大量计算的问题。

       我们也从MapReduce开发过程中学到了不少东西。首先,约束编程模式使得并行和分布式计算非常容易,也易于构造容错的计算环境;其次,网络带宽是稀有资源。大量的系统优化是针对减少网络传输量为目的的:本地优化策略使大量的数据从本地磁盘读取,中间文件写入本地磁盘、并且只写一份中间文件也节约了网络带宽;第三,多次执行相同的任务可以减少性能缓慢的机器带来的负面影响(即硬件配置的不平衡),同时解决了由于机器失效导致的数据丢失问题。

感谢

       Josh Levenberg校定和扩展了用户级别的MapReduce API,并且结合他的经验和其他人的改进建议,增加了很多新的功能。MapReduce从GFS中读取和写入数据。我们要感谢Mohit Aron,Howard Gobioff,Markus Gutschke,David Krame,Shun-Tak Leung,和Josh Redstone,他们都参与了GFS的开发工作。我们还感谢Percy Liang Olcan Sercinoglu 在开发MapReduce的集群管理系统中的工作。Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach为本论文提出了宝贵的意见。OSDI的无名审阅者,以及我们的审核者Eric Brewer,在论文应当如何改进方面给出了有益的意见。最后,我们感谢Google工程部的所有MapReduce的用户,感谢他们提供了有用的反馈、建议以及错误报告等等。·

扫鸭扫鸭,求关注

alt

本文链接:http://blog.keepting.cn/blog//post/mapReduce_desc_2.html

-- EOF --

Comments