1、需求背景
有一个集群(N台服务器),需要访问一个外部接口获取数据,但是每个时间点只需要一个机器来执行任务就行了,不需要所有服务器同时执行,因为集群共享数据库。
2、思路
通过Zookeeper选取连接序列号最小的服务器作为当前时间执行任务的机器,如果当前机器宕机,通过Zookeeper自动发现机制,重新选择新的执行机器,除非整个集群挂掉;
(当然,改思路可以让集群处理同一个任务集的不同部分,如N1处理1...N,N2处理N+1....N2等)
3、代码
//连接管理客户端public class ZkClientManager implements Watcher,InitializingBean { private Logger log = LoggerFactory.getLogger(getClass()); private final String ROOT_PATH = "/car_position"; private final String SUB_PATH = ROOT_PATH + "/sub_system_"; @Value("${zookeeper_address}") public String CONNECT_PATH = "127.0.0.1:2181"; private Integer SESSION_TIME_OUT = 10000; private ZooKeeper zk = null; private String SELF_PATH = null; private boolean createRootPath(String data) throws KeeperException, InterruptedException { if (zk.exists(ROOT_PATH, true) == null) { String rootpath = zk.create(ROOT_PATH, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); log.info("------#创建根节点成功rootpath:" + rootpath); } else { log.info("------#根路径节点已经存在,不在重复创建"); } return true; } private boolean createSubNode() throws KeeperException, InterruptedException { SELF_PATH = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); log.info("------#创建当前zk节点路径成功,path=" + SELF_PATH); Listpaths = zk.getChildren(ROOT_PATH, new ZkNodeWatcher(zk, SELF_PATH)); WangKuoGpsUtil.sortAndCalcPath(SELF_PATH,paths); return true; } private void deletePath() { try { zk.delete(SELF_PATH, -1); log.info("-------#删除PATH=" + SELF_PATH); } catch (Exception e) { log.error("------#删除zk节点路径异常",e); } } private void releaseConn() { if (zk != null) { try { zk.close(); } catch (InterruptedException e) { log.error("------#zookeeper关闭连接异常"); } } } public void process(WatchedEvent event) { Event.KeeperState state = event.getState(); Event.EventType type = event.getType(); if (Event.KeeperState.SyncConnected == state) { if (Event.EventType.None == type) { log.info("------#成功连接上zk服务器"); } } else if (Event.KeeperState.Disconnected == state) { log.info("------#与ZK服务器断开连接"); } else if (Event.KeeperState.Expired == state) { log.info("会话失效"); } } @Override public void afterPropertiesSet() throws Exception { log.info("------#Zookeeper连接服务器地址{},timeout={}",CONNECT_PATH,SESSION_TIME_OUT); zk = new ZooKeeper(CONNECT_PATH, SESSION_TIME_OUT, this); log.info("------#初始化zookeeper连接成功"); createRootPath(""); createSubNode(); }}
//监听器,接收节点变更通知,收到通知后重新计算public class ZkNodeWatcher implements Watcher { private Logger log = LoggerFactory.getLogger(getClass()); private ZooKeeper zooKeeper; private String selfPath; public ZkNodeWatcher(ZooKeeper zooKeeper,String selfPath){ this.zooKeeper = zooKeeper; this.selfPath =selfPath; } public void process(WatchedEvent event) { Event.KeeperState state = event.getState(); Event.EventType type = event.getType(); try { log.info("------#Zookeeper NodeWather,state={},type={}",state,type); if(Event.EventType.NodeChildrenChanged == type){ Listpaths = zooKeeper.getChildren(event.getPath(), this); WangKuoGpsUtil.sortAndCalcPath(selfPath,paths); } } catch (Exception e) { log.error("------#处理时间异常",e); } }}
//对服务器在Zookeeper上的需要进行排序,并计算出当前节点的排序位public class WangKuoGpsUtil { private static Logger log = LoggerFactory.getLogger(WangKuoGpsUtil.class); public static void sortAndCalcPath(String nodePath,Listpaths) { log.info(" ------#nodePath={} 监听节点--->{} ",nodePath,JSONObject.toJSON(paths)); Collections.sort(paths); GpsQueueCache.setNodeCount(paths.size()); GpsQueueCache.setLastTime(0L);//强制从数据库中拉取 for(int i=0;i
//缓存当前系统状态参数,用于业务计算public class GpsQueueCache { //最近一次从数据库拉取数据的时间 private static Long lastTime = 0L; //当前服务器节点顺序位 private static Integer nodeIndex=1; //当前服务器节点总量,可用于计算当前服务器处理得数据片段,根据节点偏移nodeIndex划分 private static Integer nodeCount=1;
说明:
1、客户端启动后连接Zookeeper,获取当前节点注册的序列号;
2、计算自己在集群中的当前位置;
3、根据位置信息做自己该做的事情。