博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Zookeeper实现集群中选择单机器执行任务并自动切换
阅读量:6657 次
发布时间:2019-06-25

本文共 4512 字,大约阅读时间需要 15 分钟。

hot3.png

1、需求背景

     有一个集群(N台服务器),需要访问一个外部接口获取数据,但是每个时间点只需要一个机器来执行任务就行了,不需要所有服务器同时执行,因为集群共享数据库。

2、思路

   通过Zookeeper选取连接序列号最小的服务器作为当前时间执行任务的机器,如果当前机器宕机,通过Zookeeper自动发现机制,重新选择新的执行机器,除非整个集群挂掉;

  (当然,改思路可以让集群处理同一个任务集的不同部分,如N1处理1...N,N2处理N+1....N2等)

3、代码

//连接管理客户端public class ZkClientManager implements Watcher,InitializingBean {    private Logger log = LoggerFactory.getLogger(getClass());    private final String ROOT_PATH = "/car_position";    private final String SUB_PATH = ROOT_PATH + "/sub_system_";    @Value("${zookeeper_address}")    public String CONNECT_PATH = "127.0.0.1:2181";    private Integer SESSION_TIME_OUT = 10000;    private ZooKeeper zk = null;    private String SELF_PATH = null;    private boolean createRootPath(String data) throws KeeperException, InterruptedException {        if (zk.exists(ROOT_PATH, true) == null) {            String rootpath = zk.create(ROOT_PATH, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);            log.info("------#创建根节点成功rootpath:" + rootpath);        } else {            log.info("------#根路径节点已经存在,不在重复创建");        }        return true;    }    private boolean createSubNode() throws KeeperException, InterruptedException {        SELF_PATH = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);        log.info("------#创建当前zk节点路径成功,path=" + SELF_PATH);        List
paths = zk.getChildren(ROOT_PATH, new ZkNodeWatcher(zk, SELF_PATH)); WangKuoGpsUtil.sortAndCalcPath(SELF_PATH,paths); return true; } private void deletePath() { try { zk.delete(SELF_PATH, -1); log.info("-------#删除PATH=" + SELF_PATH); } catch (Exception e) { log.error("------#删除zk节点路径异常",e); } } private void releaseConn() { if (zk != null) { try { zk.close(); } catch (InterruptedException e) { log.error("------#zookeeper关闭连接异常"); } } } public void process(WatchedEvent event) { Event.KeeperState state = event.getState(); Event.EventType type = event.getType(); if (Event.KeeperState.SyncConnected == state) { if (Event.EventType.None == type) { log.info("------#成功连接上zk服务器"); } } else if (Event.KeeperState.Disconnected == state) { log.info("------#与ZK服务器断开连接"); } else if (Event.KeeperState.Expired == state) { log.info("会话失效"); } } @Override public void afterPropertiesSet() throws Exception { log.info("------#Zookeeper连接服务器地址{},timeout={}",CONNECT_PATH,SESSION_TIME_OUT); zk = new ZooKeeper(CONNECT_PATH, SESSION_TIME_OUT, this); log.info("------#初始化zookeeper连接成功"); createRootPath(""); createSubNode(); }}
//监听器,接收节点变更通知,收到通知后重新计算public class ZkNodeWatcher implements Watcher {    private Logger log = LoggerFactory.getLogger(getClass());    private ZooKeeper zooKeeper;    private String selfPath;    public ZkNodeWatcher(ZooKeeper zooKeeper,String selfPath){        this.zooKeeper = zooKeeper;        this.selfPath =selfPath;    }    public void process(WatchedEvent event) {        Event.KeeperState state = event.getState();        Event.EventType type = event.getType();        try {            log.info("------#Zookeeper NodeWather,state={},type={}",state,type);            if(Event.EventType.NodeChildrenChanged == type){                List
paths = zooKeeper.getChildren(event.getPath(), this); WangKuoGpsUtil.sortAndCalcPath(selfPath,paths); } } catch (Exception e) { log.error("------#处理时间异常",e); } }}

 

//对服务器在Zookeeper上的需要进行排序,并计算出当前节点的排序位public class WangKuoGpsUtil {    private static Logger log = LoggerFactory.getLogger(WangKuoGpsUtil.class);     public static void sortAndCalcPath(String nodePath,List
paths) { log.info(" ------#nodePath={} 监听节点--->{} ",nodePath,JSONObject.toJSON(paths)); Collections.sort(paths); GpsQueueCache.setNodeCount(paths.size()); GpsQueueCache.setLastTime(0L);//强制从数据库中拉取 for(int i=0;i

 

//缓存当前系统状态参数,用于业务计算public class GpsQueueCache {    //最近一次从数据库拉取数据的时间    private  static Long lastTime = 0L;    //当前服务器节点顺序位    private static Integer nodeIndex=1;    //当前服务器节点总量,可用于计算当前服务器处理得数据片段,根据节点偏移nodeIndex划分    private static Integer nodeCount=1;

说明:

     1、客户端启动后连接Zookeeper,获取当前节点注册的序列号;

     2、计算自己在集群中的当前位置;

    3、根据位置信息做自己该做的事情。

 

 

转载于:https://my.oschina.net/u/1159254/blog/886637

你可能感兴趣的文章
Debian下搭建zabbix监控
查看>>
病毒与***的查杀
查看>>
线程组
查看>>
涉密数据的处理
查看>>
我的友情链接
查看>>
【单机实现系列】通过scom2012对Hyper-V主机来监控和邮件报警②
查看>>
python简介
查看>>
python字典开发三级菜单
查看>>
.net Framework下载地址
查看>>
十三个经典算法集锦
查看>>
深圳偶遇
查看>>
给自己电脑安装SSD与加内存条
查看>>
如何有效地记录 Java SQL 日志?
查看>>
学习Linux决心书
查看>>
stp 总结
查看>>
Java本地文件操作(五)遍历文件夹
查看>>
BGP学习笔记
查看>>
mysql字段加密
查看>>
linux 磁盘分区(一)
查看>>
在虚拟机中的域环境下批量安装部署软件(第三节)
查看>>