MapReduce论文翻译
MapReduce: Simplied Data Processing on Large Clusters
摘要: MapReduce是一种编程模型和一个相关的实现,用于处理和生成大数据集。用户指定一个map函数,该函数处理一个键值对,然后生成一个一组中间键值对,然后指定一个reduce函数将所有键相同的中间键值对合并。许多真实世界的任务都可以用这种模型来表达,正如论文中展示的那样。
用这种函数式风格编写的程序会自动的并行化,并且在一个集群上执行。运行时系统负责对输入数据进行分区、调度一组机器上的程序的执行、处理机器故障并且管理内部机器之间的通信。这使得没有任何并行和分布式系统经验的编程人员也可以轻松地利用大型分布式系统的资源。
我们的MapReduce实现运行在一大群普通计算机组成的集群上并且高度可伸缩:一个典型的MapReduce计算可以在上千台机器上处理TB级别的数据。编程人员发现这种系统很容易使用:数百个MapReduce程序已经被实现并且每天有上千个MapReduce的任务在谷歌的集群上执行
简介
在过去的五年,作者和许多其它在谷歌的人实现了上百个专用的计算来处理大量的原始数据,例如爬取的文档、web请求日志等等。这些计算产生各种各样的派生数据,例如倒排索引、web文档的图结构的各种表示、每个网页有多少页、一天内最频繁被查询的集合等等。大部分这样的计算在概念上都是很简单的。但是它们的输入通常都很大,计算必须被分配到成百上千台机器上才能在一个合理的时间内完成。问题在于,如何并行计算,如何分布数据并且处理失败。这些问题让原本简单的计算变得很复杂,需要大量复杂的代码来解决这些问题。
为了应对这种复杂性,我们设计了一种新的抽象,它允许我们简单第表示计算而不用考虑细节,将并行化、错误容忍、数据分布以及负载均衡等混乱的细节封装在库中。我们的抽象受到了Lisp以及许多其它的函数式编程语言中存在的map和reduce原语的启发。我们发现,我们的大部分计算都涉及对输入中的每个逻辑“record”应用一个map操作,以计算生成一组中间的键值对,然后对于具有相同键的键值对应用一个reduce操作来将派生数据合适地组合到一起。我们使用一种用户指定map和reduce操作的函数模型,这允许我们很简单地并行化大量的计算并且使用重新执行作为错误容忍的主要机制。
这项工作的主要贡献是提供了一个简单和强大的接口,可以实现大规模的自动化并行和分布式处理,以及实现了这个接口,可以在由普通机器组成的大型集群上达到高性能。
第二节描述了基本的编程模型并且提供了几个例子。第三节描述了基于我们基于集群的计算环境的MapReduce接口的实现。第四节介绍了一些我们发现有用的对于编程模型的改进。第五节是对于我们的实现在执行一些不同的任务时的性能度量。第六节探索了一些MapReduce的应用,不仅仅是谷歌,也包括我们使用它作为重写我们生产系统索引的基础。第七节讨论了相关和未来的工作
编程模型
计算任务采用了一组包含键值对的输入,并且产生一系列键值对作为输出。MapReduce的使用者用两个函数表示计算:Map和Reduce
Map读取一个键值对作为输入,然后产生一组中间键值对。MapReduce库将所有中间键值对按照相同的键I分组并将它们传递给Reduce函数
Reduce函数接受一个键I和在所有中间键值对中这个键所对应的值的集合。它将这些值合并起来形成一个可能更小的值的集合。例如每次对于Reduce的调用只输出1和0作为值。中间值经过一个迭代器提供给用户reduce函数。这允许我们处理那些因为数量太多而不能存放在内存中的值。
例子
考虑统计一个大型文档集合中每个单词出现的数量。用户写出的代码可能类似于下列的伪代码
1 | map(String key, String value): |
map函数输出每个单词和这个单词出现的次数(在这个例子中是1)。reduce函数将一个单词的所有计数的和输出。
此外,用户还需要编写代码来填充一个MapReduce规范对象,其中包含输入和输出的文件名称以及可选的调优参数。然后用户调用MapReduce函数,将合规的对象传递给它。用户的代码和MapReduce的库链接在一起。附录A中包含了这个例子的完整代码实现。
类型
虽然前面的伪代码是按照字符串输入和输出来编写的,但是从概念上来说,用户提供的map和reduce函数有相关的类型:
1 | map (k1,v1) -> list(k2,v2) |
也就是说,输入的键和值可能是从与输出的键和值不同的域选取的。而且,中间键和值是从与输出的键和值相同的域中选取的。
我们的C++实现将字符串传递给要用户定义的函数并让用户代码在字符串和合适的类型之间进行转换。
更多例子
略:
- 分布式Grep
- URL访问频率计数
- 反转web链接图
- 每个主机的词向量
- 倒排索引
- 分布式排序
实现
许多对于MapReduce接口的不同的实现是可能的。正确的选择取决于环境。例如,一个实现可能更适合小共享内存的机器,另一个选择可能适合大型NUMA多处理器,还有一种适合更大的网络机器集群。
这个章节描述了一种实现,这种实现针对谷歌广泛使用的计算环境:普通PC用以太网交换设备连接起来的大型集群。在我们的环境中:
- 机器通常有两个x68处理器运行在Linux上,每台机器有2-4GB内存
- 使用普通的网络硬件,通常是每台机器100MB/s或者1GB/s的速度,但是平均值比在总对分带宽里低很多
- 一个集群由成百上千的机器组成,因此机器故障是很正常的
- 存储又便宜的IDE磁盘提供并且直接分配到单独的机器上。一个内部开发的分布式文件系统用来管理这些磁盘上的数据。这个文件系统用复制来在不可靠的硬件上提供可用性和可靠性
- 用户提交作业给调度系统。每个作业又一系列任务组成,并由调度器映射到一组可用的集群上
执行概括
图1:总体流程
Map的调用通过自动将输入数据分为M份从而分布在不同的机器上执行。输入的分片可以在不同的机器上被并行处理。Reduce的调用通过使用分区函数将中间键空间划分为R个部分来实现分布式。分区的数量R和分区函数都由用户指定。
图1显示了我们实现的MapReduce操作的整体流程。当我们的程序调用MapReduce函数,会发生以下一系列的行为(图1中的数字标签对应着下列的数字标签)
- 用户程序中的MapReduce库首先将输入的文件分为M个分片,一般来说16MB到64MB每份(由一个用户可选的参数控制)。然后它会在集群中的机器上启动这个程序的多个副本
- 其中一份副本程序是特殊的,它作为master。master给剩余的workers分配任务。master挑选workers然后分配给它们一个map任务或者reduce任务
- 一个worker如果被分配了map任务,它会从相应的输入分片中读取内容。它从输入的数据中解析出键值对然后将每个键值对传递给用户定义的Map函数。中间键值对由Map函数产生并且缓存在内存中
- 每隔一段时间,缓存的中间键值对会被写入到本地磁盘中,被分区函数分为R个区域。这些缓存的键值对在本地磁盘中存储的位置会被传递给master,master负责将这些位置告诉分配了reduce的worker
- 当一个运行reduce的worker被master告知这些缓存的存储位置,它会使用远程过程调用来从map workers的本地磁盘中读取缓存的数据。当一个reduce worker读取完了所有的中间数据,它会按照中间键的值来对它们进行排序,以便于所有出现的相同的键可以被分成一组。排序通常是被需要的,因为一般来说许多不同的键映射到同一个reduce任务。如果中间数据的数量多到不能存到内存中,就需要使用外部排序
- reduce worker遍历排序后的数据,并且对于遇到的每一个独特的中间键,它会将这个键和对应的中间值的集合传递给用户的Reduce函数。Reduce函数的输出被想回家到这个reduce分区最终的输出文件中
- 当所有的map任务和reduce任务都完成了,master会唤醒用户程序。此时,用户程序中的MapReduce调用执行返回到用户代码中
在成功的完成之后,MapReduce执行的输出就保存在这R个输出文件中(一个文件对一个一个reduce任务,文件名由用户指定)。一般来说,用户不需要将这R个输出文件合并为一个输出文件——他们通常将这些文件又作为其它MapReduce调用的输入,或者在其它能够处理多个文件输入的分布式系统中使用
Master数据结构
Master维护了几个数据结构。对于每个map任务和reduce任务,master储存了它们的数据结构(空闲,进行中或完成),以及每个worker机器的标识(对于非空闲的任务)
master是一个通道,中间文件域的地址信息经过它从map任务传递到reduce任务。因此,对于每个完成的任务,master储存了地址信息和这R个由map任务产生的中间文件域的大小。当map任务完成时,地址的更新和信息的大小会被主节点接收。这些信息会增量地推送给有进行中的reduce任务的worker节点
错误容忍
因为MapReduce库设计的主要目的是帮助处理使用成百上千的机器处理非常大量的数据,所以它必须能够优雅的容忍机器的错误
Workder错误
master周期性的ping worker节点。如果worker没有在一个确定的时间内响应,master将会把这个worker标记为失败。任何由这个worker完成的map任务都会被重置到它们初始的空闲状态,因此任务可以被调度到其它worker节点上。同样的,任何在失败worker节点上运行的map任务和reduce任务也都会被重置为空闲状态并且可以被重新调度
完成的map任务在发生故障时需要重新执行,因为他们的输出被排序在失败机器的本地磁盘上,因此不可访问。完成的reduce任务不需要重新执行因为它们的输出被排序在全局文件系统中
当一个map任务第一次被worker A执行之后又被worker B执行(因为A失败了),所有执行reduce任务的worker节点都会被通知重新执行。任何还没有读取worker A的数据的reduce任务都会重新从worker B读取
MapReduce能够抵抗大规模的worker错误。例如,在一个MapReduce操作进行期间,由于正在运行的集群进行了网络维护,导致80个机器同时变得在几分钟内无法访问。MapReduce的master简单的重新执行无法访问的节点所做的工作,并且继续执行程序,最终完成MapReduce操作
Master错误
定期的将上面描述的master数据结构写入检查点是很容易实现的。如果主节点的任务失败,可以从最新的检查点状态启动一个新的副本。然而,由于只有一个master,它的失败时不太可能的,因此我们的实现是如果master失败就中止MapReduce的计算。客户端可以检查这种情况,如果他们愿意的话还可以重新尝试这种操作
在故障发生时的语义
当用户提供的map和reduce操作是他们提供的值的确定性函数时,我们的分布式实现产生的输出和一个没有故障的顺序执行的结果是一样的
我们依赖于map和reduce任务的输出的原子性提交来实现这一特性。每一个在运行的任务都会将它的输出写入到一个私有的临时性文件中。一个reduce任务产生一个这样的文件,一个map任务产生R个这样的文件(每个reduce任务一个)。当一个map任务完成的时候,这个worker会发送一条信息给master,这里面包含了这R个临时性文件的名字。如果master已经收到过一个已经完成的map任务的完成信息,它会忽略这条信息。斗则,它会在一个master数据结构中记录这R个文件的名字
当一个reduce任务完成,这个reduce的worker会原子性的将它的临时输出文件重命名为最终输出文件。如果同样的reduce任务在多个机器上执行,多个重命名的请求将会在多个机器上执行。我么依赖底层文件系统提供的原子性重命名来保证最终文件系统状态只包含一个reduce任务执行产生的数据
我们的绝大部分map和reduce操作都是确定性的,并且事实上我们的语义等价于一个顺序执行,这使得程序员能够很容易理解他们的程序的行为。当一个map和/或reduce操作是非确定性的,我们提供了较弱但依然合理的语义。在非确定性操作存在时,一个特定的reduce任务R1的输出等价于由非确定性程序程序执行产生R1的输出。然而,一个不同的reduce任务R2的输出可能符合一个不同的非确定性程序顺序执行产生R2的输出
考虑map任务M和reduece任务R1以及R2。让$e(R_i)$代表$R_i$的个已经提交的执行(有且只有一个这样的执行)。较弱的语义的产生是因为$e(R_1)$也许已经读取到了M的一个执行产生的输出并且$e(R_2)$也许读到了一个M的不同的执行产生的输出
局部性
在我们的计算环境中,网络带宽是一个相对稀缺的资源。我们通过利用输入数据(被GFS管理)是存储在组成我们的集群的机器的本地磁盘上的这个事实来节约网络带宽。GFS将每个文件分成64MB大小的块,然后在不同的机器上存储每个块的几个副本(通常是3个)。MapReduce的master考虑到了输入文件的位置信息并且尝试讲一个map任务调度到一个包含任务对应的输入数据的副本的机器上。如果失败的话,它会尝试调度map任务到靠近该任务输入数据备份的地方(例如,在于包含数据的机器相同的交换机上的woker机器)。当在一个集群中的大量worker上运行大型MapReduce操作时大部分数据都是从本地读取的并且不消耗网络带宽
任务粒度
我们将map阶段分为M个部分,将reduce阶段分为R个部分,如上所述。理想情况下,M和R应该比worker节点的数量大很多。让每个节点执行许多不同的任务可以提高动态负载均衡,也可以加快节点失败时的恢复速度:许多它已经完成的map任务可以分散到其它所有节点上
在我们的实现中,M和R的大小有一些实际的限制,因为master必须作出O(M+R)个调度决策并在内存中保持O(MR)个状态,就像之前描述的那样。(但是内存使用的常数因子很小:O(M*R)个部分的状态大约由每对map任务/reduce任务一个字节的数据来组成)
此外,R通常被用户限制,因为每个reduce任务的输出都会在一个单独的文件中。在实践中,我们倾向于选择M,使得每个单独的任务大约有16MB到64MB的输入数据(以便于上述描述的局部性优化可以得到最大化),并给我们让R成为一个我们期望使用的worker节点数量的较小倍数。我们通常使用M=200,000,R=5,000,来执行MapReduce计算,使用2,000个worker节点
备份任务
一个常见的导致MapReduce操作的总时间延长的原因是“straggler”:一台机器花费异常长的时间来完成计算中最后几个计算中的map或reduce任务。Straggler可能由很多原因造成。例如,一个机器使用不好的磁盘,也许会经历频繁的可纠正错误,这会导致它的读取性能从30MB/s降低到1MB/s。集群调度系统可能调度其它任务到这个机器上,产生CPU、内存、本地磁盘或者是网络带宽的竞争,导致它执行MapReduce代码更慢。我们最近经历的一个问题是一个机器初始化代码的bug,它导致处理器缓存被禁用:在受影响机器上的计算下降了超过一百倍
我们有一般性的方法来环节straggler的问题。当一个MapReduce操作接近完成,master会安排剩余运行中任务的备份执行。主要任务或者是备份任务最后弄得一个完成了,任务就会被标记为完成。我们已经调整了这个机制,使得它通常只增加不超过几个百分点的计算资源的使用。我们发现这显著的减少了大型的MapReduce操作完成的时间。例如:5.3章节中描述的排序任务花费多44%的时间来完成相比于使用了备份任务方法的情况
优化
尽管简单地编写Map和Reduce函数就可以满足大部分需要的基本功能,但是我们发现一些扩展很有用。这些扩展在本节描述。
分区函数
MapReduce的用户制定他们需要的reduce任务和输出文件的数量(R)。数据根据中间键上的分区函数分配到这些任务中。提供了一个默认的分区函数,它使用哈希分区(例如:hash(key) mod R
)。这倾向于产生相当平衡的分区。但是在一些情况下,根据键的一些其它函数来进行分区数据是很有用的。例如,有些时候输出是URLs,我们想要同一个域名的所有条目出现在同一个文件夹里。为了支持这样的情形,MapReduce库的用户可以提供一个特殊的分区函数。例如,使用”hash(Hostname(urlkey)) mod R
“作为分区函数会使得所有的来自同一个域名的URL最初出现在一个同样的输出文件里
排序保证
我们保证在一个给定的分区内,中间键值对按照升序处理。这个顺序保证使得每个分区产生一个排序好的输出文件变得简单,这会在输出文件格式需要支持根据key来进行随机访问时很有用,或者用户发现对输出的数据进行排序很简单
合并函数
在某些情况下,每一个map任务产生的中间键具有很多重复,并且用户指定的Reduce函数是可交换和可结合的。一个很好的例子是2.1章节提到的单词计数。由于单词频率倾向于Zipf分布,每一个map任务将会产生成百上千的类似于<the,1>
的形式的记录。所有这些技术都会被通过网络发送给一个reduce任务,然后被Reduce函数累加到一起产生一个总数。我们允许用户指定一个可选的合并哈数来在网络发送前对数据进行部分合并
合并函数在每一个执行map任务的worker节点上执行。通常使用相同的代码来实现合并函数和reduce函数。reduce函数和合并函数唯一的不同是MapReduce如何处理函数的输出。reduce函数的输出被写入到最终输出文件中,合并函数的输出被写入到一个将要发送给reduce任务的中间文件中
部分合并函数显著地加速了某些类别的MapReduce操作。附录A包含一个使用合并函数的例子
输入和输出类型
MapReduce库提供对于读取多种不同格式的输入数据的支持。例如,文本类型的数据将每一行视为一个键值对:键是在文件中的偏移量,值是这一行的内容。另一个常见的支持格式按键排序存储一个键值对的序列。每一个输入类型的实现都知道如何将自己分割成有意义的范围,以便于被单独的map任务处理(例如:文本模式的范围分割确保了范围分割仅仅发生在行的边界上)。用户可以通过提供一个简单的reader
接口的实现来添加对于一个新的输入类型的支持,尽管大部分用户只使用一小部分预定义的输入类型。
reader
不一定要提供来自文件的数据读取。例如,也可以简单的顶一个一个reader来读取数据库中的记录,或者是从内存中映射的数据结构中读取数据
类似的,我们支持一组输出类型,可以以不同的格式产生数据,并且用户代码可以很容易的添加新的输出类型
副作用
在一些情况下,MapReduce的用户发现可以很方便地产生备用文件作为他们的map或者reduce操作的额外输出。我们依赖应用编写者来实现这样的副作用的原子性和幂等性。通常应用程序写入一个临时文件并且在文件完全生成后原子性地重命名这个文件
我们不提供对于不提供对于单个任务产生的多个输出文件的原子性的两阶段提交的支持。因此,产生多个具有跨文件一致性要求的输出文件的任务应该是确定性的。这个限制在实践中从未成为一个问题
跳过不好的记录
有时候用户代码代码中会出现一些bug,它们会导致Map和Reduce函数在某些记录上确定性的崩溃。这样的bug阻止了MapReduce操作的完成。通常的做法是修复这个bug,但有时这并不可行;可能这个bug在第三方库中,源代码无法获取。此外,有时候忽略一些记录是可以接受的,例如在一个巨大的数据集上做统计分析。我们提供一个可选的执行模式,在这个模式下MapReduce库会检测哪一个记录引起了确定性崩溃,并且跳过这些记录以便于继续执行
每一个worker进程安装一个信号处理器,用来捕获段错误和总线错误。在调用一个用户的Map和Reduce操作之前,MapReduce库将参数的序列号存在一个全局变量中,如果用户代码产生一个信号,这个信号处理发送一个“last gasp”的UDP包给MapReduce的master,包中包含着序列号。当master在一个特定的记录上观察到超过一个错误,它会在下一次重新执行相应的Map和Reduce任务时指示这个记录应该被跳过
本地执行
在Map和Reduce函数中调试问题是棘手的,因为计算发生在一个分布式系统上,通常在几千台机器上,并且工作分配由master动态决定。为了帮助调试、性能分析和小规模的测试,我们开发了一个MapReduce库的替代实现,它可以在本地机器上顺序执行一个MapReduce操作的所有工作。用户可以控制计算使得计算限制在一个特定的map任务上。用户用一个特殊的标志调用他们的程序并且可以很容易的使用任何他们觉得有效的调试和测试工具(例如gdb)
状态信息
master运行一个内部的HTTP服务并且提供了一些列的状态页面供用户查看,状态页面显示了计算的进度,例如多少任务已经被完成,多少还在执行中国,输入的字节数,中间数据的字节数,输出的字节数,处理速率等等。这个页面也包含到每个任务产生的标准错误和标准输出文件的链接。用户可以使用这些数据预测计算需要多长时间,是否需要增加计算资源。这些页面也可以被用来指出计算比预期要慢的原因
此外,顶级状态页面显示了哪一个worker节点失败了,它们失败的时候正在执行哪一个map和reduce任务。这个信息在我们尝试在用户代码中诊断错误的时候很有用
计数
MapReduce库提供了以一个计数器来统计各种事件发生的次数。例如,用户代码也许想要统计被处理的单词的综述或者是索引的德语文档的数量等等
要是用这个功能,用户代码创建一个名为counter的对象然后在Map和Reduce中适当的增加计数器的值,例如(统计大写单词的数量):
1 | Counter* uppercase; |
来自单独的worker节点的计数器的值周期性的传递给master(附加在ping响应上)。在MapReduce操作完成时,master将来自成功的map和reduce任务的技术器的值进行汇总然后返回给用户代码。当前的计数器的值也被展示在master的状态页上,以便于人们可以观察实时计算的进度。当汇总计数器的值的时候,master消除同样的map和reduce任务重复执行的影响来避免重复计数。(重复执行可能发生在我们使用备份任务以及任务由于失败重复执行的时候)
一些计数器的值由MapReduce库自动保存,例如处理的输入键值对的数量,产生的输出键值对的数量
用户发现计数器功能对于检查MapReduce操作的行为是很有用的。例如,在一些MapReduce操作中,用户代码也许希望确保产生的输出键值对的数量等于输入键值对的数量,或者处理的德语文件数量占总文件数量的比例在一定范围之类
总结
MapReduce编程模型已经成功地在谷歌使用,用于许多不同的目的。我们将成功归结于几个原因。第一,这个模型易于使用,甚至没有并发和分布式系统经验的程序员也可以使用,因为它隐藏了分布式、错误容忍、局部优化和负载均衡的细节。第二,许多不同的问题都可以很容易的被MapReduce计算表达。例如,MapReduce被用于收集谷歌的web搜索服务产生的数据,被用于排序、数据压缩、机器学习和许多其它系统。第三,我们开发了一个MapReduce实现可以伸缩到上千台机器组成的集群。这个实现有效的使用了这些机器的资源并且因素很适合使用在谷歌遇到的许多大型计算问题上
我们已经学到了这项工作的几件事。首先,限制编程模型是并行化和分布式计算变得简单,也易于对这样的计算进行错误容忍。第二,网络带宽是一个稀缺的资源。因此我们系统中的许多优化目的都是减少通过网络发送的数据的数量:局部优化允许我们从本地磁盘读取数据,然后在本地底盘写入中间数据的单个副本来节省网络带宽。第三,冗余执行可以被用来减少运行缓慢的机器的影响,解决机器的失败和数据丢失