这是关于 Swoole 入门学习的第八篇文章:Swoole MySQL 连接池的实现。

收到读者的咨询,这情况大家可能也会有,所以就在这说说:

“亮哥,我今年30岁了,有点中年危机,最近有点焦虑,发现工作虽然很忙,但是没感觉能力有所提升,整天都有写不完的业务代码,时间紧有时代码质量也不怎么样,知道还有很多改进空间,但一直没时间改,主要是后面项目压着,马上又要进入开发了,这种情况怎么办?”

首先,我是菜鸡,观点不喜勿喷,那我就说下自己的看法:

上面的描述比较主观,人呀有时候发现不了自己的能力很正常,有时候有能力了并不是马上就能显现的,而是到了某个阶段后突然发现,哇塞,原来自己这么厉害。

当然能力也分很多种,比如专业能力,快速学习能力,进度把控能力,还有自信也是一种能力,不要脸是一种能力,坚持不要脸更是一种能力。

其实能力提升最快的还是靠工作实践,悄悄问问自己加入了很多大牛的微信群,能力提升了吗?看书自学不实践是不是吸收的也不多。

如果非要给一个具体的方案,那就是在团队内多分享吧,因为在分享前你会做充分的准备来避免分享时出丑,即使有时候自己知道,当讲出来的时候就不是那么回事了。

前期分享可以是看稿,后期练习无稿分享。

然后,再多说一点,30了给自己一个目标,不要盲目每天就是学学学,比如目标是技术专家,目标是业务专家,都很好呀,当然目标与自己性格有关也不是一成不变的。

围绕着目标设置一些计划,不要以为每天的学学学,就觉得其他的一切就自然而来,其中还有很多机遇和人脉的因素。

最后,如果实在感觉压得喘不过气,就换个环境吧,别和自己过不去。

开始今天的文章,这篇文章实现了 Swoole MySQL 连接池,代码是在《Swoole RPC 的实现》文章的基础上进行开发的。

先回顾上篇文章的内容:

  • 实现了 HTTP / TCP 请求
  • 实现了 同步 / 异步 请求
  • 分享了 OnRequest.php、OnReceive.php 源码
  • 业务逻辑 Order.php 中返回的是假数据

本篇文章主要的功能点:

  • 业务逻辑 Order.php 中返回 MySQL 数据库中的数据。
  • Task 启用了协程
  • 支持 主/从 数据库配置
  • 实现数据库连接池
  • 实现数据库 CURD
  1. <?php
  2. if (!defined(\'SERVER_PATH\')) exit("No Access");
  3. class Order
  4. {
  5. public function get_list($uid = 0, $type = 0)
  6. {
  7. //TODO 业务代码
  8. $rs[0][\'order_code\'] = \'1\';
  9. $rs[0][\'order_name\'] = \'订单1\';
  10. $rs[1][\'order_code\'] = \'2\';
  11. $rs[1][\'order_name\'] = \'订单2\';
  12. $rs[2][\'order_code\'] = \'3\';
  13. $rs[2][\'order_name\'] = \'订单3\';
  14. return $rs;
  15. }
  16. }
  17. 修改成:
  18. class Order
  19. {
  20. private $mysql;
  21. private $table;
  22. public function __construct()
  23. {
  24. $pool = MysqlPool::getInstance();
  25. $this->mysql = $pool->get();
  26. $this->table = \'order\';
  27. }
  28. public function add($code = \'\', $name = \'\')
  29. {
  30. //TODO 验证
  31. return $this->mysql->insert($this->table, [\'code\' => $code, \'name\' => $name]);
  32. }
  33. public function edit($id = 0, $name=\'\')
  34. {
  35. //TODO 验证
  36. return $this->mysql->update($this->table, [\'name\' => $name], [\'id\' => $id]);
  37. }
  38. public function del($id = 0)
  39. {
  40. //TODO 验证
  41. return $this->mysql->delete($this->table, [\'id\' => $id]);
  42. }
  43. public function info($code = \'\')
  44. {
  45. //TODO 验证
  46. return $this->mysql->select($this->table, [\'code\' => $code]);
  47. }
  48. }

一、需要新增两项配置:

  1. enable_coroutine = true
  2. task_enable_coroutine = true

二、回调参数发生改变

  1. $serv->on(\'Task\', function ($serv, $task_id, $src_worker_id, $data) {
  2. ...
  3. });
  4. 修改成:
  5. $serv->on(\'Task\', function ($serv, $task) {
  6. $task->worker_id; //来自哪个`Worker`进程
  7. $task->id; //任务的编号
  8. $task->data; //任务的数据
  9. });

Mysql.php

  1. <?php
  2. if (!defined(\'SERVER_PATH\')) exit("No Access");
  3. $db[\'default\'][\'pool_size\'] = 3; //连接池个数
  4. $db[\'default\'][\'pool_get_timeout\'] = 0.5; //获取连接池超时时间
  5. $db[\'default\'][\'timeout\'] = 0.5; //数据库建立连接超时时间
  6. $db[\'default\'][\'charset\'] = \'utf8\'; //字符集
  7. $db[\'default\'][\'strict_type\'] = false; //开启严格模式
  8. $db[\'default\'][\'fetch_mode\'] = true; //开启fetch模式
  9. $config[\'master\'] = $db[\'default\'];
  10. $config[\'master\'][\'host\'] = \'127.0.0.1\';
  11. $config[\'master\'][\'port\'] = 3306;
  12. $config[\'master\'][\'user\'] = \'root\';
  13. $config[\'master\'][\'password\'] = \'123456\';
  14. $config[\'master\'][\'database\'] = \'demo\';
  15. $config[\'slave\'] = $db[\'default\'];
  16. $config[\'slave\'][\'host\'] = \'127.0.0.1\';
  17. $config[\'slave\'][\'port\'] = 3306;
  18. $config[\'slave\'][\'user\'] = \'root\';
  19. $config[\'slave\'][\'password\'] = \'123456\';
  20. $config[\'slave\'][\'database\'] = \'demo\';

MysqlPool.php

  1. <?php
  2. if (!defined(\'SERVER_PATH\')) exit("No Access");
  3. class MysqlPool
  4. {
  5. private static $instance;
  6. private $pool;
  7. private $config;
  8. public static function getInstance($config = null)
  9. {
  10. if (empty(self::$instance)) {
  11. if (empty($config)) {
  12. throw new RuntimeException("MySQL config empty");
  13. }
  14. self::$instance = new static($config);
  15. }
  16. return self::$instance;
  17. }
  18. public function __construct($config)
  19. {
  20. if (empty($this->pool)) {
  21. $this->config = $config;
  22. $this->pool = new chan($config[\'master\'][\'pool_size\']);
  23. for ($i = 0; $i < $config[\'master\'][\'pool_size\']; $i++) {
  24. go(function() use ($config) {
  25. $mysql = new MysqlDB();
  26. $res = $mysql->connect($config);
  27. if ($res === false) {
  28. throw new RuntimeException("Failed to connect mysql server");
  29. } else {
  30. $this->pool->push($mysql);
  31. }
  32. });
  33. }
  34. }
  35. }
  36. public function get()
  37. {
  38. if ($this->pool->length() > 0) {
  39. $mysql = $this->pool->pop($this->config[\'master\'][\'pool_get_timeout\']);
  40. if (false === $mysql) {
  41. throw new RuntimeException("Pop mysql timeout");
  42. }
  43. defer(function () use ($mysql) { //释放
  44. $this->pool->push($mysql);
  45. });
  46. return $mysql;
  47. } else {
  48. throw new RuntimeException("Pool length <= 0");
  49. }
  50. }
  51. }

MysqlDB.php

  1. <?php
  2. if (!defined(\'SERVER_PATH\')) exit("No Access");
  3. class MysqlDB
  4. {
  5. private $master;
  6. private $slave;
  7. private $config;
  8. public function __call($name, $arguments)
  9. {
  10. if ($name != \'query\') {
  11. throw new RuntimeException($name.":This command is not supported");
  12. } else {
  13. return $this->_execute($arguments[0]);
  14. }
  15. }
  16. public function connect($config)
  17. {
  18. //主库
  19. $master = new Swoole\Coroutine\MySQL();
  20. $res = $master->connect($config[\'master\']);
  21. if ($res === false) {
  22. throw new RuntimeException($master->connect_error, $master->errno);
  23. } else {
  24. $this->master = $master;
  25. }
  26. //从库
  27. $slave = new Swoole\Coroutine\MySQL();
  28. $res = $slave->connect($config[\'slave\']);
  29. if ($res === false) {
  30. throw new RuntimeException($slave->connect_error, $slave->errno);
  31. } else {
  32. $this->slave = $slave;
  33. }
  34. $this->config = $config;
  35. return $res;
  36. }
  37. public function insert($table = \'\', $data = [])
  38. {
  39. $fields = \'\';
  40. $values = \'\';
  41. $keys = array_keys($data);
  42. foreach ($keys as $k) {
  43. $fields .= "`".addslashes($k)."`, ";
  44. $values .= "\'".addslashes($data[$k])."\', ";
  45. }
  46. $fields = substr($fields, 0, -2);
  47. $values = substr($values, 0, -2);
  48. $sql = "INSERT INTO `{$table}` ({$fields}) VALUES ({$values})";
  49. return $this->_execute($sql);
  50. }
  51. public function update($table = \'\', $set = [], $where = [])
  52. {
  53. $arr_set = [];
  54. foreach ($set as $k => $v) {
  55. $arr_set[] = \'`\'.$k . \'` = \' . $this->_escape($v);
  56. }
  57. $set = implode(\', \', $arr_set);
  58. $where = $this->_where($where);
  59. $sql = "UPDATE `{$table}` SET {$set} {$where}";
  60. return $this->_execute($sql);
  61. }
  62. public function delete($table = \'\', $where = [])
  63. {
  64. $where = $this->_where($where);
  65. $sql = "DELETE FROM `{$table}` {$where}";
  66. return $this->_execute($sql);
  67. }
  68. public function select($table = \'\',$where = [])
  69. {
  70. $where = $this->_where($where);
  71. $sql = "SELECT * FROM `{$table}` {$where}";
  72. return $this->_execute($sql);
  73. }
  74. private function _where($where = [])
  75. {
  76. $str_where = \'\';
  77. foreach ($where as $k => $v) {
  78. $str_where .= " AND `{$k}` = ".$this->_escape($v);
  79. }
  80. return "WHERE 1 ".$str_where;
  81. }
  82. private function _escape($str)
  83. {
  84. if (is_string($str)) {
  85. $str = "\'".$str."\'";
  86. } elseif (is_bool($str)) {
  87. $str = ($str === FALSE) ? 0 : 1;
  88. } elseif (is_null($str)) {
  89. $str = \'NULL\';
  90. }
  91. return $str;
  92. }
  93. private function _execute($sql)
  94. {
  95. if (strtolower(substr($sql, 0, 6)) == \'select\') {
  96. $db = $this->_get_usable_db(\'slave\');
  97. } else {
  98. $db = $this->_get_usable_db(\'master\');
  99. }
  100. $result = $db->query($sql);
  101. if ($result === true) {
  102. return [
  103. \'affected_rows\' => $db->affected_rows,
  104. \'insert_id\' => $db->insert_id,
  105. ];
  106. }
  107. return $result;
  108. }
  109. private function _get_usable_db($type)
  110. {
  111. if ($type == \'master\') {
  112. if (!$this->master->connected) {
  113. $master = new Swoole\Coroutine\MySQL();
  114. $res = $master->connect($this->config[\'master\']);
  115. if ($res === false) {
  116. throw new RuntimeException($master->connect_error, $master->errno);
  117. } else {
  118. $this->master = $master;
  119. }
  120. }
  121. return $this->master;
  122. } elseif ($type == \'slave\') {
  123. if (!$this->slave->connected) {
  124. $slave = new Swoole\Coroutine\MySQL();
  125. $res = $slave->connect($this->config[\'slave\']);
  126. if ($res === false) {
  127. throw new RuntimeException($slave->connect_error, $slave->errno);
  128. } else {
  129. $this->slave = $slave;
  130. }
  131. }
  132. return $this->slave;
  133. }
  134. }
  135. }
  1. try {
  2. MysqlPool::getInstance(get_config(\'mysql\'));
  3. } catch (\Exception $e) {
  4. $serv->shutdown();
  5. } catch (\Throwable $throwable) {
  6. $serv->shutdown();
  7. }
  1. <?php
  2. //新增
  3. $demo = [
  4. \'type\' => \'SW\',
  5. \'token\' => \'Bb1R3YLipbkTp5p0\',
  6. \'param\' => [
  7. \'class\' => \'Order\',
  8. \'method\' => \'add\',
  9. \'param\' => [
  10. \'code\' => \'C\'.mt_rand(1000,9999),
  11. \'name\' => \'订单-\'.mt_rand(1000,9999),
  12. ],
  13. ],
  14. ];
  15. //编辑
  16. $demo = [
  17. \'type\' => \'SW\',
  18. \'token\' => \'Bb1R3YLipbkTp5p0\',
  19. \'param\' => [
  20. \'class\' => \'Order\',
  21. \'method\' => \'edit\',
  22. \'param\' => [
  23. \'id\' => \'4\',
  24. \'name\' => \'订单-\'.mt_rand(1000,9999),
  25. ],
  26. ],
  27. ];
  28. //删除
  29. $demo = [
  30. \'type\' => \'SW\',
  31. \'token\' => \'Bb1R3YLipbkTp5p0\',
  32. \'param\' => [
  33. \'class\' => \'Order\',
  34. \'method\' => \'del\',
  35. \'param\' => [
  36. \'id\' => \'1\',
  37. ],
  38. ],
  39. ];
  40. //查询
  41. $demo = [
  42. \'type\' => \'SW\',
  43. \'token\' => \'Bb1R3YLipbkTp5p0\',
  44. \'param\' => [
  45. \'class\' => \'Order\',
  46. \'method\' => \'info\',
  47. \'param\' => [
  48. \'code\' => \'C4649\'
  49. ],
  50. ],
  51. ];
  52. $ch = curl_init();
  53. $options = [
  54. CURLOPT_URL => \'http://10.211.55.4:9509/\',
  55. CURLOPT_POST => 1,
  56. CURLOPT_POSTFIELDS => json_encode($demo),
  57. ];
  58. curl_setopt_array($ch, $options);
  59. curl_exec($ch);
  60. curl_close($ch);

官方协程 MySQL 客户端手册:

https://wiki.swoole.com/wiki/page/p-coroutine_mysql.html

大家可以尝试使用官方提供的其他方法。

Demo 代码仅供参考,里面有很多不严谨的地方。

根据自己的需要进行修改,比如业务代码相关验证,CURD 方法封装 …

推荐一个完善的产品,Swoole 开发的 MySQL 数据库连接池(SMProxy):

https://github.com/louislivi/smproxy

上面的 Demo 需要源码的,加我微信。(菜单-> 加我微信-> 扫我)

本文欢迎转发,转发请注明作者和出处,谢谢!

版权声明:本文为xinliangcoder原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/xinliangcoder/p/10930035.html