为了在Zookeeper中实现分布式队列,首先需要设计一个znode来存放数据,这个节点叫做队列节点,我们的例子中这个节点是/zookeeper/queue。 生产者向队列中存放数据,每一个消息都是队列节点下的一个新节点,叫做消息节点。消息节点的命名规则为:queue-xxx,xxx是一个单调 递增的序列,我们可以在创建节点时指定创建模式为PERSISTENT_SEQUENTIAL来实现。这样,生产者不断的向队列节点中发送消息,消息为queue-xxx, 队列中,生产者这一端就解决了,我们具体看一下代码:

Producer(生产者)

  1. public class Producer implements Runnable,Watcher {
  2.  
  3. private ZooKeeper zk;
  4.  
  5. public Producer(String address){
  6. try {
  7. this.zk = new ZooKeeper(address,3000,this);
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12.  
  13. @Override
  14. public void run() {
  15. int i = 0;
  16. //每隔10s向队列中放入数据
  17. while (true){
  18. try {
  19. zk.create("/zookeeper/queue/queue-",(Thread.currentThread().getName()+"-"+i).getBytes(),
  20. ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);
  21. Thread.sleep(10000);
  22. i++;
  23. } catch (KeeperException e) {
  24. e.printStackTrace();
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }
  30.  
  31. @Override
  32. public void process(WatchedEvent event) {
  33. }
  34. }

生产者每隔10s向队列中存放消息,消息节点的类型为PERSISTENT_SEQUENTIAL,消息节点中的数据为Thread.currentThread().getName()+”-“+i。

消费者从队列节点中获取消息,我们使用getChildren()方法获取到队列节点中的所有消息,然后获取消息节点数据,消费消息,并删除消息节点。 如果getChildren()没有获取到数据,说明队列是空的,则消费者等待,然后再调用getChildren()方法设置观察者监听队列节点,队列节点发生变化后 (子节点改变),触发监听事件,唤起消费者。消费者实现如下:

  1. public class Consumer implements Runnable,Watcher {
  2. private ZooKeeper zk;
  3. private List<String> children;
  4.  
  5. public Consumer(String address){
  6. try {
  7. this.zk = new ZooKeeper(address,3000,this);
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12.  
  13. @Override
  14. public void run() {
  15. int i = 1;
  16. while (true){
  17. try {
  18. //获取所有子节点
  19. children = zk.getChildren("/zookeeper/queue", false);
  20. int size = CollectionUtils.isEmpty(children) ? 0 : children.size();
  21. System.out.println("第"+i+"次获取数据"+size+"条");
  22.  
  23. //队列中没有数据,设置观察器并等待
  24. if (CollectionUtils.isEmpty(children)){
  25. System.out.println("队列为空,消费者等待");
  26. zk.getChildren("/zookeeper/queue", true);
  27. synchronized (this){
  28. wait();
  29. }
  30. }else {
  31. //循环获取队列中消息,进行业务处理,并从结果集合中删除
  32. Iterator<String> iterator = children.iterator();
  33. while (iterator.hasNext()){
  34. String childNode = iterator.next();
  35. handleBusiness(childNode);
  36. iterator.remove();
  37. }
  38. }
  39. } catch (KeeperException e) {
  40. e.printStackTrace();
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. i++;
  45. }
  46. }
  47.  
  48. /**
  49. * 从节点获取数据,执行业务,并删除节点
  50. * @param childNode
  51. */
  52. private void handleBusiness(String childNode) {
  53. try {
  54. Stat stat = new Stat();
  55. byte[] data = zk.getData("/zookeeper/queue/"+childNode, false, stat);
  56. String str = new String(data);
  57. System.out.println("获取节点数据:"+str);
  58. zk.delete("/zookeeper/queue/"+childNode,-1);
  59. } catch (KeeperException e) {
  60. e.printStackTrace();
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. }
  64.  
  65.  
  66. }
  67.  
  68. /**
  69. * 子节点发生变化,且取得结果为空时,说明消费者等待,唤起消费者
  70. * @param event
  71. */
  72. @Override
  73. public void process(WatchedEvent event) {
  74. if (event.getType().equals(Event.EventType.NodeChildrenChanged)){
  75. synchronized (this){
  76. notify();
  77. }
  78. }
  79. }
  80. }

上面的例子中有一个局限性,就是 消费者只能有一个 。队列的用户有两个:广播和队列。

  • 广播是所有消费者都拿到消息并消费,我们的例子在删除消息节点时,不能保证其他消费者都拿到了这个消息。
  • 队列是一个消息只能被一个消费者消费,我们的例子中,消费者获取消息时,并没有加锁。

所以我们只启动一个消费者来演示,主函数如下:

  1. public class Application {
  2.  
  3. private static final String ADDRESS = "149.28.37.147:2181";
  4.  
  5. public static void main(String[] args) {
  6. //设置日志级别
  7. setLog();
  8.  
  9. //启动一个消费者
  10. new Thread(new Consumer(ADDRESS)).start();
  11.  
  12. //启动4个生产者
  13. ExecutorService es = Executors.newFixedThreadPool(4);
  14. for (int i=0;i<4;i++){
  15. es.execute(new Producer(ADDRESS));
  16. }
  17. es.shutdown();
  18.  
  19. }
  20.  
  21. /**
  22. * 设置log级别为Error
  23. */
  24. public static void setLog(){
  25. //1.logback
  26. LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
  27. //获取应用中的所有logger实例
  28. List<Logger> loggerList = loggerContext.getLoggerList();
  29.  
  30. //遍历更改每个logger实例的级别,可以通过http请求传递参数进行动态配置
  31. for (ch.qos.logback.classic.Logger logger:loggerList){
  32. logger.setLevel(Level.toLevel("ERROR"));
  33. }
  34. }
  35. }

后台打印结果如下:

  1. 1次获取数据2
  2. 获取节点数据:pool-1-thread-4-118
  3. 获取节点数据:pool-1-thread-1-0
  4. 2次获取数据3
  5. 获取节点数据:pool-1-thread-4-0
  6. 获取节点数据:pool-1-thread-2-0
  7. 获取节点数据:pool-1-thread-3-0
  8. 3次获取数据0
  9. 队列为空,消费者等待
  10. 4次获取数据4
  11. 获取节点数据:pool-1-thread-3-1
  12. 获取节点数据:pool-1-thread-1-1
  13. 获取节点数据:pool-1-thread-4-1
  14. 获取节点数据:pool-1-thread-2-1

Zookeeper实现队列就介绍完了,项目地址:https://github.com/liubo-tech/zookeeper-application

posted on 2018-07-03 16:33 牛初九 阅读() 评论() 编辑 收藏
版权声明:本文为boboooo原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/boboooo/p/9259306.html