博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper项目使用几点小结
阅读量:6825 次
发布时间:2019-06-26

本文共 6083 字,大约阅读时间需要 20 分钟。

背景

  前段时间学习了zookeeper后,在新的项目中刚好派上了用场,我在项目中主要负责分布式任务调度模块的开发,对我自己来说是个不小的挑战。

  分布式的任务调度,技术上我们选择了zookeeper,具体的整个分布式任务调度的架构选择会另起一篇文章进行介绍。

 

  本文主要是介绍自己在项目中zookeeper的一些扩展使用,希望可以对大家有所帮助。

  项目中使用的zookeeper版本3.3.3,对应的文档地址: 

 

扩展一:优先集群

先来点背景知识:

1.zookeeper中的server机器之间会组成leader/follower集群,1:n的关系。采用了paxos一致性算法保证了数据的一致性,就是leader/follower会采用通讯的方式进行投票来实现paxns。

2.zookeeper还支持一种observer模式,提供只读服务不参与投票,提升系统,对应文档: 

 

我们项目特性的决定了我们需要进行跨机房操作,比如杭州,美国,香港,青岛等多个机房之间进行数据交互。

跨机房之间对应的网络延迟都比较大,比如中美机房走海底光缆有ping操作200ms的延迟,杭州和青岛机房有70ms的延迟。 

 

为了提升系统的网络性能,我们在部署zookeeper网络时会在每个机房部署节点,多个机房之间再组成一个大的网络保证数据一致性。(zookeeper千万别再搞多个集群)

 

最后的部署结构就会是:

  • 杭州机房  >=3台 (构建leader/follower的zk集群)
  • 青岛机房  >=1台 (构建observer的zk集群)
  • 美国机房  >=1台 (构建observer的zk集群)
  • 香港机房  >=1台 (构建observer的zk集群)
 
一句话概括就是: 在单个机房内组成一个投票集群,外围的机房都会是一个observer集群和投票集群进行数据交互。 这样部署的一些好处,大家可以细细体会一下
 
针对这样的部署结构,我们会引入一个优先集群问题: 比如在美国机房的机器需要优先去访问本机房的zk集群,访问不到后才去访问杭州机房。 
默认在zookeeper3.3.3的实现中,认为所有的节点都是对等的。并没有对应的优先集群的概念,单个机器也没有对应的优先级的概念。
 
扩展代码:(比较暴力,采用反射的方式改变了zk client的集群列表)
  • 先使用美国机房的集群ip初始化一次zk client
  • 通过反射方式,强制在初始化后的zk client中的server列表中又加入杭州机房的机器列表

 

1.ZooKeeper zk = null;  2.        try {
3. zk = new ZooKeeper(cluster1, sessionTimeout, new AsyncWatcher() {
4. 5. public void asyncProcess(WatchedEvent event) {
6. //do nothing 7. } 8. 9. }); 10. if (serveraddrs.size() > 1) {
11. // 强制的声明accessible 12. ReflectionUtils.makeAccessible(clientCnxnField); 13. ReflectionUtils.makeAccessible(serverAddrsField); 14. // 添加第二组集群列表 15. for (int i = 1; i < serveraddrs.size(); i++) {
16. String cluster = serveraddrs.get(i); 17. // 强制获取zk中的地址信息 18. ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk); 19. List
serverAddrs = (List
) ReflectionUtils 20. .getField(serverAddrsField, cnxn); 21. // 添加第二组集群列表 22. serverAddrs.addAll(buildServerAddrs(cluster)); 23. } 24. } 25. }

 

扩展二:异步Watcher处理

  最早在看zookeeper的代码时,一直对它的watcher处理比较满意,使用watcher推送数据可以很方便的实现分布式锁的功能。

zookeeper的watcher实现原理也挺简单的,就是在zookeeper client和zookeeper server上都保存一份对应的watcher对象。每个zookeeper机器都会有一份完整的node tree数据和watcher数据,每次leader通知follower/observer数据发生变更后,每个zookeeper server会根据自己节点中的watcher事件推送给响应的zookeeper client,每个zk client收到后再根据内存中的watcher引用,进行回调。

 

这里会有个问题,就是zk client在处理watcher时,回凋的过程是一个串行的执行过程,所以单个watcher的处理慢会影响整个列表的响应。 

可以看一下ClientCnxn类中的EventThread处理,该线程会定时消费一个queue的数据,挨个调用processEvent(Object event) 进行回调处理。

 

扩展代码:

 

1.public abstract class AsyncWatcher implements Watcher {
2. 3. private static final int DEFAULT_POOL_SIZE = 30; 4. private static final int DEFAULT_ACCEPT_COUNT = 60; 5. 6. private static ExecutorService executor = new ThreadPoolExecutor( 7. 1, 8. DEFAULT_POOL_SIZE, 9. 0L, 10. TimeUnit.MILLISECONDS, 11. new ArrayBlockingQueue( 12. DEFAULT_ACCEPT_COUNT), 13. new NamedThreadFactory( 14. "Arbitrate-Async-Watcher"), 15. new ThreadPoolExecutor.CallerRunsPolicy()); 16. 17. public void process(final WatchedEvent event) {
18. executor.execute(new Runnable() {
//提交异步处理 19. 20. @Override 21. public void run() {
22. asyncProcess(event); 23. } 24. }); 25. 26. } 27. 28. public abstract void asyncProcess(WatchedEvent event); 29. 30.}

 

说明:
  • zookeeper针对watcher的调用是以单线程串行的方式进行处理,容易造成堵塞影响,monitor的数据同步及时性
  • AsyncWatcher为采取的一种策略为当不超过acceptCount=60的任务时,会采用异步线程的方式处理。如果超过60任务,会变为原先的单线程串行的模式

扩展三:重试处理

这个也不多说啥,看一下相关文档就清楚了

 

需要特殊处理下ConnectionLoss的异常,一种可恢复的异常。
 
重试处理:
1.public interface ZooKeeperOperation
{
2. 3. public T execute() throws KeeperException, InterruptedException; 4.} 5. 6. 7./** 8. * 包装重试策略 9. */ 10. public
T retryOperation(ZooKeeperOperation
operation) throws KeeperException, 11. InterruptedException {
12. KeeperException exception = null; 13. for (int i = 0; i < maxRetry; i++) {
14. try {
15. return (T) operation.execute(); 16. } catch (KeeperException.SessionExpiredException e) {
17. logger.warn("Session expired for: " + this + " so reconnecting due to: " + e, e); 18. throw e; 19. } catch (KeeperException.ConnectionLossException e) {
//特殊处理Connection Loss 20. if (exception == null) {
21. exception = e; 22. } 23. logger.warn("Attempt " + i + " failed with connection loss so " 24. + "attempting to reconnect: " + e, e); 25. 26. retryDelay(i); 27. } 28. } 29. 30. throw exception; 31. }

注意点:Watcher原子性

在使用zookeeper的过程中,需要特别注意一点就是注册对应watcher事件时,如果当前的节点已经满足了条件,比如exist的watcher,它不会触发你的watcher,而会等待下一次watcher条件的满足。

它的watcher是一个一次性的监听,而不是一个永久的订阅过程。所以在watcher响应和再次注册watcher过程并不是一个原子操作,编写多线程代码和锁时需要特别注意

总结

  zookeepr是一个挺不错的产品,源代码写的也非常不错,大量使用了queue和异步Thread的处理模式,真是一个伟大的产品。

 

 

站内文章,如需转载,请保留作者和出处(云栖社区),并邮件通知云栖社区(yqeditor@list.alibaba-inc.com)。

转载于:https://www.cnblogs.com/starhu/p/5403117.html

你可能感兴趣的文章
【C#】在父窗体菜单合并子窗体菜单
查看>>
反射(6)程序集加载上下文
查看>>
oracle触发器after update of |更改之后赋值|
查看>>
Oracle命令:授权-收回权限-角色-用户状态
查看>>
常用的Ubuntu APT命令参数
查看>>
成功加盟者的8个特点
查看>>
Java基础03 构造器与方法重载
查看>>
如何让你的服务屏蔽Shodan扫描
查看>>
SpringBoot+Elasticsearch
查看>>
Vim 操作符命令和动作命令
查看>>
动态代理
查看>>
C语言 格式化输出--%m.n
查看>>
gradle配置国内的镜像
查看>>
Gitlab安装与备份恢复
查看>>
Elasticsearch-sql 用SQL查询Elasticsearch
查看>>
(原創) 如何讓Nios II自動抓到自己寫的IP的HAL? (SOC) (Nios II) (SOPC Builder) (DE2-70)
查看>>
JFS技术详细介绍
查看>>
Linux VI command
查看>>
创建可重用的对象
查看>>
jquery easyui treegrid使用小结:二
查看>>