`
agapple
  • 浏览: 1582860 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

zookeeper项目使用几点小结

 
阅读更多

背景

  前段时间学习了zookeeper后,在新的项目中刚好派上了用场,我在项目中主要负责分布式任务调度模块的开发,对我自己来说是个不小的挑战。

  分布式的任务调度,技术上我们选择了zookeeper,具体的整个分布式任务调度的架构选择会另起一篇文章进行介绍。

 

  本文主要是介绍自己在项目中zookeeper的一些扩展使用,希望可以对大家有所帮助。

  项目中使用的zookeeper版本3.3.3,对应的文档地址: http://zookeeper.apache.org/doc/trunk/

扩展一:优先集群

先来点背景知识:

1.zookeeper中的server机器之间会组成leader/follower集群,1:n的关系。采用了paxos一致性算法保证了数据的一致性,就是leader/follower会采用通讯的方式进行投票来实现paxns。

2.zookeeper还支持一种observer模式,提供只读服务不参与投票,提升系统,对应文档: http://zookeeper.apache.org/doc/trunk/zookeeperObservers.html

 

我们项目特性的决定了我们需要进行跨机房操作,比如杭州,美国,香港,青岛等多个机房之间进行数据交互。

跨机房之间对应的网络延迟都比较大,比如中美机房走海底光缆有ping操作200ms的延迟,杭州和青岛机房有70ms的延迟。 

 

为了提升系统的网络性能,我们在部署zookeeper网络时会在每个机房部署节点,多个机房之间再组成一个大的网络保证数据一致性。(zookeeper千万别再搞多个集群)

 

最后的部署结构就会是:

  • 杭州机房  >=3台 (构建leader/follower的zk集群)
  • 青岛机房  >=1台 (构建observer的zk集群)
  • 美国机房  >=1台 (构建observer的zk集群)
  • 香港机房  >=1台 (构建observer的zk集群)


 
一句话概括就是: 在单个机房内组成一个投票集群,外围的机房都会是一个observer集群和投票集群进行数据交互。 这样部署的一些好处,大家可以细细体会一下

针对这样的部署结构,我们会引入一个优先集群问题: 比如在美国机房的机器需要优先去访问本机房的zk集群,访问不到后才去访问杭州机房。 
默认在zookeeper3.3.3的实现中,认为所有的节点都是对等的。并没有对应的优先集群的概念,单个机器也没有对应的优先级的概念。

扩展代码:(比较暴力,采用反射的方式改变了zk client的集群列表)
  • 先使用美国机房的集群ip初始化一次zk client
  • 通过反射方式,强制在初始化后的zk client中的server列表中又加入杭州机房的机器列表
ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(cluster1, sessionTimeout, new AsyncWatcher() {

                public void asyncProcess(WatchedEvent event) {
                    //do nothing 
                }

            });
            if (serveraddrs.size() > 1) {
                // 强制的声明accessible
                ReflectionUtils.makeAccessible(clientCnxnField);
                ReflectionUtils.makeAccessible(serverAddrsField);
                // 添加第二组集群列表
                for (int i = 1; i < serveraddrs.size(); i++) {
                    String cluster = serveraddrs.get(i);
                    // 强制获取zk中的地址信息
                    ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
                    List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils
                            .getField(serverAddrsField, cnxn);
                    // 添加第二组集群列表
                    serverAddrs.addAll(buildServerAddrs(cluster));
                }
            }
        }

扩展二:异步Watcher处理

  最早在看zookeeper的代码时,一直对它的watcher处理比较满意,使用watcher推送数据可以很方便的实现分布式锁的功能。

zookeeper的watcher实现原理也挺简单的,就是在zookeeper client和zookeeper server上都保存一份对应的watcher对象。每个zookeeper机器都会有一份完整的node tree数据和watcher数据,每次leader通知follower/observer数据发生变更后,每个zookeeper server会根据自己节点中的watcher事件推送给响应的zookeeper client,每个zk client收到后再根据内存中的watcher引用,进行回调。

 

这里会有个问题,就是zk client在处理watcher时,回凋的过程是一个串行的执行过程,所以单个watcher的处理慢会影响整个列表的响应。 

可以看一下ClientCnxn类中的EventThread处理,该线程会定时消费一个queue的数据,挨个调用processEvent(Object event) 进行回调处理。

 

扩展代码:

 

public abstract class AsyncWatcher implements Watcher {

    private static final int       DEFAULT_POOL_SIZE    = 30;
    private static final int       DEFAULT_ACCEPT_COUNT = 60;

    private static ExecutorService executor             = new ThreadPoolExecutor(
                                                                1,
                                                                DEFAULT_POOL_SIZE,
                                                                0L,
                                                                TimeUnit.MILLISECONDS,
                                                                new ArrayBlockingQueue(
                                                                        DEFAULT_ACCEPT_COUNT),
                                                                new NamedThreadFactory(
                                                                        "Arbitrate-Async-Watcher"),
                                                                new ThreadPoolExecutor.CallerRunsPolicy());

    public void process(final WatchedEvent event) {
        executor.execute(new Runnable() {//提交异步处理

                    @Override
                    public void run() {
                        asyncProcess(event);
                    }
                });

    }

    public abstract void asyncProcess(WatchedEvent event);

}

 

说明:
  • zookeeper针对watcher的调用是以单线程串行的方式进行处理,容易造成堵塞影响,monitor的数据同步及时性
  • AsyncWatcher为采取的一种策略为当不超过acceptCount=60的任务时,会采用异步线程的方式处理。如果超过60任务,会变为原先的单线程串行的模式

扩展三:重试处理

这个也不多说啥,看一下相关文档就清楚了

 

需要特殊处理下ConnectionLoss的异常,一种可恢复的异常。

重试处理:
public interface ZooKeeperOperation<T> {

    public T execute() throws KeeperException, InterruptedException;
}


/**
     * 包装重试策略
     */
    public <T> T retryOperation(ZooKeeperOperation<T> operation) throws KeeperException,
            InterruptedException {
        KeeperException exception = null;
        for (int i = 0; i < maxRetry; i++) {
            try {
                return (T) operation.execute();
            } catch (KeeperException.SessionExpiredException e) {
                logger.warn("Session expired for: " + this + " so reconnecting due to: " + e, e);
                throw e;
            } catch (KeeperException.ConnectionLossException e) { //特殊处理Connection Loss
                if (exception == null) {
                    exception = e;
                }
                logger.warn("Attempt " + i + " failed with connection loss so "
                        + "attempting to reconnect: " + e, e);

                retryDelay(i);
            }
        }

        throw exception;
    }

注意点:Watcher原子性

在使用zookeeper的过程中,需要特别注意一点就是注册对应watcher事件时,如果当前的节点已经满足了条件,比如exist的watcher,它不会触发你的watcher,而会等待下一次watcher条件的满足。

它的watcher是一个一次性的监听,而不是一个永久的订阅过程。所以在watcher响应和再次注册watcher过程并不是一个原子操作,编写多线程代码和锁时需要特别注意

总结

  zookeepr是一个挺不错的产品,源代码写的也非常不错,大量使用了queue和异步Thread的处理模式,真是一个伟大的产品。

  • 大小: 207 KB
分享到:
评论
19 楼 sukeqiang821021 2016-03-17  
dxqrr 写道
想问下LZ

杭州机房  >=3台 (构建leader/follower的zk集群)
青岛机房  >=1台 (构建observer的zk集群)
美国机房  >=1台 (构建observer的zk集群)
香港机房  >=1台 (构建observer的zk集群)

这个是每个地方构成一个小集群,然后整个构成一个大集群么
如果是那要怎么配置呢

抽象出来一层,青岛机房的集群看成一个节点,这就说明青岛机房对外只有一个IP,这个IP是用负载均衡来实现的。。。不知道我理解的对不对
18 楼 txm119161336 2016-03-09  
如果按这个方案,明显,写的时候还是有性能的问题,楼主对吗,因为所有的写都是会发到主机上面,也就是美国的写请求,也会在杭州的主上执行,然后再同步给观察者
17 楼 dxqrr 2015-10-26  
想问下LZ

杭州机房  >=3台 (构建leader/follower的zk集群)
青岛机房  >=1台 (构建observer的zk集群)
美国机房  >=1台 (构建observer的zk集群)
香港机房  >=1台 (构建observer的zk集群)

这个是每个地方构成一个小集群,然后整个构成一个大集群么
如果是那要怎么配置呢
16 楼 邢邢色色 2014-06-30  
关于扩展1,是否可以用分组和权限来取代呢?
15 楼 邢邢色色 2014-06-19  
我想再问下,你们国内外的网络带宽是Otter系统专用的吗?
如果共用的话,网络繁忙的时候,就怕zk国内外之间的通讯会卡住。
14 楼 agapple 2014-06-19  
邢邢色色 写道
请问跨机房部署的话您是使用公网IP来配置zk的吗?
具体的IP配置想请教下您的做法。


目前我们用了vpn,所以都是走内网ip
13 楼 邢邢色色 2014-06-18  
请问跨机房部署的话您是使用公网IP来配置zk的吗?
具体的IP配置想请教下您的做法。
12 楼 agapple 2014-04-24  
keyboard2000 写道
agapple你好,我是最近才看的zk,不知你有没了解过zk里的DistributedQueue实现,我测试下生产者offer,在普通双核pc机上10分钟才能生产1万个,磁盘灯一直是亮的,会有这么慢的性能吗?


差不多了,zk的写入tps也就只有1~2w而已
11 楼 keyboard2000 2014-04-23  
agapple你好,我是最近才看的zk,不知你有没了解过zk里的DistributedQueue实现,我测试下生产者offer,在普通双核pc机上10分钟才能生产1万个,磁盘灯一直是亮的,会有这么慢的性能吗?
10 楼 agapple 2014-04-01  
xloogson 写道
你好,我发现hbase里面的重试处理,在发生ZSESSIONEXPIRED过期的时候,也会做重试处理,但是我看FAQ,说ZSESSIONEXPIRED应该是一个fetal错误,不能重试。

那么到底哪个是对的啊?


重试没错,只不过需要业务实现session expired重试后的一些处理,比如重建临时节点
9 楼 xloogson 2014-04-01  
你好,我发现hbase里面的重试处理,在发生ZSESSIONEXPIRED过期的时候,也会做重试处理,但是我看FAQ,说ZSESSIONEXPIRED应该是一个fetal错误,不能重试。

那么到底哪个是对的啊?
8 楼 agapple 2014-01-09  
willfcareer0 写道
按照文章中的跨机房部署方案,如何做到杭州机房机器全部宕机(比如停电)后服务的可用性?


做不到,一般在杭州会部署3个机房的zookeeper,挂了其中一个机房,还可以继续工作.
7 楼 willfcareer0 2014-01-08  
按照文章中的跨机房部署方案,如何做到杭州机房机器全部宕机(比如停电)后服务的可用性?
6 楼 agapple 2013-11-14  
qianshangding 写道
ReflectionUtils.makeAccessible(clientCnxnField);
ReflectionUtils.makeAccessible(serverAddrsField);

请问clientCnxnField和serverAddrsField这两个Field是哪里来的?
如果把杭州的server列表加入到zk对象,因为会服务器列表是shuffle的,那么下次美国客户端就有可能访问到杭州机房的zookeeper服务器呢?我块还不太了解,麻烦指点下,谢谢


clientCnxnField/serverAddrsField这个是zookeeper早期版本的代码
5 楼 qianshangding 2013-11-13  
ReflectionUtils.makeAccessible(clientCnxnField);
ReflectionUtils.makeAccessible(serverAddrsField);

请问clientCnxnField和serverAddrsField这两个Field是哪里来的?
如果把杭州的server列表加入到zk对象,因为服务器列表是会shuffle的,那么下次美国客户端就有可能访问到杭州机房的zookeeper服务器呢?我块还不太了解,麻烦指点下,谢谢
4 楼 agapple 2013-10-31  
oyxccyj 写道
你好,刚接触zookeeper,扩展三在实际项目中怎么搞?我目前是要处理Hadoop集群上的服务器全部挂掉了,这时候zookeeper默认的是不断的去创建链接所以很慢(50分钟),我想通过笔者的这个扩展,比如我在调用一个baseDao的时候他首先是获取链接,获取这个链接的方法正如我上面所说不停的去链接,我想获取链接的时候走扩展三的方法。这样我就可以控制链接次数和时间短时间内给出异常日志信息,希望大神指导,谢谢


估计是hbase使用的zookeeper做了扩展,才会无限制做重试了把
3 楼 oyxccyj 2013-10-30  
你好,刚接触zookeeper,扩展三在实际项目中怎么搞?我目前是要处理Hadoop集群上的服务器全部挂掉了,这时候zookeeper默认的是不断的去创建链接所以很慢(50分钟),我想通过笔者的这个扩展,比如我在调用一个baseDao的时候他首先是获取链接,获取这个链接的方法正如我上面所说不停的去链接,我想获取链接的时候走扩展三的方法。这样我就可以控制链接次数和时间短时间内给出异常日志信息,希望大神指导,谢谢
2 楼 agapple 2011-10-16  
AliKevin2011 写道
agapple,你好,感谢你分享zookeeper。
    1.你说的 paxns 应该是paxos吧?(不知道是不是我理解错误,当然这是吹毛求疵,我只是希望你的文字更精确,别介意哦。)
    2.还有我个人了解zookeeper中确切的说应该使用的是zab(zookeeper automic broadcast)当然是paxos优化版本。比如消除了“羊群效应”,正如你前篇文字提到的当节点数据变化时候,不会按照paxos算法将消息发送到所有的client,而是只发送到序号为下一个序号的client,从而缓解服务压力。
   我也是刚刚了解zookeeper,我理解不对之处多多指出。还是感谢你的分享。


恩,多谢你的建议。

1. 的确是paxos,是我的笔误,多谢指出。
2. 其实Watcher并不是paxos算法中的一部分。paxos中只包括提出决议和决议通过,这些都是发生在zookeeper内部过程。而Watcher只是在paxos算法完成后,是zookeeper提供的一些便利性工具,正因为这样的callback才允许我们实现了分布式锁机制提供了可能性

如果你看过上一篇分布式lock文章就知道,这里并没有一种百分百可靠&有效的lock方法,选择EPHEMERAL和PERSISTENT同样存在一些问题。基于EPHEMERAL的实现,通过heartbeat并不能100%确保(比如网络断了,jvm依然存在)
1 楼 AliKevin2011 2011-10-15  
agapple,你好,感谢你分享zookeeper。
    1.你说的 paxns 应该是paxos吧?(不知道是不是我理解错误,当然这是吹毛求疵,我只是希望你的文字更精确,别介意哦。)
    2.还有我个人了解zookeeper中确切的说应该使用的是zab(zookeeper automic broadcast)当然是paxos优化版本。比如消除了“羊群效应”,正如你前篇文字提到的当节点数据变化时候,不会按照paxos算法将消息发送到所有的client,而是只发送到序号为下一个序号的client,从而缓解服务压力。
   我也是刚刚了解zookeeper,我理解不对之处多多指出。还是感谢你的分享。

相关推荐

    个人搭建遇到zookeeper启动jps没有进程原因

    搭建问题遇到的总结几点,便于找查问题 1)检查配置文件是否有问题 坑点一: zookeeper 配置文件 dataLogDir=/kafka/zookeeperlogs (记得不要写错单词) 坑点二: 在配置的dataDir 下创建myid文件,内容就是对应...

    Hadoop实战中文版.PDF

    23612.4 搭建面向企业查询的分析系统——IBM的ES2项目 23812.4.1 ES2系统结构 24012.4.2 ES2爬虫 24112.4.3 ES2分析 24212.4.4 小结 24912.4.5 参考文献 250附录A HDFS文件命令 251构建hadoop运算坚实...

    java面试题,180多页,绝对良心制作,欢迎点评,涵盖各种知识点,排版优美,阅读舒心

    【Redis】项目中用到redis,为什么选用redis 133 独特的键值对模型 134 内存储存,速度极快 135 丰富的附加功能 136 完善的文档 137 良好的支持 137 广泛的使用 138 【Nginx】Nginx如何配置防止DDOS攻击? 139 限制...

    2021互联网大厂Java架构师面试题突击视频教程

    03_关于互联网Java工程师面试突击训练课程的几点说明 04_体验一下面试官对于消息队列的7个连环炮 05_知其然而知其所以然:如何进行消息队列的技术选型? 06_引入消息队列之后该如何保证其高可用性? 07_我的天!我为...

    毕业设计电商网站源码-Hello-World:我在github中的第一个存储库

    推荐几个比较实用的阿里云服务,按需选择:1. 、2. 、3. 、4. (企业官网、电商网站,多种可供选择模板,代金券免费领取) Gitchat 推荐: Ⅰ Ⅱ Ⅲ Ⅳ Ⅴ Ⅵ Ⅶ Ⅷ Ⅸ Ⅹ :hot_beverage: Java Java/J2EE 基础 Java ...

    毕业设计电商网站源码-Snailclimb:蜗牛爬

    推荐几个比较实用的服务:1. (企业官网、电商网站,多种可供选择模板,代金券免费领取)、2. 、3. Ⅰ Ⅱ Ⅲ Ⅳ Ⅴ Ⅵ Ⅶ Ⅷ Ⅸ Ⅹ :hot_beverage: Java Java/J2EE 基础 Java 集合框架 Java 多线程 Java IO 与 NIO ...

    毕业设计电商网站源码-JavaGuide:学习资料

    推荐几个比较实用的服务:1. (企业官网、电商网站,多种可供选择模板,代金券免费领取)、2. 、3. Ⅰ Ⅱ Ⅲ Ⅳ Ⅴ Ⅵ Ⅶ Ⅷ Ⅸ Ⅹ :hot_beverage: Java Java/J2EE 基础 Java 集合框架 Java 多线程 Java IO 与 NIO ...

    毕业设计电商网站源码-JavaPower:JavaPower

    推荐几个比较实用的服务:1. (企业官网、电商网站,多种可供选择模板,代金券免费领取)、2. 、3. Ⅰ Ⅱ Ⅲ Ⅳ Ⅴ Ⅵ Ⅶ Ⅷ Ⅸ Ⅹ :hot_beverage: Java Java/J2EE 基础 Java 集合框架 Java 多线程 Java IO 与 NIO ...

    毕业设计电商网站源码-JavaGuide:指南

    推荐几个比较实用的服务:1. (企业官网、电商网站,多种可供选择模板,代金券免费领取)、2. 、3. Ⅰ Ⅱ Ⅲ Ⅳ Ⅴ Ⅵ Ⅶ Ⅷ Ⅸ Ⅹ :hot_beverage: Java Java/J2EE 基础 Java 集合框架 Java 多线程 Java IO 与 NIO ...

    毕业设计电商网站源码-javaGuide:指南

    推荐几个比较实用的服务:1. (企业官网、电商网站,多种可供选择模板,代金券免费领取)、2. 、3. Ⅰ Ⅱ Ⅲ Ⅳ Ⅴ Ⅵ Ⅶ Ⅷ Ⅸ Ⅹ :hot_beverage: Java Java/J2EE 基础 Java 集合框架 Java 多线程 Java IO 与 NIO ...

    毕业设计电商网站源码-JAVA-:JAVA-

    推荐几个比较实用的阿里云服务,按需选择:1. 、2. 、3. 、4. (企业官网、电商网站,多种可供选择模板,代金券免费领取) Ⅰ Ⅱ Ⅲ Ⅳ Ⅴ Ⅵ Ⅶ Ⅷ Ⅸ Ⅹ :hot_beverage: Java Java/J2EE 基础 Java 集合框架 Java ...

    毕业设计电商网站源码-JavaGuide:行家

    推荐几个比较实用的阿里云服务,按需选择:1. 、2. 、3. 、4. (企业官网、电商网站,多种可供选择模板,代金券免费领取) Ⅰ Ⅱ Ⅲ Ⅳ Ⅴ Ⅵ Ⅶ Ⅷ Ⅸ Ⅹ :hot_beverage: Java Java/J2EE 基础 Java 集合框架 Java ...

    分布式高性能日志复制服务 DistributedLog.zip

    因此,它整体的软件栈如下所示:具体来讲,它包含如下几个组成部分:LogLog是有序的、不可变的日志记录(log record),它的数据结构如下所示:日志记录每条日志记录都是一个字节序列。日志记录会按照序列写入到日志...

Global site tag (gtag.js) - Google Analytics