MIT 6.824 分布式系统基础学习(LEC 1-3)

LEC 1 Introduction and MapReduce

LEC 2 RPC and Threads

LEC 3 Google File System


LEC 1 Introduction

1.1 Drivens and Challenges

提醒:设计一个系统的时候,如果可以用其他方式解决,就不要考虑分布式,因为分布式会让问题变得复杂。

分布式系统的核心是通过网络协调,共同完成一致任务的一些计算机。使用分布式的驱动力是:

  • 并行parallelism。需要更高的性能,就需要更多的计算机,更多的计算机意味着硬件并行运行和计算的并行执行。
  • 容错tolerate faults。一台计算机出错,可以换到另一台。
  • 物理上的分布physical。有些场景有着天然的分布,例如一家银行在其他地方有分行,需要两个地方的计算机有协调的方法。
  • 安全security/isolated。有一些代码不被信任,让他的代码运行到一个服务器上,我的代码运行到另一个服务器上,然后通过网络通信,限制出错的范围。

这门课主要讨论两点:容错和性能。剩下的两点通过某些案例学习。

分布式系统的挑战在于:

  • 并行设计concurrency。并发编程本身是复杂的,加上各种复杂交互带来的问题,以及时间依赖问题,比如同步、异步。
  • 局部出错partial failure。分布式系统有很多组成部分,这些部分可能有一部分在运行,另一部分出错停止运行,加上网络通信的不可预期性。
  • 性能分析performance。一千台计算机的性能就是一台计算机的一千倍吗?怎样设计才能达到预期的性能?都是很棘手的问题。

1.2 Course Structure

授课内容会围绕分布式系统的两个方面(性能和容错)。有几节课会介绍一些关于编程实验的内容。

四次实验:MapReduce, Raft for fault tolerance, K/V server, Sharded K/V service。

1.3 Abstraction and Implementation

这门课是有关应用的基础架构的,基础架构的类型主要是存储,通信(网络)和计算。实际上最关注的是存储,去构造一个多副本的、容错的、高性能的分布式存储实现。也会讨论一些计算系统,例如MapReduce。也会讨论关于通信的问题,这是建立分布式需要的工具。

对于存储和计算,我们的目标是为了能够设计一些简单接口,让第三方应用能够使用这些分布式的存储和计算,这样才能简单的在这些基础架构之上,构建第三方应用程序。人们在构建分布系统时,使用了很多的工具,例如:

  • RPC(Remote Procedure Call)。RPC的目标就是掩盖我们正在不可靠网络上通信的事实。
  • 线程。这是一种编程技术,使得我们可以利用多核心计算机。对于本课程而言,更重要的是,线程提供了一种结构化的并发操作方式,这样,从程序员角度来说可以简化并发操作。
  • 并发。因为我们会经常用到线程,我们需要在实现的层面上,花费一定的时间来考虑并发控制,比如锁。

1.4 Scalability(or Performance)

Scalability的中文翻译是可扩展性,但是也可以翻译为可测量性(我的理解:已知两台计算机,可得到两倍的性能,这就是可测量性,如果不知道得到的性能提升了多少,则为不可测量性)。

构建分布式系统的目的是为了获得可扩展的加速,即追求可扩展性。这里的可扩展性的意思是,当使用一台计算机可以解决的问题,当买了第二台计算机的时候,只需要一半时间就可以解决问题,或者能解决两倍数量的问题。两台计算机如果有两倍的性能,就是我们说的可扩展性。

如果构造一个系统,只要增加计算机的数量,就能获得相应的提升的话,是巨大的成功。因为计算机只要用钱就能够买到,如果不增加计算机的数量,只能花钱雇程序员来重构这些系统或者算法,以此获得更高的性能,但这通常是昂贵的方法。

我们还是希望增加一百倍的计算机,来获得一百倍的性能,所以必须时刻考虑可扩展性,仔细设计系统,才能获得与计算机数量匹配的性能。

当访问你的网站的人数突然增加,你可以花费时间去优化你的网站,但是显然没有哪个时间。所以,为了提高性能,你要做的第一件事就是购买更多的计算机作为服务器。这些服务器都与后端的数据库通信,所以你在很长的时间都可以通过购买服务器来提高效率。

但是这种可扩展性不是无限的,因为它们都在跟同一个数据库通信,现在数据库成为了新的瓶颈。所以很少有通过无限增加计算机就能获得可扩展性的场景,因为当数量到达某个临界点的时候,再往系统中添加数据将解决不了问题。

所以,扩展性的是这样的:我们希望通过增加计算机的方法来实现扩展,但是现实中很难实现,需要一些架构设计来让这个可扩展性无限扩展下去。

1.5 Availability and Recoveability(or Fault Tolerance)

通过数千台计算机来构建一个系统,即使每台计算机能够稳定运行一年的时间,每天也有三台计算机出现故障。所以在大型的系统中,一个很罕见的问题都会被放大,将一个之前本不需要考虑的问题,变成一个持续不断的问题。因为错误总会发生,所以需要在设计就考虑系统能够屏蔽错误,或者说在出现错误的时候依旧能够运行。同时也需要为应用开发者提供方便的抽象接口,对应用开发人员屏蔽和掩盖错误。

对于容错,有很多不同的概念可以表述,这些表述中,有一个共同的思想就是可用性。某些系统经过精心的设计,在特定的错误下,依旧可以正常运行,好像没有出现错误一样,提供完整的服务。可用性是一个范围,在这个范围内,系统能够提供服务,但是如果出现更多的错误,超出了这个范围,系统就失去了可用性。

除了可用性,另一种容错的机制是自我恢复性。如果出现了问题,服务会停止工作,等待人来修复,在修复之后,系统仍然可以正常运行,就像没有出现过问题一样。这是一个比可用性更弱的需求,以为出现故障之后到修复之前,系统都是停止工作的。但是在修复之后,系统又可以运行,所以也是一个重要的需求。为了实现可恢复性,有很多的工具,最重要的有两个:

  • 非易损失存储non-volatile storage。当出现类似电源故障,可以使用非易失存储,比如硬盘,闪存,SSD之类的。我们可以存放一些check point或者系统状态的log在这些存储中,这样当备用电源恢复或者某人修好了电力供给,我们还是可以从硬盘中读出系统最新的状态,并从那个状态继续运行。所以,这里的一个工具是非易失存储。因为更新非易失存储是代价很高的操作,所以相应的出现了很多非易失存储的管理工具。同时构建一个高性能,容错的系统,聪明的做法是避免频繁的写入非易失存储。在过去,甚至对于今天的一个3GHZ的处理器,写入一个非易失存储意味着移动磁盘臂并等待磁碟旋转,这两个过程都非常缓慢。有了闪存会好很多,但是为了获取好的性能,仍然需要许多思考。
  • 复制replication。不过,管理复制的多副本系统会有些棘手。任何一个多副本系统中,都会有一个关键的问题,比如说,我们有两台服务器,它们本来应该是有着相同的系统状态,现在的关键问题在于,这两个副本总是会意外的偏离同步的状态,而不再互为副本。对于任何一种使用复制实现容错的系统,我们都面临这个问题。lab2和lab3都是通过管理多副本来实现容错的系统,你将会看到这里究竟有多复杂。

1.6 Consistency

当需要对一个分布式系统举例时,我总是会想到KV服务。这个KV服务只支持两种操作,其中一个是put操作会将一个value存入一个key;另一个是get操作会取出key对应的value。整体表现就像是一个大的key-value表单。如果是程序员,可以到手册去查这两种操作会有什么效果,否则就无法知道这两种操作的效果,更何况去编写程序。

一致性就是用来定义操作行为的概念,从容错和性能的角度来说,一般会有多个副本,数据可能存在多个表单中,于是有了多个不同版本的KV对,这个时候去get可能得到不同的value。有时候获取的是同一个副本里的数据,这个时候是一致的,但是有一天这个副本出错了,一致性的问题就会暴露出来。

对于一致性有不同的定义。强一致性strong consistency,get获取的是最近一次完成put写入的值。弱一致性,不保证get获取的是最新的值。

强一致性会带来昂贵的通信问题,为了保证每一次都获取最新的值,分布式系统的各个组件都需要频繁地和其他副本通信,除此之外,假设有部分出现故障了,这个时候就无法确定这个部分是不是最新的。强一致性会带来这样的困境:容错->复制(副本),并把副本放到足够远的地方->通信代价变高,性能变低。

所以更倾向于选择弱一致性的系统。当然,为了让弱一致性可以更有实际意义,人们还会定义更多地规则。你只需要更新最近的数据副本,并且只需要从最近的副本获取数据。

1.7 MapReduce Basic Work Mode

MapReduce是由Google设计,开发和使用的一个系统,相关的论文在2004年发表。Google当时面临的问题是,他们需要在TB级别的数据上进行大量的计算。所以Google需要一种框架,可以让它的工程师能够进行任意的数据分析,例如排序,网络索引器,链接分析器以及任何的运算。

MapReduce的思想是,应用程序设计人员和分布式运算的使用者,只需要写简单的Map函数和Reduce函数,而不需要知道任何有关分布式的事情,MapReduce框架会处理剩下的事情。

抽象来看,MapReduce假设有一些输入,这些输入被分割成大量的不同的文件或者数据块。MapReduce启动时,会查找Map函数。之后,MapReduce框架会为每个输入文件运行Map函数。

Map函数以文件作为输入,文件又是整个输入数据的一部分。Map函数的输出是一个key-value对的列表。假设我们在实现一个最简单的MapReduce Job:单词计数器。它会统计每个单词出现的次数。在这个例子中,Map函数会输出key-value对,其中key是单词,而value是1。Map函数会将输入中的每个单词拆分,并输出一个key-value对,key是该单词,value是1。最后需要对所有的key-value进行计数,以获得最终的输出。所以,假设输入文件1包含了单词a和单词b,Map函数的输出将会是key=a,value=1和key=b,value=1。第二个Map函数只从输入文件2看到了b,那么输出将会是key=b,value=1。第三个输入文件有一个a和一个c。

我们对所有的输入文件都运行了Map函数,并得到了论文中称之为中间输出(intermediate output),也就是每个Map函数输出的key-value对。

运算的第二阶段是运行Reduce函数。MapReduce框架会收集所有Map函数输出的每一个单词的统计。比如说,MapReduce框架会先收集每一个Map函数输出的key为a的key-value对。收集了之后,会将它们提交给Reduce函数。

之后会收集所有的b。这里的收集是真正意义上的收集,因为b是由不同计算机上的不同Map函数生成,所以不仅仅是数据从一台计算机移动到另一台(如果Map只在一台计算机的一个实例里,可以直接通过一个RPC将数据从Map移到Reduce)。我们收集所有的b,并将它们提交给另一个Reduce函数。这个Reduce函数的入参是所有的key为b的key-value对。对c也是一样。所以,MapReduce框架会为所有Map函数输出的每一个key,调用一次Reduce函数。

在我们这个简单的单词计数器的例子中,Reduce函数只需要统计传入参数的长度,甚至都不用查看传入参数的具体内容,因为每一个传入参数代表对单词加1,而我们只需要统计个数。最后,每个Reduce都输出与其关联的单词和这个单词的数量。所以第一个Reduce输出a=2,第二个Reduce输出b=2,第三个Reduce输出c=1。

这就是一个典型的MapReduce Job。从整体来看,为了保证完整性,有一些术语要介绍一下:

  • Job。整个MapReduce计算称为Job。
  • Task。每一次MapReduce调用称为Task。

所以,对于一个完整的MapReduce Job,它由一些Map Task和一些Reduce Task组成。所以这是一个单词计数器的例子,它解释了MapReduce的基本工作方式。

1.8 Map and Reduce Function

Map函数使用一个key和一个value作为参数。入参中,key是输入文件的名字,通常会被忽略,因为我们不太关心文件名是什么,value是输入文件的内容。所以,对于一个单词计数器来说,value包含了要统计的文本,我们会将这个文本拆分成单词。之后对于每一个单词,我们都会调用emit。emit由MapReduce框架提供,并且这里的emit属于Map函数。emit会接收两个参数,其中一个是key,另一个是value。在单词计数器的例子中,emit入参的key是单词,value是字符串“1”。这就是一个Map函数。

Reduce函数的入参是某个特定key的所有实例(Map输出中的key-value对中,出现了一次特定的key就可以算作一个实例)。所以Reduce函数也是使用一个key和一个value作为参数,其中value是一个数组,里面每一个元素是Map函数输出的key的一个实例的value。对于单词计数器来说,key就是单词,value就是由字符串“1”组成的数组,所以,我们不需要关心value的内容是什么,我们只需要关心value数组的长度。Reduce函数也有一个属于自己的emit函数。这里的emit函数只会接受一个参数value,这个value会作为Reduce函数入参的key的最终输出。所以,对于单词计数器,我们会给emit传入数组的长度。这就是一个最简单的Reduce函数。并且Reduce也不需要知道任何有关容错或者其他有关分布式相关的信息。

学生提问:可以将Reduce函数的输出再传递给Map函数吗?
Robert教授:在现实中,这是很常见的。MapReduce用户定义了一个MapReduce Job,接收一些输入,生成一些输出。之后可能会有第二个MapReduce Job来消费前一个Job的输出。对于一些非常复杂的多阶段分析或者迭代算法,比如说Google用来评价网页的重要性和影响力的PageRank算法,这些算法是逐渐向答案收敛的。我认为Google最初就是这么使用MapReduce的,他们运行MapReduce Job多次,每一次的输出都是一个网页的列表,其中包含了网页的价值,权重或者重要性。所以将MapReduce的输出作为另一个MapReduce Job的输入这很正常。

学生提问:当你调用emit时,数据会发生什么变化?emit函数在哪运行?
Robert教授:首先看,这些函数在哪运行。这里可以看MapReduce论文的图1。现实中,MapReduce运行在大量的服务器之上,我们称之为worker服务器或者worker。同时,也会有一个Master节点来组织整个计算过程。这里实际发生的是,Master服务器知道有多少输入文件,例如5000个输入文件,之后它将Map函数分发到不同的worker。所以,它会向worker服务器发送一条消息说,请对这个输入文件执行Map函数吧。之后,MapReduce框架中的worker进程会读取文件的内容,调用Map函数并将文件名和文件内容作为参数传给Map函数。worker进程还需要实现emit,这样,每次Map函数调用emit,worker进程就会将数据写入到本地磁盘的文件中。所以,Map函数中调用emit的效果是在worker的本地磁盘上创建文件,这些文件包含了当前worker的Map函数生成的所有的key和value。
所以,Map阶段结束时,我们看到的就是Map函数在worker上生成的一些文件。之后,MapReduce的worker会将这些数据移动到Reduce所需要的位置。对于一个典型的大型运算,Reduce的入参包含了所有Map函数对于特定key的输出。通常来说,每个Map函数都可能生成大量key。所以通常来说,在运行Reduce函数之前。运行在MapReduce的worker服务器上的进程需要与集群中每一个其他服务器交互来询问说,看,我需要对key=a运行Reduce,请看一下你本地磁盘中存储的Map函数的中间输出,找出所有key=a,并通过网络将它们发给我。所以,Reduce worker需要从每一个worker获取特定key的实例。这是通过由Master通知到Reduce worker的一条指令来触发。一旦worker收集完所有的数据,它会调用Reduce函数,Reduce函数运算完了会调用自己的emit,这个emit与Map函数中的emit不一样,它会将输出写入到一个Google使用的共享文件服务中。
有关输入和输出文件的存放位置,这是我之前没有提到的,它们都存放在文件中,但是因为我们想要灵活的在任意的worker上读取任意的数据,这意味着我们需要某种网络文件系统(network file system)来存放输入数据。所以实际上,MapReduce论文谈到了GFS(Google File System)。GFS是一个共享文件服务,并且它也运行在MapReduce的worker集群的物理服务器上。GFS会自动拆分你存储的任何大文件,并且以64MB的块存储在多个服务器之上。所以,如果你有了10TB的网页数据,你只需要将它们写入到GFS,甚至你写入的时候是作为一个大文件写入的,GFS会自动将这个大文件拆分成64MB的块,并将这些块平均的分布在所有的GFS服务器之上,而这是极好的,这正是我们所需要的。如果我们接下来想要对刚刚那10TB的网页数据运行MapReduce Job,数据已经均匀的分割存储在所有的服务器上了。如果我们有1000台服务器,我们会启动1000个Map worker,每个Map worker会读取1/1000输入数据。这些Map worker可以并行的从1000个GFS文件服务器读取数据,并获取巨大的读取吞吐量,也就是1000台服务器能提供的吞吐量。

学生提问:Input->Map。这里的箭头代表什么意思?

Robert教授:随着Google这些年对MapReduce系统的改进,答案也略有不同。通常情况下,如果我们在一个例如GFS的文件系统中存储大的文件,你的数据分散在大量服务器之上,你需要通过网络与这些服务器通信以获取你的数据。在这种情况下,这个箭头表示MapReduce的worker需要通过网络与存储了输入文件的GFS服务器通信,并通过网络将数据读取到MapReduce的worker节点,进而将数据传递给Map函数。这是最常见的情况。并且这是MapReduce论文中介绍的工作方式。但是如果你这么做了,这里就有很多网络通信。 如果数据总共是10TB,那么相应的就需要在数据中心网络上移动10TB的数据。而数据中心网络通常是GB级别的带宽,所以移动10TB的数据需要大量的时间。在论文发表的2004年,MapReduce系统最大的限制瓶颈是网络吞吐。如果你读到了论文的评估部分,你会发现,当时运行在一个有数千台机器的网络上,每台计算机都接入到一个机架,机架上有以太网交换机,机架之间通过root交换机连接(最上面那个交换机)。

如果随机的选择MapReduce的worker服务器和GFS服务器,那么至少有一半的机会,它们之间的通信需要经过root交换机,而这个root交换机的吞吐量总是固定的。如果做一个除法,root交换机的总吞吐除以2000,那么每台机器只能分到50Mb/S的网络容量。这个网络容量相比磁盘或者CPU的速度来说,要小得多。所以,50Mb/S是一个巨大的限制。
在MapReduce论文中,讨论了大量的避免使用网络的技巧。其中一个是将GFS和MapReduce混合运行在一组服务器上。所以如果有1000台服务器,那么GFS和MapReduce都运行在那1000台服务器之上。当MapReduce的Master节点拆分Map任务并分包到不同的worker服务器上时,Master节点会找出输入文件具体存在哪台GFS服务器上,并把对应于那个输入文件的Map Task调度到同一台服务器上。所以,默认情况下,这里的箭头是指读取本地文件,而不会涉及网络。虽然由于故障,负载或者其他原因,不能总是让Map函数都读取本地文件,但是几乎所有的Map函数都会运行在存储了数据的相同机器上,并因此节省了大量的时间,否则通过网络来读取输入数据将会耗费大量的时间。

我之前提过,Map函数会将输出存储到机器的本地磁盘,所以存储Map函数的输出不需要网络通信,至少不需要实时的网络通信。但是,我们可以确定的是,为了收集所有特定key的输出,并将它们传递给某个机器的Reduce函数,还是需要网络通信。假设现在我们想要读取所有的相关数据,并通过网络将这些数据传递给单台机器,数据最开始在运行Map Task的机器上按照行存储(例如第一行代表第一个Map函数输出a=1,b=1),而我们最终需要这些数据在运行Reduce函数的机器上按照列存储(例如,Reduce函数需要的是第一个Map函数的a=1和第三个Map函数的a=1)。

论文里称这种数据转换之为洗牌(shuffle)。所以,这里确实需要将每一份数据都通过网络从创建它的Map节点传输到需要它的Reduce节点。所以,这也是MapReduce中代价较大的一部分。

学生提问:是否可以通过Streaming的方式加速Reduce的读取?
Robert教授:你是对的。你可以设想一个不同的定义,其中Reduce通过streaming方式读取数据。我没有仔细想过这个方法,我也不知道这是否可行。作为一个程序接口,MapReduce的第一目标就是让人们能够简单的编程,人们不需要知道MapReduce里面发生了什么。对于一个streaming方式的Reduce函数,或许就没有之前的定义那么简单了。
不过或许可以这么做。实际上,很多现代的系统中,会按照streaming的方式处理数据,而不是像MapReduce那样通过批量的方式处理Reduce函数。在MapReduce中,需要一直要等到所有的数据都获取到了才会进行Reduce处理,所以这是一种批量处理。现代系统通常会使用streaming并且效率会高一些。


LEC 2 RPC and Threads

2.1 Why Threads?

本课程使用的是GO语言,GO语言具有以下优点:对线程的良好支持,方便的RPC库,类型安全(内存安全,不会操作没有授权的内存地址,消除很大一类bug),垃圾回收,多线程和垃圾回收的结合,相对简单。最后推荐去看一下《Effective Go》这本书。

多个线程运行一个程序同时进行多个任务,每个线程内部程序串行运行,并且有自己的程序计数器、寄存器和栈空间,这些线程之间是共享内存的。多线程的应用在分布式系统中非常常见,因为它能够支持并发操作,契合分布式系统的特点。使用线程的优点:

  • IO并发IO Concurrency。客户端构建多个线程向不同的服务器发起RPC请求,每个线程得到响应之后再执行对应的任务。
  • 多核心性能Multicore Performance。使用多线程可以最大限度的利用多核CPU的性能。多个线程可以由不同的CPU核心进行处理,不同CPU核心的线程拥有独立的CPU周期。
  • 方便Convenience。多线程大大简化编程难度。比如在分布式系统中,我们想要每隔一定的时间进行一次事件检查(如 MapReduce中Master节点检查Worker是否异常),我们就可以创建一个线程,让其专门负责定期检查Worker是否存活。

在进行多线程编程时,通常需要仔细考虑以下几个重要问题:

  • 数据共享Shared Data。线程间是可以共享进程数据的,但是在使用共享数据的过程中,很可能会出现冲突问题。
  • 线程协作Coordination。我们经常需要线程间能够相互协作,比如经典的消费者-生产者模型。在GO语言中,线程间相互写作的方式有:channel,sync.Cond,sync.WaitGroup。
  • 线程同步:死锁。

2.2 Alternative to Threads: Event-Driven

在单个线程中编写显式交错活动(代码会从一处跳转到另一处)的代码,通常被称为“事件驱动”。如果要实现并发I/O,除了采取多线程的方式,还可以采用事件驱动编程的思想来实现,如epoll模型。在事件驱动编程中,有一个线程会负责循环检测所有的事件状态,如客户端发起的RPC请求等,当该线程检测到事件到来时,如服务器响应RPC请求,该线程就会调用相应的处理函数,并继续进行循环监听。事件驱动编程相比多线程的实现方式有以下不同:

  • 优点是开销更小,多线程的创建和删除占用的开销远大于事件驱动。
  • 缺点是无法利用多核CPU的性能,而且实现较为复杂。

感觉这就是并发的另一种实现方式,而且是非阻塞的。

2.3 Threading Example: Web Crawler

用一个简单的网页爬虫来展示GO多线程的应用。网络爬虫的目的是获取所有的网页,这些网页通过链接连成一张图,有些网页有多个链接,而且可能形成环。使用爬虫带来的挑战是:

  • 需要利用IO并发。网络的延迟(Network Latency)是比网络的容量(Network Capacity)更限制性能的原因,为了提高性能,要使用并发来同时获取多个网页。
  • 只能获取某个网页一次。为了避免浪费网络带宽,也是减轻服务器压力的方式,所以需要记住每个URL是否被抓取过。
  • 知道什么时候结束。

首先给出基本的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//
// main
//

func main() {
fmt.Printf("=== Serial===\n")
Serial("http://golang.org/", fetcher, make(map[string]bool))

fmt.Printf("=== ConcurrentMutex ===\n")
ConcurrentMutex("http://golang.org/", fetcher, makeState())

fmt.Printf("=== ConcurrentChannel ===\n")
ConcurrentChannel("http://golang.org/", fetcher)
}

//
// Fetcher
//

type Fetcher interface {
// Fetch returns a slice of URLs found on the page.
Fetch(url string) (urls []string, err error)
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
body string
urls []string
}

func (f fakeFetcher) Fetch(url string) ([]string, error) {
if res, ok := f[url]; ok {
fmt.Printf("found: %s\n", url)
return res.urls, nil
}
fmt.Printf("missing: %s\n", url)
return nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}

Serial Crawler,串行爬虫通过递归串行调用执行深度优先探测,一次只能爬取一个网页,效率很低。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//
// Serial crawler
//

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
}

Concurrent Crawler with Shared State and Mutex.

使用WaitGroup的作用:确保所有子线程执行完毕,才退出主线程。Add加1,Done减1,Wait等于0的时候继续执行,不为0阻塞。

相比于串行版本,做出了如下改变:

  • 为每一个网页创建了一个线程,使用了go创建协程,这个协程执行一个匿名函数。
  • 使用了一个map作为共享的。因为所有协程操作的是同一个map,所以使用互斥量保证原子性。
  • 使用WaitGroup来保证所有的线程都执行完毕才退出。
  • defer done.Done()来保证即使协程出现异常,也能正确更新WaitGroup计数器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//
// Concurrent crawler with shared state and Mutex
//

type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}

func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
f.mu.Lock()
already := f.fetched[url]
f.fetched[url] = true
f.mu.Unlock()

if already {
return
}

urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
u2 := u
go func() {
defer done.Done()
ConcurrentMutex(u2, fetcher, f)
}()
//go func(u string) {
// defer done.Done()
// ConcurrentMutex(u, fetcher, f)
//}(u)
}
done.Wait()
return
}

func makeState() *fetchState {
f := &fetchState{}
f.fetched = make(map[string]bool)
return f
}

Concurrent Crawler with Channels.

Channels既是同步的也是通信的,同一个channel可以提供给多个协程用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//
// Concurrent crawler with channels
//

func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}

func master(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
master(ch, fetcher)
}

2.4 Remote Procedure Call

分布式系统的一种重要的机制,目的是为了让客户端和服务器的通信变成变得容易,忽略掉网络通信协议的细节。RPC框架调用过程如下:

  • client调用server上的函数f(x,y)
  • client stub将调用的函数及相关参数进行打包,通过网络发送给server
  • server stub接收到数据包后进行参数和函数解析,调用server中的方法f(x,y)
  • server将函数调用解决通过server stub返回,返回过程与发送过程相同

2.5 Handle Error

RPC出现错误的原因可能有丢包、网络瘫痪、服务器响应慢、服务器崩溃了。在服务器端看到的情况可能是根本没看到那个请求、请求收到了但在响应前崩溃了、请求收到了也发出去了但是在传输过程丢失了。

在进行RPC通信的时候,可能出现的异常情况是client在发送了rpc request之后,没有收到server的响应。对于这种异常错误,一般有以下几种处理机制。

  • at-least-once:client会一直等待server的回复,并不断发送请求,知道发送上限或者得到服务器的回答。
  • at-most-once:监测所有重复的请求,只回应重复的请求一次,其他的重复请求则返回第一次请求的数据。
  • exactly-once:分布式系统中的难题,比较难实现,目前通用的解决方案是重传+冗余检测+异常处理。

2.6 RPC Example: K/V

下面我们用一个简单的K/V数据库来学习如何用Go来实现RPC通信。该例子中的数据库包含两个功能,put和get,put操作支持client向server中插入一个任意的键值对数据,get操作支持client查询server中的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main

import (
"fmt"
"log"
"net"
"net/rpc"
"sync"
)

//
// Common RPC request/reply definitions
//

const (
OK = "OK"
ErrNoKey = "ErrNoKey"
)

type Err string

type PutArgs struct {
Key string
Value string
}

type PutReply struct {
Err Err
}

type GetArgs struct {
Key string
}

type GetReply struct {
Err Err
Value string
}

//
// Client
//

func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}

func get(key string) string {
client := connect()
args := GetArgs{"subject"}
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}

func put(key string, val string) {
client := connect()
args := PutArgs{"subject", "6.824"}
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}

//
// Server
//

type KV struct {
mu sync.Mutex
data map[string]string
}

func server() {
kv := new(KV)
kv.data = map[string]string{}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

val, ok := kv.data[args.Key]
if ok {
reply.Err = OK
reply.Value = val
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()

kv.data[args.Key] = args.Value
reply.Err = OK
return nil
}

//
// main
//

func main() {
server()

put("subject", "6.824")
fmt.Printf("Put(subject, 6.824) done\n")
fmt.Printf("get(subject) -> %s\n", get("subject"))
}

client流程:

  1. connect()函数用于与server建立tcp连接。
  2. get()和put()函数相当于client stubs,用于打包请求数据
  3. Call()函数通知RPC准备发起请求。在本例中我们在Call之前,已经定义好了请求和应答的格式,RPC库会打包参数,发送请求,然后等待回复,收到回复后再根据回复格式解析参数。

server流程:

  1. 创建一个基于tcp连接的server,并将K/V数据库注册到RPC库中
  2. RPC框架在收到请求后,会为新的请求启动一个goroutine。新线程会解析请求参数,并在已注册的服务中找到匹配的服务,调用对应的函数,打包执行结果写入TCP连接。
  3. Get()和Put()函数必须加锁,因为RPC会为每个请求都单独创建一个goroutine。

LEC 3 GFS

GFS:Google File System.

3.1 Why Distributed Storage System Hard?

构建分布式系统大多都是如何设计存储系统,或是设计其他基于大型分布式存储的系统。

构建一个分布式存储系统为什么难,这里会有一个循环:

  • 设计大型分布式系统的初衷是为了获取性能,很自然的想法就是将数据分割到不同的服务器上,这样可以并行从多台服务器读取数据。这种方式叫做分片Sharding。
  • 如果在成千上万台服务器上分片,会看见常态的故障,所以需要一个容错的系统。
  • 实现容错的方法是复制,只需要维护2-3个数据的副本,当其中一个故障了,就可以使用另一个。
  • 如果要复制,就有多个副本,就有了不一致的问题。
  • 为了避免不一致的问题,就需要在不同的服务器之间交互,这样的代价就是低性能,这明显和一开始的希望违背了。

Performance -> Sharding -> Fault Tolerance -> Replication -> Consistence -> Low Performance.

3.2 A Bad Design

这里先说一致性的问题。对于具备强一致性或者好的一致性的系统,从客户端或者应用程序来看就像是和同一台服务器通信。虽然通过数百台计算机构建了一个系统,但是对一个理想的强一致性系统,客户端看到的就是一台服务器,一份数据,在做一件事情。

这里有一个几乎是最糟糕的多副本设计,我提出它是为了让你们知道问题所在,并且同样的问题在GFS中也存在。这个设计是这样,我们有两台服务器,每个服务器都有数据的一份完整拷贝。它们在磁盘上都存储了一个key-value表单。当然,直观上我们希望这两个表单是完全一致的,这样,一台服务器故障了,我们可以切换到另一台服务器去做读写。

两个表单完全一致意味着,每一个写请求都必须在两台服务器上执行,而读请求只需要在一台服务器上执行,否则就没有容错性了。因为如果读请求也需要从两台服务器读数据,那么一台服务器故障我们就没法提供服务了。

写请求需要在两台服务器上都执行,假如两台客户端对同一个Key各发送了一个写请求。一个问题是这两个请求到达两台服务器的时间不一定一样。这个时候我们再去读的时候,对不同的服务器读,读到的数据会出现不一致的问题。

这里的问题还可以通过另一种方式暴露出来。假设我们尝试修复上面的问题,每次写的时候,对两个服务器都进行写操作,但是读的时候,只在同一个服务器上读。但是假如那个服务器突然出现了故障,这个时候不一致的问题又会暴露出来。

3.3 GFS Design Goals

让我们来讨论GFS吧。GFS做了很多工作来解决前面提到的问题,虽然不够完美,但是GFS已经做的很好了。Google的目标是构建一个大型的、快速的文件系统,并且这个系统是全局有效的,这样各种不同的应用程序都可以从中获取数据。之前的存储系统的为不同的应用程序构建不同的存储系统。但是有了全局通用的存储系统,就意味着自己的应用程序可以访问这些数据,别人的应用程序也可以申请权限来获取这些数据。

为了获取大容量和高速的特性,每个文件都会被分割并且存放到多个服务器上,这样的读写速度会很快,因为可以从不同的服务器上读取同一个文件。而且将文件分割之后,存储系统可以保存比单个磁盘还要大的文件。

因为我们现在在数百台服务器之上构建存储系统,我们希望有自动的故障修复。我们不希望每次服务器出了故障,派人到机房去修复服务器或者迁移数据。我们希望系统能自动修复自己。

还有一些特征并非是设计目标。比如GFS被设计成只在一个数据中心运行,所以这里并没有将副本保存在世界各地,单个GFS只存在于单个数据中心的单个机房里。理论上来说,数据的多个副本应该彼此之间隔的远一些,但是实现起来挺难的,所以GFS局限在一个数据中心内。

学生提问:如果GFS返回错误的数据,会不会影响应用程序?

Robert教授:讽刺的是。有谁关心网上的投票数量是否正确呢,如果你通过搜索引擎做搜索,20000个搜索结果中丢失了一条或者搜索结果排序是错误的,没有人会注意到这些。这类系统对于错误的接受能力好过类似于银行这样的系统。当然并不意味着所有的网站数据都可以是错误的。如果你通过广告向别人收费,你最好还是保证相应的数字是对的。
另外,尽管GFS可能会返回错误的数据,但是可以在应用程序中做一些补偿。例如论文中提到,应用程序应当对数据做校验,并明确标记数据的边界,这样应用程序在GFS返回不正确数据时可以恢复。

3.4 GFS Master Node

上百个客户端和一个Master节点。尽管实际中可以拿多台机器作为Master节点,但是GFS中Master是Active-Standby模式,所以只有一个Master节点在工作。Master节点保存了文件名和存储位置的对应关系。除此之外,还有大量的Chunk服务器,可能会有数百个,每一个Chunk服务器上都有1-2块磁盘。

Master节点用来管理文件和Chunk的信息,而Chunk服务器用来存储实际的数据。这是GFS设计中比较好的一面,它将这两类数据的管理问题几乎完全隔离开了,这样这两个问题可以使用独立设计来解决。

Master节点知道每一个文件对应的所有的Chunk的ID,这些Chunk每个是64MB大小,它们共同构成了一个文件。当我想读取一个文件中的任意一个部分时,我需要向Master节点查询对应的Chunk在哪个服务器上,之后我可以直接从Chunk服务器读取对应的Chunk数据。

Master节点内保存的数据内容有两个主要的表单:

  • 第一个是文件名到Chunk ID或者Chunk Handle数组的对应。这个表单告诉你,文件对应了哪些Chunk。但是只有Chunk ID是做不了太多事情的,所以有了第二个表单。
  • 第二个表单记录了Chunk ID到Chunk数据的对应关系。这里的数据又包括了:
    • 每个Chunk存储在哪些服务器上,所以这部分是Chunk服务器的列表。
    • 每个Chunk当前的版本号,所以Master节点必须记住每个Chunk对应的版本号。
    • 所有对于Chunk的写操作都必须在主Chunk(Primary Chunk)上顺序处理,主Chunk是Chunk的多个副本之一。所以,Master节点必须记住哪个Chunk服务器持有主Chunk。
    • 并且,主Chunk只能在特定的租约时间内担任主Chunk,所以,Master节点要记住主Chunk的租约过期时间。

以上数据都存储在内存中,如果Master故障了,这些数据就都丢失了。为了能让Master重启而不丢失数据,Master节点会同时将数据存储在磁盘上。所以Master节点读数据只会从内存读,但是写数据的时候,至少有一部分数据会接入到磁盘中。更具体来说,Master会在磁盘上存储log,每次有数据变更时,Master会在磁盘的log中追加一条记录,并生成CheckPoint(类似于备份点)。有些数据需要存在磁盘上,而有些不用。它们分别是:

  • Chunk Handle的数组(第一个表单)要保存在磁盘上。我给它标记成NV(non-volatile, 非易失),这个标记表示对应的数据会写入到磁盘上。
  • Chunk服务器列表不用保存到磁盘上。因为Master节点重启之后可以与所有的Chunk服务器通信,并查询每个Chunk服务器存储了哪些Chunk,所以我认为它不用写入磁盘。
  • 版本号要不要写入磁盘取决于GFS是如何工作的,我认为它需要写入磁盘。
  • 主Chunk的ID,几乎可以确定不用写入磁盘,因为Master节点重启之后会忘记谁是主Chunk,它只需要等待60秒租约到期,那么它知道对于这个Chunk来说没有主Chunk,这个时候,Master节点可以安全指定一个新的主Chunk。
  • 类似的,租约过期时间也不用写入磁盘。

任何时候,如果文件扩展到达了一个新的64MB,需要新增一个Chunk或者由于指定了新的主Chunk而导致版本号更新了,Master节点需要向磁盘中的Log追加一条记录说,我刚刚向这个文件添加了一个新的Chunk或者我刚刚修改了Chunk的版本号。所以每次有这样的更新,都需要写磁盘。GFS论文并没有讨论这么多细节,但是因为写磁盘的速度是有限的,写磁盘会导致Master节点的更新速度也是有限的,所以要尽可能少的写入数据到磁盘。

这里在磁盘中维护log而不是数据库的原因是,数据库本质上来说是某种B树(b-tree)或者hash table,相比之下,追加log会非常的高效,因为你可以将最近的多个log记录一次性的写入磁盘。因为这些数据都是向同一个地址追加,这样只需要等待磁盘的磁碟旋转一次。而对于B树来说,每一份数据都需要在磁盘中随机找个位置写入。所以使用Log可以使得磁盘写入更快一些。

当Master节点故障重启,并重建它的状态,你不会想要从log的最开始重建状态,因为log的最开始可能是几年之前,所以Master节点会在磁盘中创建一些checkpoint点,这可能要花费几秒甚至一分钟。这样Master节点重启时,会从log中的最近一个checkpoint开始恢复,再逐条执行从Checkpoint开始的log,最后恢复自己的状态。

3.5 GFS Read File

对于读请求来说,意味着应用程序或者GFS客户端有一个文件名和它想从文件的某个位置读取的偏移量(offset),应用程序会将这些信息发送给Master节点。Master节点会从自己的file表单中查询文件名,得到Chunk ID的数组。因为每个Chunk是64MB,所以偏移量除以64MB就可以从数组中得到对应的Chunk ID。之后Master再从Chunk表单中找到存有Chunk的服务器列表,并将列表返回给客户端。所以,第一步是客户端(或者应用程序)将文件名和偏移量发送给Master。第二步,Master节点将Chunk Handle(也就是ID,记为H)和服务器列表发送给客户端。

现在客户端可以从这些Chunk服务器中挑选一个来读取数据。因为客户端每次可能只读取1MB或者64KB数据,所以,客户端可能会连续多次读取同一个Chunk的不同位置。所以,客户端会缓存Chunk和服务器的对应关系,这样,当再次读取相同Chunk数据时,就不用一次次的去向Master请求相同的信息。

接下来,客户端会与选出的Chunk服务器通信,将Chunk Handle和偏移量发送给那个Chunk服务器。Chunk服务器会在本地的硬盘上,将每个Chunk存储成独立的Linux文件,并通过普通的Linux文件系统管理。并且可以推测,Chunk文件会按照Handle(也就是ID)命名。所以,Chunk服务器需要做的就是根据文件名找到对应的Chunk文件,之后从文件中读取对应的数据段,并将数据返回给客户端。

学生提问:如果读取的数据超过了一个Chunk怎么办?

Robert教授:我不知道详细的细节。我的印象是,如果应用程序想要读取超过64MB的数据,或者就是2个字节,但是却跨越了Chunk的边界,应用程序会通过一个库来向GFS发送RPC,而这个库会注意到这次读请求会跨越Chunk边界,因此会将一个读请求拆分成两个读请求再发送到Master节点。所以,这里可能是向Master节点发送两次读请求,得到了两个结果,之后再向两个不同的Chunk服务器读取数据。

学生提问:能再介绍一下读数据跨越了Chunk边界的情况吗?

Robert教授:客户端本身依赖了一个GFS的库,这个库会注意到读请求跨越了Chunk的边界 ,并会将读请求拆分,之后再将它们合并起来。所以这个库会与Master节点交互,Master节点会告诉这个库说Chunk7在这个服务器,Chunk8在那个服务器。之后这个库会说,我需要Chunk7的最后两个字节,Chunk8的头两个字节。GFS库获取到这些数据之后,会将它们放在一个buffer中,再返回给调用库的应用程序。Master节点会告诉库有关Chunk的信息,而GFS库可以根据这个信息找到应用程序想要的数据。应用程序只需要确定文件名和数据在整个文件中的偏移量,GFS库和Master节点共同协商将这些信息转换成Chunk。

3.6 GFS Write File

为了简单起见,先考虑追加的问题。对于写文件,客户端会向Master节点发送请求说:我想向这个文件名对应的文件追加数据,请告诉我文件中最后一个Chunk的位置。

当有多个客户端同时写同一个文件时,一个客户端并不能知道文件究竟有多长。因为如果只有一个客户端在写文件,客户端自己可以记录文件长度,而多个客户端时,一个客户端没法知道其他客户端写了多少。

对于读文件来说,可以从任何最新的Chunk副本读取数据,但是对于写文件来说,必须要通过Chunk的主副本(Primary Chunk)来写入。对于某个特定的Chunk来说,在某一个时间点,Master不一定指定了Chunk的主副本。所以,写文件的时候,需要考虑Chunk的主副本不存在的情况。

对于Master节点来说,如果发现Chunk的主副本不存在,Master会找出所有存有Chunk最新副本的Chunk服务器。如果你的一个系统已经运行了很长时间,那么有可能某一个Chunk服务器保存的Chunk副本是旧的,比如说还是昨天或者上周的。导致这个现象的原因可能是服务器因为宕机而没有收到任何的更新。所以,Master节点需要能够在Chunk的多个副本中识别出,哪些副本是新的,哪些是旧的。所以第一步是,找出新的Chunk副本。这一切都是在Master节点发生,因为,现在是客户端告诉Master节点说要追加某个文件,Master节点需要告诉客户端向哪个Chunk服务器(也就是Primary Chunk所在的服务器)去做追加操作。所以,Master节点的部分工作就是弄清楚在追加文件时,客户端应该与哪个Chunk服务器通信。

每个Chunk可能同时有多个副本,最新的副本是指,副本中保存的版本号与Master中记录的Chunk的版本号一致。Chunk副本中的版本号是由Master节点下发的,所以Master节点知道,对于一个特定的Chunk,哪个版本号是最新的。这就是为什么Chunk的版本号在Master节点上需要保存在磁盘这种非易失的存储中的原因,因为如果版本号在故障重启中丢失,且部分Chunk服务器持有旧的Chunk副本,这时,Master是没有办法区分哪个Chunk服务器的数据是旧的,哪个Chunk服务器的数据是最新的。

回到之前的话题,当客户端想要对文件进行追加,但是又不知道文件尾的Chunk对应的Primary在哪时,Master会等所有存储了最新Chunk版本的服务器集合完成,然后挑选一个作为Primary,其他的作为Secondary。之后,Master会增加版本号,并将版本号写入磁盘,这样就算故障了也不会丢失这个数据。

接下来,Master节点会向Primary和Secondary副本对应的服务器发送消息并告诉它们,谁是Primary,谁是Secondary,Chunk的新版本是什么。Primary和Secondary服务器都会将版本号存储在本地的磁盘中。这样,当它们因为电源故障或者其他原因重启时,它们可以向Master报告本地保存的Chunk的实际版本号。

现在,Master节点通知Primary和Secondary服务器,你们可以修改这个Chunk。它还给Primary一个租约,这个租约告诉Primary说,在接下来的60秒中,你将是Primary,60秒之后你必须停止成为Primary。

假设现在Master节点告诉客户端谁是Primary,谁是Secondary,GFS提出了一种聪明的方法来实现写请求的执行序列。客户端会将要追加的数据发送给Primary和Secondary服务器,这些服务器会将数据写入到一个临时位置。所以最开始,这些数据不会追加到文件中。当所有的服务器都返回确认消息说,已经有了要追加的数据,客户端会向Primary服务器发送一条消息说,你和所有的Secondary服务器都有了要追加的数据,现在我想将这个数据追加到这个文件中。Primary服务器或许会从大量客户端收到大量的并发请求,Primary服务器会以某种顺序,一次只执行一个请求。对于每个客户端的追加数据请求(也就是写请求),Primary会查看当前文件结尾的Chunk,并确保Chunk中有足够的剩余空间,然后将客户端要追加的数据写入Chunk的末尾。并且,Primary会通知所有的Secondary服务器也将客户端要追加的数据写入在它们自己存储的Chunk末尾。这样,包括Primary在内的所有副本,都会收到通知将数据追加在Chunk的末尾。

但是对于Secondary服务器来说,它们可能可以执行成功,也可能会执行失败,比如说磁盘空间不足,比如说故障了,比如说Primary发出的消息网络丢包了。如果Secondary实际真的将数据写入到了本地磁盘存储的Chunk中,它会回复“yes”给Primary。如果所有的Secondary服务器都成功将数据写入,并将“yes”回复给了Primary,并且Primary也收到了这些回复。Primary会向客户端返回写入成功。如果至少一个Secondary服务器没有回复Primary,或者回复了,但是内容却是:抱歉,一些不好的事情发生了,比如说磁盘空间不够,或者磁盘故障了,Primary会向客户端返回写入失败。

GFS论文说,如果客户端从Primary得到写入失败,那么客户端应该重新发起整个追加过程。客户端首先会重新与Master交互,找到文件末尾的Chunk;之后,客户端需要重新发起对于Primary和Secondary的数据追加操作。

学生提问:写文件失败之后Primary和Secondary服务器上的状态如何恢复?

Robert教授:你的问题是,Primary告诉所有的副本去执行数据追加操作,某些成功了,某些没成功。如果某些副本没有成功执行,Primary会回复客户端说执行失败。之后客户端会认为数据没有追加成功。但是实际上,部分副本还是成功将数据追加了。所以现在,一个Chunk的部分副本成功完成了数据追加,而另一部分没有成功,这种状态是可接受的,没有什么需要恢复,这就是GFS的工作方式。

学生提问:写文件失败之后,读Chunk数据会有什么不同?

Robert教授:如果写文件失败之后,一个客户端读取相同的Chunk,客户端可能可以读到追加的数据,也可能读不到,取决于客户端读的是Chunk的哪个副本。
如果一个客户端发送写文件的请求,并且得到了表示成功的回复,那意味着所有的副本都在相同的位置追加了数据。如果客户端收到了表示失败的回复,那么意味着0到多个副本实际追加了数据,其他的副本没有追加上数据。所以这时,有些副本会有追加的数据,有些副本没有。这时,取决于你从哪个副本读数据,有可能读到追加的新数据,也有可能读不到。

学生提问:可不可以通过版本号来判断副本是否有之前追加的数据?

Robert教授:所有的Secondary都有相同的版本号。版本号只会在Master指定一个新Primary时才会改变。通常只有在原Primary发生故障了,才会指定一个新的Primary。所以,副本(参与写操作的Primary和Secondary)都有相同的版本号,你没法通过版本号来判断它们是否一样,或许它们就是不一样的(取决于数据追加成功与否)。
这么做的理由是,当Primary回复“no”给客户端时,客户端知道写入失败了,之后客户端的GFS库会重新发起追加数据的请求,直到最后成功追加数据。成功了之后,追加的数据会在所有的副本中相同位置存在。在那之前,追加的数据只会在部分副本中存在。

学生提问:什么时候版本号会增加?

Robert教授:版本号只在Master节点认为Chunk没有Primary时才会增加。在一个正常的流程中,如果对于一个Chunk来说,已经存在了Primary,那么Master节点会记住已经有一个Primary和一些Secondary,Master不会重新选择Primary,也不会增加版本号。它只会告诉客户端说这是Primary,并不会变更版本号。

学生提问:如果写入数据失败了,不是应该先找到问题在哪再重试吗?

Robert教授:我认为这是个有趣的问题。当Primary向客户端返回写入失败时,你或许会认为一定是哪里出错了,在修复之前不应该重试。实际上,就我所知,论文里面在重试追加数据之前没有任何中间操作。因为,错误可能就是网络数据的丢失,这时就没什么好修复的,网络数据丢失了,我们应该重传这条网络数据。客户端重新尝试追加数据可以看做是一种复杂的重传数据的方法。或许对于大多数的错误来说,我们不需要修改任何东西,同样的Primary,同样的Secondary,客户端重试一次或许就能正常工作,因为这次网络没有丢包。
但是如果是某一个Secondary服务器出现严重的故障,那问题变得有意思了。我们希望的是,Master节点能够重新生成Chunk对应的服务器列表,将不工作的Secondary服务器剔除,再选择一个新的Primary,并增加版本号。如果这样的话,我们就有了一组新的Primary,Secondary和版本号,同时,我们还有一个不太健康的Secondary,它包含的是旧的副本和旧的版本号,正是因为版本号是旧的,Master永远也不会认为它拥有新的数据。但是,论文中没有证据证明这些会立即发生。论文里只是说,客户端重试,并且期望之后能正常工作。最终,Master节点会ping所有的Chunk服务器,如果Secondary服务器挂了,Master节点可以发现并更新Primary和Secondary的集合,之后再增加版本号。但是这些都是之后才会发生(而不是立即发生)。

学生提问:如果Master节点发现Primary挂了会怎么办?

Robert教授:可以这么回答这个问题。在某个时间点,Master指定了一个Primary,之后Master会一直通过定期的ping来检查它是否还存活。因为如果它挂了,Master需要选择一个新的Primary。Master发送了一些ping给Primary,并且Primary没有回应,你可能会认为Master会在那个时间立刻指定一个新的Primary。但事实是,这是一个错误的想法。为什么是一个错误的想法呢?因为可能是网络的原因导致ping没有成功,所以有可能Primary还活着,但是网络的原因导致ping失败了。但同时,Primary还可以与客户端交互,如果Master为Chunk指定了一个新的Primary,那么就会同时有两个Primary处理写请求,这两个Primary不知道彼此的存在,会分别处理不同的写请求,最终会导致有两个不同的数据拷贝。这被称为脑裂(split-brain)。
脑裂是一种非常重要的概念,我们会在之后的课程中再次介绍它(详见6.1),它通常是由网络分区引起的。比如说,Master无法与Primary通信,但是Primary又可以与客户端通信,这就是一种网络分区问题。网络故障是这类分布式存储系统中最难处理的问题之一。
所以,我们想要避免错误的为同一个Chunk指定两个Primary的可能性。Master采取的方式是,当指定一个Primary时,为它分配一个租约,Primary只在租约内有效。Master和Primary都会知道并记住租约有多长,当租约过期了,Primary会停止响应客户端请求,它会忽略或者拒绝客户端请求。因此,如果Master不能与Primary通信,并且想要指定一个新的Primary时,Master会等到前一个Primary的租约到期。这意味着,Master什么也不会做,只是等待租约到期。租约到期之后,可以确保旧的Primary停止了它的角色,这时Master可以安全的指定一个新的Primary而不用担心出现这种可怕的脑裂的情况。

3.7 Consistency of GFS

每次写入有可能成功,但是也有可能失败。当某个chunk副本写入失败的时候,这个空间是不被占用的,下一次开始写会到下一个地址空间。在GFS的这种工作方式下,如果Primary返回写入成功,那么一切都还好,如果Primary返回写入失败,就不是那么好了。Primary返回写入失败会导致不同的副本有完全不同的数据。

GFS这样设计的理由是足够的简单,但是同时也给应用程序暴露了一些奇怪的数据。这里希望为应用程序提供一个相对简单的写入接口,但应用程序需要容忍读取数据的乱序。如果应用程序不能容忍乱序,应用程序要么可以通过在文件中写入序列号,这样读取的时候能自己识别顺序,要么如果应用程序对顺序真的非常敏感那么对于特定的文件不要并发写入。

如果你想要将GFS升级成强一致系统,我可以为你列举一些你需要考虑的事情:

  • 你可能需要让Primary来探测重复的请求,这样第二个写入数据B的请求到达时,Primary就知道,我们之前看到过这个请求,可能执行了也可能没执行成功。Primay要尝试确保B不会在文件中出现两次。所以首先需要的是探测重复的能力。
  • 对于Secondary来说,如果Primay要求Secondary执行一个操作,Secondary必须要执行而不是只返回一个错误给Primary。对于一个严格一致的系统来说,是不允许Secondary忽略Primary的请求而没有任何补偿措施的。所以我认为,Secondary需要接受请求并执行它们。如果Secondary有一些永久性故障,例如磁盘被错误的拔出了,你需要有一种机制将Secondary从系统中移除,这样Primary可以与剩下的Secondary继续工作。但是GFS没有做到这一点,或者说至少没有做对。
  • 当Primary要求Secondary追加数据时,直到Primary确信所有的Secondary都能执行数据追加之前,Secondary必须小心不要将数据暴露给读请求。所以对于写请求,你或许需要多个阶段。在第一个阶段,Primary向Secondary发请求,要求其执行某个操作,并等待Secondary回复说能否完成该操作,这时Secondary并不实际执行操作。在第二个阶段,如果所有Secondary都回复说可以执行该操作,这时Primary才会说,好的,所有Secondary执行刚刚你们回复可以执行的那个操作。这是现实世界中很多强一致系统的工作方式,这被称为两阶段提交(Two-phase commit)。
  • 另一个问题是,当Primary崩溃时,可能有一组操作由Primary发送给Secondary,Primary在确认所有的Secondary收到了请求之前就崩溃了。当一个Primary崩溃了,一个Secondary会接任成为新的Primary,但是这时,新Primary和剩下的Secondary会在最后几个操作有分歧,因为部分副本并没有收到前一个Primary崩溃前发出的请求。所以,新的Primary上任时,需要显式的与Secondary进行同步,以确保操作历史的结尾是相同的。
  • 最后,时不时的,Secondary之间可能会有差异,或者客户端从Master节点获取的是稍微过时的Secondary。系统要么需要将所有的读请求都发送给Primary,因为只有Primary知道哪些操作实际发生了,要么对于Secondary需要一个租约系统,就像Primary一样,这样就知道Secondary在哪些时间可以合法的响应客户端。

为了实现强一致,以上就是我认为的需要在系统中修复的东西,它们增加了系统的复杂度,增加了系统内部组件的交互。我也是通过思考课程的实验,得到上面的列表的,你们会在lab2,3中建立一个强一致系统,并完成所有我刚刚说所有的东西。

最后,让我花一分钟来介绍GFS在它生涯的前5-10年在Google的出色表现,总的来说,它取得了巨大的成功,许多许多Google的应用都使用了它,许多Google的基础架构,例如BigTable和MapReduce是构建在GFS之上,所以GFS在Google内部广泛被应用。它最严重的局限可能在于,它只有一个Master节点,会带来以下问题:

  • Master节点必须为每个文件,每个Chunk维护表单,随着GFS的应用越来越多,这意味着涉及的文件也越来越多,最终Master会耗尽内存来存储文件表单。你可以增加内存,但是单台计算机的内存也是有上限的。所以,这是人们遇到的最早的问题。
  • 除此之外,单个Master节点要承载数千个客户端的请求,而Master节点的CPU每秒只能处理数百个请求,尤其Master还需要将部分数据写入磁盘,很快,客户端数量超过了单个Master的能力。
  • 另一个问题是,应用程序发现很难处理GFS奇怪的语义(本节最开始介绍的GFS的副本数据的同步,或者可以说不同步)。
  • 最后一个问题是,从我们读到的GFS论文中,Master节点的故障切换不是自动的。GFS需要人工干预来处理已经永久故障的Master节点,并更换新的服务器,这可能需要几十分钟甚至更长的而时间来处理。对于某些应用程序来说,这个时间太长了。