前言 这是一篇学习笔记。学习的材料来自Jay Kreps的一篇讲Log的博文。原文很长,但是我坚持看完了,收获颇多,也深深为Jay哥的技术能力、架构能力和对于分布式系统的理解之深刻所折服。同时也因为某些理解和Jay哥观点吻合而略沾沾自喜。 Jay Kreps是前Linkedin的Principal Staff Engineer,现任Confluent公司的联合创始人和CEO,Kafka和Samza的主要作者。 所谓笔记,就是看了文章,提笔就记,因为Jay哥本身本章组织的太好,而其本身的科学素养及哲学素养也很高,所以私以为出彩的东西就不省略了。 一、资料来源
二、笔记 2.1 Log的价值
2) Log可能跟计算机的历史一样长,并且是分布式数据系统和实时计算系统的核心。
4) 不理解Log,你就不可能充分理解
2.2 什么是Log? 2.2.1 概述
记录的顺序定义了这样的一个概念:时间。
这样说,可能你会觉得log如此简单,还有讨论的必要吗? Log记录了何时发生了什么(they record what happened and when.)。 而这一条,通常是分布式系统最最最核心的东西。
2.2.2 数据库中的Logs Log的起源不得而知,就像发明二分查找的人,难以意识到这种发明是一种发明。
Log从保证ACID特性的一种实现,发展成了一种数据库之间数据复制的手段。 很显然,数据库中发生的一系列的数据变更,成为数据库之间 保持同步最需要的信息。
2.2.3 分布式系统中的logs Log解决了两个很重要的分布式数据系统中的问题:
所谓的状态机复制原理(State Machine Replication Principle): 如果两个确定的处理过程,从相同的状态开始,按照相同的顺序,接收相同的输入,那么它们将会产生相同的输出,并以 相同的状态结束。 所谓确定的(deterministic),是指处理过程是时间无关的,其处理结果亦不受额外输入的影响。
所谓状态,可以是机器上的任意数据,无论在处理结束后,是在机器的内存中还是磁盘上。
Log系统的作用,就是将所有的输入流之上的不确定性驱散,确保所有的处理相同输入的复制节点保持同步。 这种方法的最妙之处在于,你可以将索引日志的时间戳,作为所有复制节点的时钟来对待: 通过将复制节点所处理过的log中最大的时间戳,作为复制节点的唯一ID,这样,时间戳结合log,就可以唯一地表达此节点的整个状态。 应用这种方法的方式也很多:
理论上来讲,我们可以记录一系列的机器指令,或者所调用方法的名称及参数,只要数据处理进程的行为相同,这些进程就可以保证跨节点的一致性。
对分布式系统,通常有两种方式来处理复制和数据处理:
如下图所示: 为了理解上述两种方式的不同,来看个简单的例子:
上面的例子也揭示了,为什么顺序是复制节点之间保持一致性的关键因素,如果打乱了这些操作的顺序,就会得到不同的运算结果。
一条log,表征了一系列的关于下一个值是什么的决定。 2.2.4 Changelog 从数据库的角度来看,一组记录数据变化的changelog和表,是对偶和互通的。
这正是你想要的准实时复制的秘籍所在! 这一点和版本控制所做的事情极为类似:管理分布式的、并发的、对状态进行的修改。 版本控制工具,维护了反映修改的补丁,这其实就是log,你和一个被签出(checked out)的分支快照进行交互,这份快照就相当于数据库中的表。你会发现,版本控制与分布式系统中,复制都是基于log的:当你更新版本时,你只是拉取了反映了版本变化的补丁,并应用于当前的分支快照。 2.3 数据集成(Data integration) 2.3.1 数据集成的含义 所谓数据集成,就是将一个组织中的所有服务和系统的数据,变得可用。 实际上,对数据进行有效利用,很符合马斯洛的层次需求理论。
2.3.2 数据集成的两个复杂性 事件 事件数据,记录了事件是怎么发生的,而不仅仅是发生了什么,这一类log通常被当做应用日志,因为一般是由应用系统写入的。但这一点,其实混淆了log的功能。
各种各样的专业数据系统的爆发 这些系统存在的原因:
显然,要将数据整合进这样的系统中,对于数据集成来讲,极为困难。 2.3.3 基于日志结构的数据流 每种逻辑意义上的数据源,都可以依据log进行建模。 数据源可以是记录了事件(点击和PV)的应用程序,可以是接受更改的数据库表。 每个订阅者,都尽可能快地从这些数据源产生的log中获取新的记录,应用于本地的存储系统,并且提升其在log中的读取偏移(offset)。订阅者可以是任何数据系统,比如缓存、Hadoop、另一个站点的数据库,或者搜索引擎。 Log,实际上提供了一种逻辑时钟,针对数据变化,可以测量不同的订阅者所处的状态,因为这些订阅者在log中的读取偏移不同且相互独立,这种偏移就像一个时间意义上的“时刻”一样。 考虑这样一个例子,一个数据库,和一些缓存服务器:
假设我们写入了一个编号为X的log,要从某个缓存服务器读取数据,为了不读到老数据,只需要保证:在缓存服务器将数据(同步)复制到X这个位置前,我们不从这个缓存中读取任何东西即可。 此外,log还提供了作为缓冲区的能力,以支持生产者和消费者的行为以异步的方式进行。 最关键的一个支持异步的原因,是订阅系统可能会发生崩溃、因维护而下线,接着恢复上线,而在这种情况下,每个订阅者都以自己的步调消费数据。 一个批处理系统,比如Hadoop,或者一个数据仓库,是以小时或天为单位消费数据,而一个实时系统,通常在秒级消费数据。
更重要的是,订阅者,只需要知道log,而不需要对其所消费的数据的来源有任何了解,无论这个数据源是RDBMS、Hadoop,还是一个最新流行的K-V数据库,等等。 之所以讨论log,而不是消息系统,是因为不同的消息系统所保证的特性不同,并且用消息系统这个词,难以全面和精确表达某种语义,因为消息系统,更重要的在于重定向消息。 但是,可以将log理解为这样一种消息系统,其提供了持久性保证及强有序的语义,在通讯系统中,这称作原子广播。 2.4 在Linkedin Linkedin目前的主要系统包括(注:2013年):
每个系统,都在其专业的领域提供专门的高级功能。 (这一段太长太长了,Jay兄十分能侃啊,所以挑重点的来记吧!) 1) 之所以引入数据流这个概念,是因为要在oracle数据库的表之上,建立一个抽象的缓存层,为搜索引擎的索引构建和社交图谱更新,提供拓展能力。 2) 为了更好的处理linkedin的一些推荐算法,开始搭Hadoop集群,但团队在此块的经验尚浅,所以走了很多弯路。 3) 开始时,简单粗暴地认为只要将数据从oracle数据仓库中拉出来,丢进hadoop就可以了。结果发现:第一,将数据从oracle数据仓库快速导出是个噩梦;第二,也是更糟糕的一点,数据仓库中某些数据的处理不对,导致了hadoop的批处理任务不能按预期输出结果,且通过hadoop批处理执行任务,通常不可逆,特别是在出了报表之后。 4) 最后,团队抛弃了从数据仓库中出数据的方式,直接以数据库和logs为数据源。接着,造出了一个轮子:K-V 存储(Voldemort)。 5) 即使是数据拷贝这样不高大上的活儿,也占据了团队大量的时间去处理,更糟的是,一旦数据处理的pipeline中有个点出错,hadoop立马变得废柴,因为再牛逼的算法跑在错误的数据上,只有一个后果,就是产生更多的错误数据。 6) 即使团队构建的东西抽象层次很高,针对每种数据源还是需要特定的配置,而这也是很多错误和失败的根源。 7) 一大批程序员想跟进,每个程序员都有一大批的想法,集成这个系统,添加这个功能,整合这个特色,或者想要自定义的数据源。 8) Jay哥开始意识到:
第二, 很明显,可靠地数据装载需要更坚实的支撑,如果能够捕获所有的结构,就可以让hadoop数据装载完全自动化,不需要加入新的数据源或人工修改数据的模式。数据会神奇地出现在HDFS中,而新的数据源加入后,Hive的表会用合适的列自动化地、自适应地生成。 第三,数据覆盖度远远不足。因为要处理很多新的数据源,很难。 9) 为了解决新数据源加入后的数据装载问题,团队开始了这样的尝试: 很快,他们发现这样搞行不通,因为发布和订阅、生产和消费,数据流通常还是双向的,这成了一个O(n^2)的问题。
需要将每个消费者从数据源隔离,理想的情况下,这些消费者只和一个data repository进行交互,而这个repository可以提供它们访问任意数据的能力。 10)消息系统 + log = Kafka,kafka横空出世。 2.5 Log和ETL、数据仓库的关系
1) 一个装有干净的、结构化的、集成的数据repository,用于分析。
数据仓库,应该是一组查询功能的集合,这些功能服务于报表、搜索、ad hot 分析,包含了计数(counting)、聚合(aggregation)、过滤(filtering)等操作,所以更应该是一个批处理系统。 但是将干净的数据和这样的一种批处理系统高度耦合在一起,意味着这些数据不能被实时系统消费,比如搜索引擎的索引构建、实时计算和实时监控系统,等等。 2.5.2 ETL Jay哥认为,ETL无非做两件事: 1) 对数据进行抽取和清洗,将数据从特定的系统中解锁
但是,将这两件事耦合在一起,问题很大,因为集成后的、干净的数据,本应能被其它实时系统、索引构建系统、低延时的处理系统消费。 数据仓库团队,负责收集和清洗数据,但是,这些数据的生产者往往因为不明确数据仓库团队的数据处理需求,导致输出很难被抽取和清洗的数据。
所以,更好的方式是: 如果想在一个干净的数据集上做点搜索、实时监控趋势图、实时报警的事儿,以原有的数据仓库或者hadoop集群来作为基础设施,都是不合适的。更糟的是,ETL所构建的针对数据仓库的数据加载系统,对其它(实时)系统点儿用没有。 最好的模型,就是在数据发布者发布数据之前,就已经完成了数据的清洗过程,因为只有发布者最清楚它们的数据是什么样的。而所有在这个阶段所做的操作,都应该满足无损和可逆。 所有丰富语义、或添加值的实时转换,都应在原始的log发布后处理(post-processing),包括为事件数据建立会话,或者添加某些感兴趣的字段。原始的log依旧可被单独使用,但是此类实时应用也派生了新的参数化的log。 最后,只有对应于具体的目标系统的数据聚合操作,应作为数据装载的一部分,比如转换为星型或雪花型模式,以在数据仓库中进行分析和出报表。因为这个阶段,就像传统的ETL所做的那样,因为有了非常干净和规范的数据流,(有了log后)现在变得非常简单。 2.6 Log文件和事件 以log为核心的架构,还有个额外的好处,就是易于实现无耦合的、事件驱动的系统。 传统的 捕获用户活动和系统变化的方式,是将此类信息写入文本日志,然后抽取到数据仓库或者hadoop集群中进行聚合和处理,这个问题和前面所述的数据仓库和ETL问题类似:数据与数据仓库的高度耦合。 在Linkedin,其基于kafka构建了事件数据处理系统。为各种各样的action定义了成百上千种事件类型,从PV、用户对于广告的赶脚(ad impressions)、搜索,到服务的调用和应用的异常,等等。 为了体会上述事件驱动系统的好处,看一个简单的关于事件的例子: 比如,我们希望集成以下系统功能:
很快,仅仅展示机会的页面逻辑,就会变得复杂。当我们在移动端也增加了此机会的展示时,不得不把逻辑也迁移过去,这又加剧了复杂程度。还没完,纠结的东西是,负责处理此页面的工程师,需要有其它系统的知识,以确保上述的那些功能能正确的集成在一起。 这只是个极其简单的例子,在实践中,情况只会更加复杂。
负责呈现机会的页面,只需要呈现机会并记录一些和呈现相关的因素,比如工作机会的相关属性,谁浏览了这个页面,以及其它的有用的与呈现相关的信息。页面不需要保持对其它系统的知识和了解,比如推荐系统、安全系统、机会发布者的分析系统,还有数据仓库,所有的这些系统只需要作为订阅者,订阅这个事件,然后独立地进行它们各自的处理即可,而呈现机会的页面不需要因为新的订阅者或消费者的加入而做出修改。 2.7 构建可扩展的log 分离发布者和订阅者不新鲜,但是要保证多个订阅者能够实时处理消息,并且同时保证扩展能力,对于log系统来说,是一件比较困难的事。 如果log的构建不具备快速、低开销和可扩展能力,那么建立在此log系统之上的一切美好都免谈。 很多人可能认为log系统在分布式系统中是个很慢、重型开销的活儿,并且仅用来处理一些类似于ZooKeeper更适合处理的元数据等信息。 但是Linkedin现在(注:2013年),在kafka中每天处理600亿条不同的消息写入(如果算数据中心的镜像的话,那就是几千亿条写入)。 Jay哥他们怎么做到的呢? 1) 对log进行分割(partitioning the log)
通过将log切为多个partition来提供扩展能力: 1) 每个partition都是有序的log,但是partitions之间没有全局的顺序。 2) 将消息写入哪个partition完全由写入者控制,通过依照某种类型的key(如user_id)进行分割。 3) 分割使得log的附加操作,可以不用在分片(sharding)之间进行协调就进行,同时,保证系统的吞吐量和kafka集群的规模呈线性关系。 4) 虽然没有提供全局顺序(实际上消费者或者订阅者成千上万,讨论它们的全局顺序一般没有啥价值),但是kafka提供了这样一种保证:发送者按照什么顺序将消息发给某个partition,从这个partition递交出去的消息就是什么顺序(什么顺序进,什么顺序出)。 5) 每个partition都按照配置好的数目进行复制,如果一个leader节点挂了,其它的节点会成为新的leader。 6) 一条log,同文件系统一样,线性的读写模式可被优化,将小的读写log可以组成更大的、高吞吐量的操作。Kafka在这件事上做的很猛。批处理用在了各种场景之下,比如客户端将数据发送到服务端、将数据写入磁盘、服务器之间的数据复制、将数据传送给消费者,以及确认提交数据等场景。 7) 最后,kafka在内存log、磁盘log、网络中发送的log上,采用了很简单的二进制格式,以利于利用各种优化技术,比如零拷贝数据传输技术(zero-copy data transfer)。 诸多的优化技术,汇聚起来,可以让你即使在内存爆满的情形下,也能按照磁盘或网络能提供的最大能力进行数据读写。 2.8 Logs和实时处理 你以为Jay哥提供了这么个美丽的方法把数据复制来复制去就完了? Log是流的另一种说法,logs是流处理的核心。 2.8.1 什么是流处理 Jay哥认为:
在linkedin,无论是活动数据还是数据库的变化,都是连续的。
所以,流处理是这样一种过程:
8)许多商业公司无法建立流处理引擎,往往因为无法建立流数据收集引擎。
2.9 数据流图谱 流处理中最有趣的地方在于,其拓展了什么是数据源(feeds)这一概念。
一个流处理任务,应该是这样的:从logs读取数据,将输出写入logs或者其它系统。 作为输入和输出的logs,连通这些处理本身,和其它的处理过程,构成了一个图。 事实上,以log为核心的系统,允许你将公司或机构中的数据捕获、转换以及数据流,看作是一系列的logs及在其上进行写入的处理过程的结合。 一个流处理程序,其实不必很高大上:可以是一个处理过程或者一组处理过程,但是,为了便于管理处理所用的代码,可以提供一些额外的基础设施和支持。 引入logs有两个目的: 1) 保证了数据集可以支持多个订阅者模式,及有序。
Storm和Samza就是这样的流处理引擎,并且都能用kafka或其它类似的系统作为它们的log系统。 (注:Jay哥相当猛,前有kafka,后有samza。) 2.10 有状态的实时处理 很多流处理引擎是无状态的、一次一记录的形式,但很多用例都需要在流处理的某个大小的时间窗口内进行复杂的counts , aggregations和joins操作。
那么,这种用例,就需要状态的支持。在处理数据的地方,需要维护某个数据的状态。 问题在于,如何在处理者可能挂掉的情况下保持正确的状态? 将状态维护在内存中可能是最简单的,但抵不住crash。 如果仅在某个时间窗口内维护状态,当挂掉或者失败发生,那么处理可以直接回退到窗口的起点来重放,但是,如果这个窗口有1小时那么长,这可能行不通。 还有个简单的办法,就是把状态存在某个远程的存储系统或数据库中,但是这会损失数据的局部性并产生很多的网络间数据往返(network round-trip)。 回忆下,上文中曾提到的数据库中的表和log的对偶性。
通过记录关于本地索引的changelog,用于在crash后恢复状态。这种机制,其实也揭示了一种一般化的,可以存储为任意索引类型的,与输入流同时被分割(co-partitioned)的状态。 当处理进程崩溃,其可以从changelog中恢复索引,log充当了将本地状态转化为某种基于时间备份的增量记录的角色。 这种机制还提供了一种很优雅的能力:处理过程本身的状态也可以作为log被记录下来,显然,其它的处理过程可以订阅这个状态。 结合数据库中的log技术,针对数据集成这一场景,往往可以做出很强大的事: 将log从数据库中抽取出来,并在各种各样的流处理系统中进行索引,那么,与不同的事件流进行join就成为可能。 2.11 Log 合并 显然,用log记录全时全量的状态变更信息,不太可能。 Kafka使用了log合并或者log垃圾回收技术: 1) 对于事件数据,kafka只保留一个时间窗口(可在时间上配置为几天,或者按空间来配置)
如果保持全时全量的logs,随着时间增长,数据将会变得越来越大,重放的过程也会越来越长。
2.12 系统构建 2.12.1 分布式系统 Log,在分布式数据库的数据流系统和数据集成中所扮演的角色是一致的:
你可以将整个机构中的应用系统和数据流,看作是一个单独的分布式数据库。
各式各样的数据系统,爆发性的出现,其实,这种复杂性早已存在。 显然,不可能将所有的东西都丢进一个Hadoop集群中,期望其解决所有的问题。所以,如何构建一个好的系统,可能会像下面这样: 构建一个分布式系统,每个组件都是一些很小的集群,每个集群不一定能完整提供安全性、性能隔离、或者良好的扩展性,但是,每个问题都能得到(专业地)解决。 Jay哥觉得,之所以各式各样的系统爆发性地出现,就是因为要构建一个强大的分布式系统十分困难。而如果将用例限制到一些简单的,比如查询这样的场景下,每个系统都有足够的能力去解决问题,但是要把这些系统整合起来,很难。 Jay哥觉得在未来构建系统这事儿有三种可能: 1) 保持现状。这种情况下,数据集成依然是最头大的问题,所以一个外部的log系统就很重要(kafka!)
从某种角度来看,构建这样的分布式系统,就像某个版本的乐高积木一样。这显然跟更关心API的终端用户没有太大关系,但是这揭示了构建一个强大系统并保持简单性的一条道路:
2.12.2 Log在系统构建中的地位 如果一个系统,有了外部log系统的支持,那么每个独立的系统就可以通过共享log来降低其自身的复杂性,Jay哥认为log的作用是: 1) 处理数据一致性问题。无论是立即一致性还是最终一致性,都可以通过序列化对于节点的并发操作来达到。 2) 在节点间提供数据复制。 3) 提供“提交”的语义。比如,在你认为你的写操作不会丢失的情况下进行操作确认。 4) 提供外部系统可订阅的数据源(feeds)。 5) 当节点因失败而丢失数据时,提供恢复的能力,或者重新构建新的复制节点。 6) 处理节点间的负载均衡。 以上,大概是一个完整的分布式系统中应提供的大部分功能了(Jay哥确实爱Log!),剩下的就是客户端的API和诸如一些构建索引的事了,比如全文索引需要获取所有的partitions,而针对主键的查询,只需要在某个partition中获取数据。 (那把剩下的事情也交代下吧,Jay哥威武!) 系统可被分为两个逻辑组件(这强大的理解和功力): 1) Log层
Log层,以序列化的、有序的方式捕获状态的变化,而服务层,则存储外部查询需要的索引,比如一个K-V存储可能需要B-tree、sstable索引,而一个搜索服务需要倒排索引。 写操作既可以直接入log层,也可以通过服务层做代理。写入log会产生一个逻辑上的时间戳(log的索引),比如一个数字ID,如果系统partition化了,那么,服务层和log层会拥有相同的partitions(但其各自的机器数可能不同)。 服务层订阅到log层,并且以最快的速度、按log存储的顺序追log,将数据和状态变化同步进自己的本地索引中。 客户端将会得到read-your-write的语义: 通过对任一一个节点,在查询时携带其写入时的时间戳,服务层的节点收到此查询,通过和其本地索引比较时间戳,如果必要,为了防止返回过期的老数据,推迟请求的执行,直到此服务节点的索引同步跟上了时间戳。 服务层的节点,也许需要、也许不需要知道leader的概念。在很多简单的用例中,服务层可不构建leader节点,因为log就是事实的来源。 还有一个问题,如何处理节点失败后的恢复问题。可以这样做,在log中保留一个固定大小的时间窗口,同时对数据维护快照。也可以让log保留数据的全量备份并使用log合并技术完成log自身的垃圾回收。这种方法,将服务层的众多复杂性移至log层,因为服务层是系统相关(system-specific)的,而log层确可以通用。 基于log系统,可以提供一组完备的、供开发使用的、可作为其它系统的ETL数据源、并供其它系统订阅的API。 Full Stack !: 显然,一个以log为核心的分布式系统,其本身立即成为了可对其它系统提供数据装载支持及数据流处理的角色。同样的,一个流处理系统,也可以同时消费多个数据流,并通过对这些数据流进行索引然后输出的另一个系统,来对外提供服务。 基于log层和服务层来构建系统,使得查询相关的因素与系统的可用性、一致性等因素解耦。 也许很多人认为在log中维护数据的单独备份,特别是做全量数据拷贝太浪费、太奢侈,但事实并非如此: 1) linkedin(注:2013年)的kafka生产集群维护了每数据中心75TB的数据,而应用集群需要的存储空间和存储条件(SSD+更多的内存)比kafka集群要高。
2.13 结语 Jay哥在最后,不仅厚道地留下了很多学术、工程上的有价值的论文和参考链接,还很谦逊地留下了这句话: If you made it this far you know most of what I know about logs. 终。 |
|
来自: 昵称28748685 > 《待分类》