Hadoop的设计初衷是服务于off-line的数据存储和处理应用。随着这个产品的不断成熟和发展,对于支持on-line应用的需求越来越强烈。例如HBase已经被Facebook和淘宝用到了在线存储应用中。所以Hadoop的on-line化也是一个趋势。目前制约Hadoop作为on-line存储和处理的瓶颈主要是系统的availability。衡量一个分布式系统的主要指标有:reliability, availability & scalability。Hadoop可以做到横向扩展,所以scalability非常好;而用户存在Hadoop里的数据几乎不会丢失,所以reliability也是非常不错的;目前的主要问题在availability,也就是用户向HDFS集群请求数据的时候集群是否能够保证100%提供服务,目前的主要问题体现在HDFS的SPOF(single point of failure),整个HDFS集群的启动/重启时间非常长,配置参数无法动态更改等。这些方面都是apache社区目前工作的重点,本文主要讨论HDFS NameNode的SPOF问题相关的HA机制。 Hadoop目前的trunk中的代码已经merge了原来的ha-branch,所以现在的trunk中的代码已经实现了基本的HA机制的功能。Hadoop PMC的人表示将会在后面的版本中发布这个功能。下面这张图是目前的HDFS HA的实现逻辑。 Right now the HA branch supports HOT-Failover, except that it is manual failover. We are now moving into a phase to implement automatic failover. Significant enhancements were completed to make HOT Failover work: 这是Hadoop mailing list中关于目前HA现状的阐述。下面首先简单介绍下这5个方面是怎么实现的,后面从源代码的角度分析具体的实现细节。 (1) Configuration changes for HA 在配置文件中会增加关于HA配置的参数,具体参数配置可以参考CDH4 Beta 2 High Availability Guide,这里介绍一些比较重要的参数。 例如dfs.ha.namenodes.[nameservice ID]这个参数表示在[nameservice ID]这个nameservice下的两台NameNode(分别作为Active和Standby模式运行)的主机名。然后针对每一台NN配置其对应的dfs.namenode.rpc-address.[nameservice ID].[name node ID]用来标示每一台NN。 由于目前的两台主机之间的HA机制是通过一个共享存储来存放editlog来实现的。所以需要配置参数dfs.namenode.shared.edits.dir表示共享存储的位置,一般是通过NFS挂载的形式,所以其实这个参数的值就是一个本地文件系统中的目录。 dfs.client.failover.proxy.provider.[nameservice ID]这个参数指定具体的failover proxy provider类,也就是在client端发现原来Active的NameNode变成了Standby模式时(在client发送RPC请求时返回了StandbyException时),该如何去连接当前Active的NameNode。目前的Hadoop里只有一个具体实现策略ConfiguredFailoverProxyProvider,实现方法就是如果client failover时,下次把RPC发送给另外一个NameNode的proxy。 另外就是dfs.ha.fencing.methods参数,指定在Active NameNode切换到Standby模式时,确保切换成功或者进程被杀死。 有两种模式的NameNode,分别是Active和Standby模式。Active模式的NameNode接受client的RPC请求并处理,同时写自己的Editlog和共享存储上的Editlog,接收DataNode的Block report, block location updates和heartbeat;Standby模式的NameNode同样会接到来自DataNode的Block report, block location updates和heartbeat,同时会从共享存储的Editlog上读取并执行这些log操作,使得自己的NameNode中的元数据(Namespcae information + Block locations map)都是和Active NameNode中的元数据是同步的。所以说Standby模式的NameNode是一个热备(Hot Standby NameNode),一旦切换成Active模式,马上就可以提供NameNode服务。 (3) Client-side redirection Client的通过RPC的Proxy与NameNode交互。在client端会有两个代理同时存在,分别代表与Active和Standby的NameNode的连接。由于Client端有Retry机制,当与Active NameNode正常通信的client proxy收到RPC返回的StandbyException时,说明这个Active NameNode已经变成了Standby模式,所以触发dfs.client.failover.proxy.provider.[nameservice ID]这个参数指定的类来做failover,目前唯一的实现是ConfiguredFailoverProxyProvider,实现方法就是下次开始把RPC发向另外一个NameNode。此后的RPC都是发往另外一个NameNode,也就是NameNode发生了主从切换。 public synchronized void performFailover(T currentProxy) { currentProxyIndex = (currentProxyIndex + 1 ) % proxies.size(); } (4) Standby processing editlogs form Active 开启Standby模式后,Standby NameNode会通过EditLogTailerThread从共享存储中读取Active NameNode写到那里的Editlog,然后执行操作,从而保持自己的元数据是最新的,所以说是热备。 (5)Dual block reports to Active and Standby. DataNode的Block report, block location updates和heartbeat等RPC操作会发向两个NameNode,从而使得两个NameNode的Block locations map都是最新的,这样可以做到切换主从后原来的从(新的主)不再需要block report的时间。 可以看出client与NameNode之间的RPC是只向一个NameNode发送的(收到StandbyException后才会重试另外一个);而DataNode与NameNode之间的RPC在任何时候都是同时向两个NameNode发送的。 上面分析了Hadoop High Availability的思路和主要功能,下面文章中从代码的角度分析具体的实现。 (1)NameNode启动流程 对于HDFS HA机制来说,NameNode是核心,NameNode有Active和Standby两种状态。在NameNode的构造函数中,读取配置文件,如果配置文件配置了开启HA,那么NameNode进入STANDBY_STATE状态;反之则进入ACTIVE_STATE状态。 this .haEnabled = HAUtil.isHAEnabled(conf, nsId); if (!haEnabled) { state = ACTIVE_STATE; } else { state = STANDBY_STATE; } 创建HA上下文,NameNodeHAContext类包含了NameNode的Active和Standby模式变换相关操作的函数实现。 this .haContext = createHAContext(); 然后就是初始化操作,包括配置参数,RPC server,metrics,加载Namespace,然后进入当前的haContext try { initializeGenericKeys(conf, nsId, namenodeId); initialize(conf); state.prepareToEnterState(haContext); state.enterState(haContext); } catch (IOException e) { this .stop(); throw e; } catch (HadoopIllegalArgumentException e) { this .stop(); throw e; } (2)管理员执行HA管理命令流程 当两个NameNode都已经启动并进入Standby模式之后,就可以通过bin/hdfs脚本执行HDFS管理功能,执行如下命令: bin /hdfs haadmin 就会调用DFSHAAdmin.java这个类来执行用户指定的功能,例如: bin /hdfs haadmin -transitionToActive serviceId bin /hdfs haadmin -transitionToStandby serviceId bin /hdfs haadmin -failover serviceId serviceId 以-failover为例,就会调用HAAdmin.java类中的failover方法。 private int failover( final String[] argv) throws IOException, ServiceFailedException { boolean forceFence = false ; boolean forceActive = false ; Options failoverOpts = new Options(); failoverOpts.addOption(“failover”, false , “failover”); failoverOpts.addOption(FORCEFENCE, false , “force fencing”); failoverOpts.addOption(FORCEACTIVE, false , “force failover”); CommandLineParser parser = new GnuParser(); CommandLine cmd; try { cmd = parser.parse(failoverOpts, argv); forceFence = cmd.hasOption(FORCEFENCE); forceActive = cmd.hasOption(FORCEACTIVE); } catch (ParseException pe) { errOut.println(“failover: incorrect arguments”); printUsage(errOut, ??-failover”); return - 1 ; } int numOpts = cmd.getOptions() == null ? 0 : cmd.getOptions().length; final String[] args = cmd.getArgs(); if (numOpts > 2 || args.length != 2 ) { errOut.println(“failover: incorrect arguments”); printUsage(errOut, “-failover”); return - 1 ; } HAServiceTarget fromNode = resolveTarget(args[ 0 ]); HAServiceTarget toNode = resolveTarget(args[ 1 ]); FailoverController fc = new FailoverController(getConf()); try { fc.failover(fromNode, toNode, forceFence, forceActive); out.println(“Failover from “+args[ 0 ]+” to “+args[ 1 ]+” successful”); } catch (FailoverFailedException ffe) { errOut.println(“Failover failed: ” + ffe.getLocalizedMessage()); return - 1 ; } return 0 ; } 在这个函数中首先解析参数,然后会生成两个HAServiceTarget,分别表示发生主从切换的两个NameNode。由于这个DFSHAAdmin命令可以在任何一台可以连接到集群中的机器上运行,所以HAServiceTarget实际上是发生主从切换的两个NameNode的代理的封装。这个代理与两个NameNode通信的RPC协议时HAServiceProtocol。目前的Hadoop的RPC已经默认了Protocol Buffer作为RPC的实现。 然后生成FailoverController对象,这个类就是用于控制主从切换的。然后执行这个类中的failover方法。 public void failover(HAServiceTarget fromSvc, HAServiceTarget toSvc, boolean forceFence, boolean forceActive) throws FailoverFailedException { Preconditions.checkArgument(fromSvc.getFencer() != null , “failover requires a fencer”); //强制需要一种fencing方法 // Failover前的检查,例如fromSvc和toSvc是不是同一NameNode,toSvc是不是已经处于Active状态等 preFailoverChecks(fromSvc, toSvc, forceActive); // 第一步是先把fromSvc转换成standby模式 boolean tryFence = true ; // 通过向fromSvc的发送HAServiceProtocol的方式使得fromSvc transition to Standby mode // 如果这个RPC返回的结果是ServiceFailedException或者IOException, // 那么说明transition fail,从而是否要tryFence就是true,就必须fencing了。 if (tryGracefulFence(fromSvc)) { tryFence = forceFence; } // Fence fromSvc if it’s required or forced by the user if (tryFence) { if (!fromSvc.getFencer().fence(fromSvc)) { throw new FailoverFailedException(“Unable to fence ” + fromSvc + “. Fencing failed.”); } } // 第二步就是让toSvc转换成active模式,操作方法和上面类似, // 通过RPC给toSvc发送transitionToActive命令。 boolean failed = false ; Throwable cause = null ; try { HAServiceProtocolHelper.transitionToActive( toSvc.getProxy(conf, rpcTimeoutToNewActive)); } catch (ServiceFailedException sfe) { LOG.error(“Unable to make ” + toSvc + ” active (” + sfe.getMessage() + “). Failing back.”); failed = true ; cause = sfe; } catch (IOException ioe) { LOG.error(“Unable to make ” + toSvc + ” active (unable to connect). Failing back.”, ioe); failed = true ; cause = ioe; } // 如果我们在第二步的时候,把toSvc转换成Active模式失败,需要考虑回滚。 // 如果我们没有强制fencing原来的fromSvc,那么就回滚。 // 如果我们强制fencing掉原来的fromSvc,那么只能抛异常了。 if (failed) { String msg = “Unable to failover to ” + toSvc; // Only try to failback if we didn’t fence fromSvc if (!tryFence) { try { // Unconditionally fence toSvc in case it is still trying to // become active, eg we timed out waiting for its response. // Unconditionally force fromSvc to become active since it // was previously active when we initiated failover. failover(toSvc, fromSvc, true , true ); } catch (FailoverFailedException ffe) { msg += “. Failback to ” + fromSvc + ” failed (” + ffe.getMessage() + “)”; LOG.fatal(msg); } } throw new FailoverFailedException(msg, cause); } (3)NameNode端的HA状态切换执行的操作代码 以上是DFSHAAdmin的操作,那么当它把对应的RPC命令发送到NameNode时,NameNode端的逻辑是怎么处理的呢?新的机遇Protocol Buffer实现的RPC请求在NameNode端会调用NameNodeRpcServer.java类中的方法。 @Override // HAServiceProtocol public synchronized void transitionToActive() throws ServiceFailedException, AccessControlException { nn.transitionToActive(); } @Override // HAServiceProtocol public synchronized void transitionToStandby() throws ServiceFailedException, AccessControlException { nn.transitionToStandby(); } @Override // HAServiceProtocol public synchronized HAServiceStatus getServiceStatus() throws AccessControlException, ServiceFailedException { return nn.getServiceStatus(); }
s.enterState(context)的调用流程类似,最终会走到FSNamesystem.startActiveServices()方法中。
|
|
来自: 关平藏书 > 《Hadoop家族》