04月02, 2019

《MapReduce:大型集群上的简化数据处理》 论文详解(上)

“For the Horde!Lok’tar Ogar!”(为了部落,不胜则亡)

alt

在Python3的学习中遇到了map()、reduce()高阶函数,顺便了解下mapReduce的定义,于是翻到了这篇很不错的论文论文链接,作者是谷歌的高级研究员 Jeffrey Dean,下边的内容是对这篇论文的翻译和个人理解,论文共有8个模块,此篇介绍了1-4模块的介绍。

摘要

        MapReduce是一个编程模型和一个用于处理生成大型数据集的相关实现。用户定义一个map函数去处理一个键值对然后生成一组中间键值对,一个reduce函数去合并所有具有相同中间键的中间值。许多现实的任务都可以通过这个模型来表达。

       遵循这个模型的程序可以自然并行地执行在一个大型商业计算机集群上。运行时由系统关注于以下问题的细节:分割输入数据、将程序在一组计算机上调度执行、处理机器故障、管理必要的机器间内部通信。这使得没有任何并行与分布式系统编程经验的程序员可以轻松利用大型分布式系统的资源。


1、简介

       在过去的五年里,作者本人和许多谷歌的同事实现了数以百计的独立计算程序,用于处理大量的原始数据,例如爬虫文件、网络请求日志等等,用于计算各种各样的衍生数据,如反转索引、网页文档的各种图结构的标识、每个主机上抓取大量页面的总结、一天中最频繁的一组查询等等。大多数这样的计算程序在概念上是简洁明了的。然而,输入数据通常非常大,而且为了在合理时间内执行完毕计算程序必须分布运行在成百上千个计算机上。这些有关于如何并行计算、分布数据、解决故障的事务需要大量复杂的代码去处理,这使得本来朴素的计算任务变得极为冗杂。

       作为对这种复杂性的应对,我们设计了一个新的抽象模型,它允许我们直接去做纯粹的计算,而将繁杂的细节如并行,容错,数据分布,负载均衡等封装在库中。我们的抽象受到了来自Lisp和其它函数式语言中map和reduce原语(primitives present)的启发。我们发现我们大多数的计算程序都涉及到以下特性:对于输入中的每个逻辑记录提供一个map操作,目的是计算得到一组中间键值对;然后对于所有具有相同键的值实现一个reduce操作,目的是适当地联合衍生数据。我们对于函数模型的使用以及用户自定义map和reduce操作允许我们可以轻易并行化大量的计算,并将重新执行作为容错的主要机制。

       这项工作的主要贡献是一个简洁有力的接口,它实现了自动并行化与大规模数据的分布。这个接口与它的实现一起在大规模商业计算机集群上获得了优异表现。

       第二部分描述了基础编程模型并且给出了一些例子。第三部分描述了一个定制在基于集群的计算环境下的MapReduce接口实现。第四部分描述了一些有用的编程模型细节。第五部分对其在一个任务的实现进行了表现评估。第六部分探索了MapReduce的使用,包括谷歌内部使用它重写产品索引系统的经验。第七部分讨论了相关工作与未来展望。

alt

2、编程模型

       计算过程接受一系列输入键值对,产生一系列输出键值对。MapReduce库的用户用两个函数来表示计算:Map和Reduce。

       Map函数:由用户编写,接收一个输入键值对,产生一组中间键值对。MapReduce库组合所有的具有相同中间键 I 的中间值并且将它们传给Reduce函数。

      Reduce函数:也由用户编写,接收一个中间键 I 和对应的一组值。它将其合并成一组可能规模更小的值。通常每个Reduce函数只会输出零个或一个值。中间值通过一个迭代器提供给用户的reduce函数。这允许我们操作过大以至于不能放在内存中的值列表。

2.1. 例子

考虑统计大型文章集合中每个单词出现次数的问题,用户写出的代码将类似于如下伪代码:

map(String key, String value):
    // key:  document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

      map函数生成每个单词加上一个关联的出现计数(例子中为1)。reduce函数对同一个单词所有生成的计数求和。

      另外,用户编写代码填充入一个mapreduce规格对象,以及输入输出文件名和可选的调整参数。随后用户调用MapReduce函数,并将规格对象传递给它。用户的代码和MapReduce库(C++实现)一同链接。附录A附录A包含了这个例子的完整程序。

2.2 类型

      虽然之前的伪代码用字符串输入输出类型编写,实际上由用户提供的map和reduce函数已经将类型联系起来:

    map         (k1, v1) -> list(k2, v2)
    reduce      (k2, list(v2)) -> list(v2)

也即,输入键值与输出键值的域不同。此外,中间键值与输出键值的域相同。

      我们的C++实现在这里传递字符串,而由用户定义的函数中,把它留给用户代码在字符串与合适的类型之间转变。

2.3 更多的例子

这里有一些有趣程序的简单例子,它们可以轻松使用MapReduce计算。

分布式的Grep:map函数返回所有匹配成功的行。reduce函数是一个同一性函数,仅仅将被生成的中间数据拷贝到输出中。

URL访问频率计数:map函数处理网络页面请求日志并且生成<URL,1> 。reduce函数对所有来自相同URL的值求和并且生成<URL,总计数>

反向网页链接图:map函数对于每个source页面中指向target的链接生成<target,source>。reduce函数链接给定target URL的整个source URL列表,生成<target,list(source)>

每个主机的词条向量(Term-Vector):一个词条向量总结一个文件或者一组文件中最重要的词,表示为一列<word,frequency>键值对。map函数对于每个输入文档生成一个<hostname,term−vector>键值对。reduce函数接收单个主机所有文档的词条向量。它将这些词条向量组合在一起,扔掉不频繁的词汇,然后生成最终的<hostname,term−vector>键值对。

反向索引:map函数分析每个文档,生成一系<word,documentID>键值对。reduce函数接收一个单词所有这样的键值对,排序对应的文档ID并生成一个<word,list(documentID)>键值对。所有输出键值对的集合组成一个简单的反向索引。可以轻易改进这个计算去记录单词位置。 【译注:反向索引 InvertedIndexInvertedIndex是一个搜索引擎术语。正向索引是指统计出每个文档中有哪些单词,而反向索引是指统计出每个单词出现在哪些文档中。】

分布式排序:map函数从每个record中提取关键字key,生成一个<key,record>键值对。reduce函数不加改变地输出所有键值对。这个计算依赖于4.1节描述的划分工具和4.2节描述的顺序特性。

3. 实现

      MapReduce可以有很多不同的实现接口,应当由具体环境来决定正确的实现。比如说,一种实现可能适用于一个小型共享内存机,另一种适用于一个大型NUMA多处理器系统,另一种可能适用于大型网络计算机集群。

这部分描述的实现针对于谷歌内部广泛使用的计算环境:由以太网[4]连接的大型商业计算机集群。在我们的环境中:

  1. 计算机通常是多核x86处理器,运行在Linux系统上,每台机器有2-4GB内存。
  2. 使用商业网络硬件:在机器水平上通常为100Mb/s或1Gb/s,但是在完全双向带宽上被认为平均值较慢。
  3. 一个集群包含成百上千的计算机,因此机器故障是普遍的。
  4. 存储由与独立计算机直接相连的廉价IDE硬盘提供。一个内部开发的分布式文件系统[8]被用来在硬盘上管理数据存储。文件系统使用重复存储来在不可靠的硬件上保证可利用性和可靠性。
  5. 用户把作业提交给调度系统。每个作业包含一系列任务,由调度器分配到集群内一组可用的计算机上。

3.1 运行总览

      通过自动划分输入数据为M段(split)来保证Map函数可以多机分布式调用。输入段可以被多机并行处理。通过使用划分函数(如哈希后模R)划分中间键空间为R片(piece),来保证Reduce函数可以分布式调用。片数R和划分函数由用户指定。

图1展示了一个MapReduce操作在我们的实现上的整体运行流程。当用户程序调用MapReduce函数时,紧随其后将发生一系列操作(图1中的标号对应于下方列表的序号)。

alt

  1. 与用户程序一起执行的MapReduce库首先将输入文件分成M段,通常每段16MB~64MB(用户可以设置参数控制)。然后在集群电脑上启动程序的很多副本。
  2. 有一个特殊的程序--称为Master。剩下的是Woker(Master可以分配任务给Worker),现在有M个map任务和R个reduce任务需要被分配。Master会给每个空闲Worker分配一个map或reduce任务。
  3. 一个被分配map任务的Worker会读取对应输入段的内容。并分析出输入数据的键值对,将其传递给用户定义的Map函数。由Map函数生成中间键值对,并缓存在内存中。
  4. 每隔一段时间内存中的键值对会被写入到Woker对应的磁盘,这些磁盘被划分函数划分成R个区域。键值对在磁盘上的位置被传回给Master,Master负责将这些位置信息发送给执行reduce任务的Worker。
  5. 当一个reduce worker收到Master传来的数据位置时,它通过RPC的方式远程从执行map任务的worker磁盘中读取中间键值数据。当一个reduce工作机读取完成所有中间数据后,它会根据中间键对它们进行排序,所有具有相同键的记录会被组合在一起。如果中间数据的总量太大而不能放入内存中,Worker会使用外排序。
  6. reduce worker在排好序的中间数据上重复工作,遍历每个遇到的独立的中间键。它把键和对应的中间值集合传递给用户的Reduce函数。Reduce函数的输出被追加到当前reduce分区的最终的输出文件中。
  7. 当所有的map任务和reduce任务完成后,主机唤醒用户程序。此时,用户的MapReduce调用结果将返回到用户代码中。 成功结束后,mapreduce的执行输出会放入R个输出文件中(每个reduce任务一个,文件名由用户指定)。通常来说,用户不用把R个输出文件联合成一个文件 — 他们通常用这些文件作为另一个MapReduce调用的输入,或者把它们用于另一个可以处理多文件输入的分布式应用程序。

alt

3.2 Master数据结构

      Master维护一些数据结构。对于每一个map任务和reduce任务,它会存储任务的状态(空闲,执行中,完成),以及非空闲任务worker的标识。

      Master是从map任务向reduce任务传递中间文件位置信息的通道。因此,对每个完成的map任务,主机储存R个中间文件区域的位置和大小。当map任务完成时,Master就会收 到中间文件位置与大小信息的更新。信息会被逐个发送给有运行中reduce任务的Worker。

3.3 容错

      因为MapReduce库的设计理念是使用成百上千的计算机处理非常大体量的数据,所以必须拥有优雅的容错机制。

Worker失效

      Master会周期性地向每个Worker发送ping命令。如果在一定时间内没有收到Worker的回应,Master会把这个Worker标记为失效。每个由失效的Worker执行完毕的map任务都会被设置回未执行状态,因此可以被调度到其它Worker执行。类似的,每个由失效的Worker正在执行的map任务或reduce任务同样会被设置为未执行并可以被重新调度。

      由失效Worker完成的map任务将会被重新执行,因为它们的输出存储在失效Worker本地硬盘上且宕机后不能被访问。已经完成的reduce任务不需要重新执行,因为它们的输出存储在全局文件系统上。

      当一个map任务首次被workerA执行并在之后被workerB执行时(因为A失效),所有执行reduce任务的Worker会被提醒。每个执行reduce任务的Worker如果从workerA没有完全读取数据,会从workerB读取数据。

      MapReduce可以从大规模工作机失效中复原。比如说在一个MapReduce操作中,一个正在运行的集群网络中有80台机器在几分钟内都失效了。MapReduce主机可以直接重新执行由失效worker所执行的工作,继续推进进度,最终完成整个MapReduce操作。

Master失效

      可以使主机周期性存储如上方所述的Master数据结构的检查点。当主机宕机时,一个新的副本可以从上一个检查点的状态开始执行。然而,假定只有一个主机,那么它不太可能失效。因此我们的当前实现在主机失效时会中断MapReduce操作。客户在有需要时可以重设这个条件并重启MapReduce操作。

存在故障时的语义

      当用户提供的map和reduce操作对于输入数据是确定的,那么分布式产生的输出将与无错地顺序执行整个程序时相同。

      我们依赖自动提交map和reduce任务的输出来完成这个特性。每个运行中的任务将它的输出写入私有临时文件中。一个reduce任务产生一个这样的文件,一个map任务产生R个这样的文件(每个对应一个reduce任务)。当一个map任务完成后,worker向Master 发送一个消息,消息中包括有这R个临时文件的名字。如果主机收到一个已经完成的map任务发送的完成消息,它会将其忽略。否则,它在主机数据结构中记录这R个文件的名字。

      当一个reduce任务完成后,reduce工作机自动重命名它的临时输出文件到最终输出文件中。如果一个相同的reduce任务在多个计算机上执行,用一个最终输出文件会被多次重命名。我们依赖底层文件系统提供的自动重命名操作,去保证最终文件系统中对于每个reduce任务产生的数据恰好包含一份。

      我们绝大多数的map和reduce操作是确定性的,在这种情况下我们的语义与顺序执行时相同,这就使得程序员很容易去解释程序的表现。当map或reduce操作是非确定性的时,我们提供更弱但依然可解释的语义。在这种非确定性的操作中,一个独特reduce任务R1的输出与R1的顺序执行非确定性操作相同。然而,一个不同的reduce任务R2也许与另一个由非确定性程序顺序执行的输出相同。

      考虑map任务M和reduce任务R1、R2 。设e(Ri)表示Ri的执行(恰好只有一个这样的执行)。更弱的语义产生自e(R1)可能读取到M的一个执行所产生的输出,而e(R2)可能读取到由不同的MM执行所产生的输出。

3.4 局部性

      网络带宽在我们的计算环境中是相对缺乏的资源。由GFS[8] 管理的输入数据存储在集群计算机的本地磁盘上,我们利用这个优势来省下网络带宽。GFS把文件分成64MB的块,在不同的计算机上存储每个块的副本(通常会存储3个副本)。MapReduce主机提取输入文件的位置信息,然后尝试在包含输入数据副本的计算机上调度对应的map任务。当失败之后,会尝试在最近包含输入数据副本的计算机上调度这个map任务(比如切换到同一网络中包含数据的工作机)。当在一个集群中大部分工作机上运行大型MapReduce操作时,大部分输入数据被会原地读取,不用花费网络带宽。

3.5 任务粒度

      我们把map阶段和reduce阶段分别细分成如上所述的M部分和R部分。理想情况下,M和R应该比工作机的数量大得多。让每个工作机执行许多不同的任务可以提升动态负载均衡,也可以加速工作机失效后的恢复工作:很多map任务已经被通过其它工作机延展。

      在我们的实现中对于M和R的大小有实际性的界限,因为如上所述,主机必须进行O(M+R)次调度并且在内存中保持O(M∗R)个状态。(然而内存占用常数因子很小:O(M∗R)个状态每个只需要1byte数据,即每个map/reduce任务对)。

      此外,R经常被用户约束因为每个reduce任务输出在一个独立输出文件中。在实践中,我们倾向于选择M使得每个独立任务大约有16MB到64MB的输入数据(这样上面描述的原地性优化效果最好),我们让R为我们期望使用工作机数量的一个小倍数。我们通常设定MapReduce计算的M=200000,R=5000,使用2000台工作机。

alt

3.6 备份任务

       一个落后者是延长MapReduce操作时间的原因之一:在计算中一个计算机花费异常长的时间去完成最后几个map或reduce工作。落后者可以由一系列原因导致。比如说,一个硬盘质量很差的计算机可能经历频繁的可修正错误,将它的读取表现从30MB/s减缓到1MB/s。集群调度系统可能已经把其它的任务调度到这台机器上,导致它由于CPU、内存、本地硬盘、或者网络带宽的竞争使得MapReduce代码执行更慢。我们最近遇到的一个计算机初始化代码上的bug导致处理器缓存被关闭:在被影响的计算机上的运算变慢了一百倍。

      我们有一个通用机制去减轻落后者的问题。当MapReduce操作接近完成时,master调度备用进程来执行那些剩下的还在执行的任务。无论是基础任务完成还是副本任务完成都会标记任务完成。我们已经调整了这个机制,现在它通常只会增加百分之几的计算资源。我们已经发现它显著的减少了完成大规模MapReduce操作的时间。比如说,5.3节描述的排序程序在备份任务机制关闭时需要花额外44%的时间。

4. 细化

      即使基础由简单写作的Map和Reduce函数提供的功能可以适用于大部分的需求,我们已经发现一部分有用的扩展。将在这部分中描述这些扩展。

4.1 划分函数

      MapReduce的用户指定他们需要的reduce任务数/输出文件数(即R)。这些任务的数据通过一个中间键上的划分函数进行划分。已经提供的一个默认划分函数使用哈希(如hash(key) mod R)。这趋向于结果公平匀称的划分。然而在某些情况下,使用一些其它的键划分函数更有用。比如,有时输出键是URL,我们希望每个主机的所有入口最后放入一个输出文件中。为了支持像这样的情况,MapReduce库的用户可以提供一个特殊的划分函数。比如说,使用hash(Hostname(urlkey)) mod R作为划分函数可以使一个主机所有的URL放在同一个输出文件中。

4.2 顺序保证

      我们保证在一个给定的划分内,中间键值对会按键的升序进行处理。顺序的保证使得每个划分可以很轻松地生成一个有序的输出文件,这在输出文件格式需要用键进行高效随机存取查找,或者输出文件用户发现拥有有序的数据是一件很方便的事情时非常有用。

4.3 联合函数

      某些情况下,由每个map任务产生的中间键中有很多重复,而用户指定的Reduce函数满足结合律和交换律。一个形象的例子是2.1节中的单词统计。因为单词频率趋向于服从Zipf分布,每个map任务会产生成百上千拥有类似于<the,1>形式的记录。所有这些统计会被通过网络送到一个reduce任务上,然后被Reduce函数将它们联合起来计算出一个数字。我们允许用户指定一个可选的联合函数在数据被送到网络之前执行部分的合并工作。 【译注:Zipf分布是指单词出现频率(F)与出现频率排名®成反比,即F∗R=C,C为常数,类似于二八法则。】

      联合函数运行在每个执行map任务的计算机上。通常与reduce函数的代码相同。它们仅有的不同是MapReduce库如何抓取函数的输出。reduce函数的输出写入进最终的输出文件中。联合函数的输出写入将被送往reduce任务的中间文件中。

部分联合可以有效地加速某些MapReduce操作。附录A附录A包含了使用联合的例子。

4.4 输入和输出类型

      MapReduce库提供支持去读取一些不同格式的输入数据。比如说,“文本”模式的输入将每行处理为一个键值对:键是文件的偏移,值是行的内容。另一种常见的支持格式存储一系列按键排序的键值对。每种输入类型的实现都知道该如何把它们分割到可被当作独立map任务处理的有意义范围(比如,文本模式的的范围分割确保范围分割仅在每行出现一次)。用户可以通过实现一个简单的reader接口对新的输入类型添加支持,即使大部分用户只会使用少量预定义的输入类型中的一个。

      一个reader不用非要从文件中读入数据。比如说,可以很轻松地定义一个reader从数据库或映射在内存中的数据结构中读入记录。

      以类似地方式,我们支持一系列产生于不同方式中的输出数据格式,可以很轻松地使得用户代码对新的输出类型添加支持。

4.5 副作用

      在某些例子中,MapReduce的用户已经发现可以从他们的map或reduce操作中很方便地去产生辅助文件作为附加输出。我们依赖于应用程序写入使得这样的副作用自动且幂等。通常情况下应用程序写入一个临时文件,并且当它被完全生成后就会自动地重命名。

      我们不对由一个任务产生的多输出文件的自动两阶段提交提供支持。因此,产生具有跨文件一致性需求的多输出文件的任务应当是确定性的。这个限制在实践中从来不是一个问题。

未完待续。。。

扫鸭扫鸭,求关注

alt

本文链接:http://blog.keepting.cn/blog//post/mapReduce_desc.html

-- EOF --

Comments