小米组件自定义大小(EMRRemoteShuffle)
作者 | 一锤、明济、紫槿来源 | 阿里技术公众号
阿里云EMR自2020年推出Remote Shuffle Service(RSS)以来,帮助了诸多客户解决Spark作业的性能、稳定性问题,并使得存算分离架构得以实施,与此同时RSS也在跟合作方小米的共建下不断演进。本文将介绍RSS的最新架构,在小米的实践,以及开源。
一 问题回顾Shuffle是大数据计算中最为重要的算子。首先,覆盖率高,超过50%的作业都包含至少一个Shuffle[2]。其次,资源消耗大,阿里内部平台Shuffle的CPU占比超过20%,LinkedIn内部Shuffle Read导致的资源浪费高达15%[1],单Shuffle数据量超100T[2]。第三,不稳定,硬件资源的稳定性CPU>内存>磁盘≈网络,而Shuffle的资源消耗是倒序。OutOfMemory和Fetch Failure可能是Spark作业最常见的两种错误,前者可以通过调参解决,而后者需要系统性重构Shuffle。
传统Shuffle如下图所示,Mapper把Shuffle数据按PartitionId排序写盘后交给External Shuffle Service(ESS)管理,Reducer从每个Mapper Output中读取属于自己的Block。
传统Shuffle存在以下问题。
- 本地盘依赖限制了存算分离。存算分离是近年来兴起的新型架构,它解耦了计算和存储,可以更灵活地做机型设计:计算节点强CPU弱磁盘,存储节点强磁盘强网络弱CPU。计算节点无状态,可根据负载弹性伸缩。存储端,随着对象存储(OSS, S3) 数据湖格式(Delta, Iceberg, Hudi) 本地/近地缓存等方案的成熟,可当作容量无限的存储服务。用户通过计算弹性 存储按量付费获得成本节约。然而,Shuffle对本地盘的依赖限制了存算分离。
- 写放大。当Mapper Output数据量超过内存时触发外排,从而引入额外磁盘IO。
- 大量随机读。Mapper Output属于某个Reducer的数据量很小,如Output 128M,Reducer并发2000,则每个Reducer只读64K,从而导致大量小粒度随机读。对于HDD,随机读性能极差;对于SSD,会快速消耗SSD寿命。
- 高网络连接数,导致线程池消耗过多CPU,带来性能和稳定性问题。
- Shuffle数据单副本,大规模集群场景坏盘/坏节点很普遍,Shuffle数据丢失引发的Stage重算带来性能和稳定性问题。
针对Shuffle的问题,工业界尝试了各种方法,近两年逐渐收敛到Push Shuffle的方案。
1 SailfishSailfish3最早提出Push Shuffle Partition数据聚合的方法,对大作业有20%-5倍的性能提升。Sailfish魔改了分布式文件系统KFS[4],不支持多副本。
2 DataflowGoolge BigQuery和Cloud Dataflow5实现了Shuffle跟计算的解耦,采用多层存储(内存 磁盘),除此之外没有披露更多技术细节。
3 RiffleFacebook Riffle2采用了在Mapper端Merge的方法,物理节点上部署的Riffle服务负责把此节点上的Shuffle数据按照PartitionId做Merge,从而一定程度把小粒度的随机读合并成较大粒度。
4 CoscoFacebook Cosco[6]7采用了Sailfish的方法并做了重设计,保留了Push Shuffle Parititon数据聚合的核心方法,但使用了独立服务。服务端采用Master-Worker架构,使用内存两副本,用DFS做持久化。Cosco基本上定义了RSS的标准架构,但受到DFS的拖累,性能上并没有显著提升。
5 ZeusUber Zeus[8]9同样采用了去中心化的服务架构,但没有类似etcd的角色维护Worker状态,因此难以做状态管理。Zeus通过Client双推的方式做多副本,采用本地存储。
6 RPMPIntel RPMP10依靠RDMA和PMEM的新硬件来加速Shuffle,并没有做数据聚合。
7 MagnetLinkedIn Magnet1融合了本地Shuffle Push Shuffle,其设计哲学是"尽力而为",Mapper的Output写完本地后,Push线程会把数据推给远端的ESS做聚合,且不保证所有数据都会聚合。受益于本地Shuffle,Magnet在容错和AE的支持上的表现更好(直接FallbACK到传统Shuffle)。Magnet的局限包括依赖本地盘,不支持存算分离;数据合并依赖ESS,对NodeManager造成额外压力;Shuffle Write同时写本地和远端,性能达不到最优。Magnet方案已经被Apache Spark接纳,成为默认的开源方案。
8 FireStormFireStorm11混合了Cosco和Zeus的设计,服务端采用Master-Worker架构,通过Client多写实现多副本。FireStorm使用了本地盘 对象存储的多层存储,采用较大的PushBlock(默认3M)。FireStorm在存储端保留了PushBlock的元信息,并记录在索引文件中。FireStorm的Client缓存数据的内存由Spark MemoryManager进行管理,并通过细颗粒度的内存分配(默认3K)来尽量避免内存浪费。
从上述描述可知,当前的方案基本收敛到Push Shuffle,但在一些关键设计上的选择各家不尽相同,主要体现在:
- 集成到Spark内部还是独立服务。
- RSS服务侧架构,选项包括:Master-Worker,含轻量级状态管理的去中心化,完全去中心化。
- Shuffle数据的存储,选项包括:内存,本地盘,DFS,对象存储。
- 多副本的实现,选项包括:Client多推,服务端做Replication。
阿里云RSS12由2020年推出,核心设计参考了Sailfish和Cosco,并且在架构和实现层面做了改良,下文将详细介绍。
三 阿里云RSS核心架构针对上一节的关键设计,阿里云RSS的选择如下:
- 独立服务。考虑到将RSS集成到Spark内部无法满足存算分离架构,阿里云RSS将作为独立服务提供Shuffle服务。
- Master-Worker架构。通过Master节点做服务状态管理非常必要,基于etcd的状态状态管理能力受限。
- 多种存储方式。目前支持本地盘/DFS等存储方式,主打本地盘,将来会往分层存储方向发展。
- 服务端做Replication。Client多推会额外消耗计算节点的网络和计算资源,在独立部署或者服务化的场景下对计算集群不友好。
下图展示了阿里云RSS的关键架构,包含Client(RSS Client, Meta Service),Master(Resource Manager)和Worker三个角色。Shuffle的过程如下:
- Mapper在首次PushData时请求Master分配Worker资源,Worker记录自己所需要服务的Partition列表。
- Mapper把Shuffle数据缓存到内存,超过阈值时触发Push。
- 隶属同个Partition的数据被Push到同一个Worker做合并,主Worker内存接收到数据后立即向从Worker发起Replication,数据达成内存两副本后即向Client发送ACK,Flusher后台线程负责刷盘。
- Mapper Stage运行结束,MetaService向Worker发起CommitFiles命令,把残留在内存的数据全部刷盘并返回文件列表。
- Reducer从对应的文件列表中读取Shuffle数据。
点击链接查看原文阿里云EMR Remote Shuffle Service在小米的实践,关注公众号【阿里技术】获取更多福利!
,免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com