The previous post introduced the first module of SolrCloud: building the ZooKeeper ensemble that manages the state of the Solr cluster. Once such an ensemble is available when a Solr server starts, we obviously need a convenient way to connect to it. In this post I analyze the wrapper classes that make up the ZooKeeper client.

SolrZkClient is the interface class a Solr server uses to communicate with the ZooKeeper ensemble. Its main components are:

    private ConnectionManager connManager;
    private volatile SolrZooKeeper keeper;
    private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();

ConnectionManager is an implementation of Watcher and is mainly responsible for responding to changes in the state of the connection between the client and the ZooKeeper ensemble. For a detailed introduction to Watcher, see http://zookeeper./doc/trunk/zookeeperProgrammers.html#ch_zkWatches. SolrZooKeeper is a thin wrapper class that adds nothing of substance. ZkCmdExecutor retries an operation a fixed number of times when the connection fails. The operation itself is a concrete subclass of the abstract class ZkOperation, whose execute method contains the actual steps. These operations include creating a Znode, reading the data of a Znode, creating a Znode path, and deleting a Znode.

Let us look at the constructor first. It begins by creating a ConnectionManager object to respond to state changes between the two ends. ZkClientConnectionStrategy is an abstract connection-strategy class that covers two strategies, connect and reconnect, and follows the template method pattern: the concrete update step is supplied through its static nested class ZkUpdate. DefaultConnectionStrategy is one of its subclasses and overrides the two strategy methods connect and reconnect.

    public SolrZkClient(String zkServerAddress, int zkClientTimeout,
        ZkClientConnectionStrategy strat, final OnReconnect onReconnect,
        int clientConnectTimeout)
        throws InterruptedException, TimeoutException, IOException {
      connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
          + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat,
          onReconnect);
      strat.connect(zkServerAddress, zkClientTimeout, connManager,
          new ZkUpdate() {
            @Override
            public void update(SolrZooKeeper zooKeeper) {
              SolrZooKeeper oldKeeper = keeper;
              keeper = zooKeeper;
              if (oldKeeper != null) {
                try {
                  oldKeeper.close();
                } catch (InterruptedException e) {
                  // Restore the interrupted status
                  Thread.currentThread().interrupt();
                  log.error("", e);
                  throw new ZooKeeperException(
                      SolrException.ErrorCode.SERVER_ERROR, "", e);
                }
              }
            }
          });
      connManager.waitForConnected(clientConnectTimeout);
      numOpens.incrementAndGet();
    }

Note the anonymous ZkUpdate object created in the constructor: its update method is called back by the connection strategy. In that method the new SolrZooKeeper is put in place and the old one, if there is one, is closed. With this preparation done, the client goes on to connect to the ZooKeeper ensemble:

    // connect to the ZooKeeper ensemble; the default timeout is 30 seconds
    connManager.waitForConnected(clientConnectTimeout);

Strictly speaking, the connection itself is triggered by new SolrZooKeeper(serverAddress, timeout, watcher). The line above only waits for the given time to see whether the connection has been established.

Once the connection to the ZooKeeper ensemble succeeds, the usual ZooKeeper operations become available:

1) Check whether the client is connected

    public boolean isConnected() {
      return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
    }

2) Check whether a Znode exists at a given path

    public Stat exists(final String path, final Watcher watcher,
        boolean retryOnConnLoss) throws KeeperException, InterruptedException {
      if (retryOnConnLoss) {
        return zkCmdExecutor.retryOperation(new ZkOperation() {
          @Override
          public Stat execute() throws KeeperException, InterruptedException {
            return keeper.exists(path, watcher);
          }
        });
      } else {
        return keeper.exists(path, watcher);
      }
    }

3) Create a Znode

    public String create(final String path, final byte data[],
        final List<ACL> acl, final CreateMode createMode,
        boolean retryOnConnLoss) throws KeeperException, InterruptedException {
      if (retryOnConnLoss) {
        return zkCmdExecutor.retryOperation(new ZkOperation() {
          @Override
          public String execute() throws KeeperException, InterruptedException {
            return keeper.create(path, data, acl, createMode);
          }
        });
      } else {
        return keeper.create(path, data, acl, createMode);
      }
    }

4) Get the child Znodes under a given path

    public List<String> getChildren(final String path, final Watcher watcher,
        boolean retryOnConnLoss) throws KeeperException, InterruptedException {
      if (retryOnConnLoss) {
        return zkCmdExecutor.retryOperation(new ZkOperation() {
          @Override
          public List<String> execute() throws KeeperException, InterruptedException {
            return keeper.getChildren(path, watcher);
          }
        });
      } else {
        return keeper.getChildren(path, watcher);
      }
    }

5) Get the data attached to a given Znode

    public byte[] getData(final String path, final Watcher watcher,
        final Stat stat, boolean retryOnConnLoss)
        throws KeeperException, InterruptedException {
      if (retryOnConnLoss) {
        return zkCmdExecutor.retryOperation(new ZkOperation() {
          @Override
          public byte[] execute() throws KeeperException, InterruptedException {
            return keeper.getData(path, watcher, stat);
          }
        });
      } else {
        return keeper.getData(path, watcher, stat);
      }
    }

6) Set the data on a given Znode

    public Stat setData(final String path, final byte data[],
        final int version, boolean retryOnConnLoss)
        throws KeeperException, InterruptedException {
      if (retryOnConnLoss) {
        return zkCmdExecutor.retryOperation(new ZkOperation() {
          @Override
          public Stat execute() throws KeeperException, InterruptedException {
            return keeper.setData(path, data, version);
          }
        });
      } else {
        return keeper.setData(path, data, version);
      }
    }

7) Create a path

    public void makePath(String path, byte[] data, CreateMode createMode,
        Watcher watcher, boolean failOnExists, boolean retryOnConnLoss)
        throws KeeperException, InterruptedException {
      if (log.isInfoEnabled()) {
        log.info("makePath: " + path);
      }
      boolean retry = true;

      if (path.startsWith("/")) {
        path = path.substring(1, path.length());
      }
      String[] paths = path.split("/");
      StringBuilder sbPath = new StringBuilder();
      for (int i = 0; i < paths.length; i++) {
        byte[] bytes = null;
        String pathPiece = paths[i];
        sbPath.append("/" + pathPiece);
        final String currentPath = sbPath.toString();
        Object exists = exists(currentPath, watcher, retryOnConnLoss);
        if (exists == null || ((i == paths.length -1) && failOnExists)) {
          CreateMode mode = CreateMode.PERSISTENT;
          if (i == paths.length - 1) {
            mode = createMode;
            bytes = data;
            if (!retryOnConnLoss) retry = false;
          }
          try {
            if (retry) {
              final CreateMode finalMode = mode;
              final byte[] finalBytes = bytes;
              zkCmdExecutor.retryOperation(new ZkOperation() {
                @Override
                public Object execute() throws KeeperException, InterruptedException {
                  keeper.create(currentPath, finalBytes,
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, finalMode);
                  return null;
                }
              });
            } else {
              keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
            }
          } catch (NodeExistsException e) {
            if (!failOnExists) {
              // TODO: version ? for now, don't worry about race
              setData(currentPath, data, -1, retryOnConnLoss);
              // set new watch
              exists(currentPath, watcher, retryOnConnLoss);
              return;
            }
            // ignore unless it's the last node in the path
            if (i == paths.length - 1) {
              throw e;
            }
          }
          if(i == paths.length -1) {
            // set new watch
            exists(currentPath, watcher, retryOnConnLoss);
          }
        } else if (i == paths.length - 1) {
          // TODO: version ? for now, don't worry about race
          setData(currentPath, data, -1, retryOnConnLoss);
          // set new watch
          exists(currentPath, watcher, retryOnConnLoss);
        }
      }
    }

8) Delete a given Znode

    public void delete(final String path, final int version,
        boolean retryOnConnLoss) throws InterruptedException, KeeperException {
      if (retryOnConnLoss) {
        zkCmdExecutor.retryOperation(new ZkOperation() {
          @Override
          public Stat execute() throws KeeperException, InterruptedException {
            keeper.delete(path, version);
            return null;
          }
        });
      } else {
        keeper.delete(path, version);
      }
    }

Now let us go back and see how ConnectionManager responds to changes in the connection state between the two ends. Its most important method is process. When this callback fires, it reads the state of the event from the WatchedEvent parameter, for example connected, session expired (in which case a reconnect is needed), or disconnected.

    public synchronized void process(WatchedEvent event) {
      if (log.isInfoEnabled()) {
        log.info("Watcher " + this + " name:" + name + " got event " + event
            + " path:" + event.getPath() + " type:" + event.getType());
      }

      state = event.getState();
      if (state == KeeperState.SyncConnected) {
        connected = true;
        clientConnected.countDown();
      } else if (state == KeeperState.Expired) {
        connected = false;
        log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
        // try to reconnect to the ZooKeeper servers
        try {
          connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
              new ZkClientConnectionStrategy.ZkUpdate() {
                @Override
                public void update(SolrZooKeeper keeper)
                    throws InterruptedException, TimeoutException, IOException {
                  synchronized (connectionStrategy) {
                    waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
                    client.updateKeeper(keeper);
                    if (onReconnect != null) {
                      onReconnect.command();
                    }
                    synchronized (ConnectionManager.this) {
                      ConnectionManager.this.connected = true;
                    }
                  }
                }
              });
        } catch (Exception e) {
          SolrException.log(log, "", e);
        }
        log.info("Connected:" + connected);
      } else if (state == KeeperState.Disconnected) {
        connected = false;
      } else {
        connected = false;
      }
      notifyAll();
    }
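
To make the retry-on-connection-loss idea behind ZkCmdExecutor and ZkOperation more concrete, here is a minimal, self-contained sketch in plain Java. It is not Solr's actual code: the names RetrySketch and ConnectionLostException, the constructor parameters, and the linearly growing delay are all assumptions made for illustration, and a standard Callable stands in for ZkOperation.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a connection-loss error; not a ZooKeeper class. */
class ConnectionLostException extends Exception {
    ConnectionLostException(String message) {
        super(message);
    }
}

/** Illustrative retry helper (assumed design, not Solr's ZkCmdExecutor). */
final class RetrySketch {
    private final int maxAttempts;   // total attempts, including the first one
    private final long baseDelayMs;  // delay grows linearly with the attempt number

    RetrySketch(int maxAttempts, long baseDelayMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
        this.baseDelayMs = baseDelayMs;
    }

    /**
     * Runs the operation and retries it only when the connection was lost.
     * Any other exception is propagated to the caller immediately.
     */
    <T> T retryOperation(Callable<T> operation) throws Exception {
        ConnectionLostException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (ConnectionLostException e) {
                last = e;
                if (attempt < maxAttempts) {
                    // back off a little longer before each further attempt
                    Thread.sleep(attempt * baseDelayMs);
                }
            }
        }
        // every attempt failed with a connection loss: rethrow the last one
        throw last;
    }
}
```

A caller would wrap each operation in a lambda or anonymous class, in the same way the SolrZkClient methods above wrap keeper.exists or keeper.create in a ZkOperation when retryOnConnLoss is true. Retrying only on connection loss keeps real errors, such as a node that already exists, visible to the caller.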