Database.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | ThinkCMF [ WE CAN DO IT MORE SIMPLE ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2013-2018 http://www.thinkcmf.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
  8. // +---------------------------------------------------------------------
  9. // | Author: 老猫 <catmat@thinkcmf.com>
  10. // +----------------------------------------------------------------------
  11. /*
  12. * 数据库表
  13. CREATE TABLE `cmf_queue_jobs` (
  14. `id` int(11) NOT NULL AUTO_INCREMENT,
  15. `queue` varchar(255) NOT NULL,
  16. `payload` longtext NOT NULL,
  17. `attempts` tinyint(3) unsigned NOT NULL,
  18. `reserved` tinyint(3) unsigned NOT NULL,
  19. `reserve_time` int(10) unsigned DEFAULT NULL,
  20. `available_time` int(10) unsigned NOT NULL,
  21. `create_time` int(10) unsigned NOT NULL,
  22. PRIMARY KEY (`id`)
  23. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  24. */
  25. namespace cmf\queue\connector;
  26. use think\queue\connector\Database as DataBaseConnector;
  27. class Database extends DataBaseConnector
  28. {
  29. protected $options = [
  30. 'expire' => 60,
  31. 'default' => 'default',
  32. 'table' => 'queue_jobs',
  33. 'dsn' => []
  34. ];
  35. /**
  36. * Push a raw payload to the database with a given delay.
  37. *
  38. * @param \DateTime|int $delay
  39. * @param string|null $queue
  40. * @param string $payload
  41. * @param int $attempts
  42. * @return mixed
  43. */
  44. protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
  45. {
  46. return $this->db->name($this->options['table'])->insert([
  47. 'queue' => $this->getQueue($queue),
  48. 'payload' => $payload,
  49. 'attempts' => $attempts,
  50. 'reserved' => 0,
  51. 'reserve_time' => null,
  52. 'available_time' => time() + $delay,
  53. 'create_time' => time()
  54. ]);
  55. }
  56. /**
  57. * 获取下个有效任务
  58. *
  59. * @param string|null $queue
  60. * @return \StdClass|null
  61. */
  62. protected function getNextAvailableJob($queue)
  63. {
  64. $this->db->startTrans();
  65. $job = $this->db->name($this->options['table'])
  66. ->lock(true)
  67. ->where('queue', $this->getQueue($queue))
  68. ->where('reserved', 0)
  69. ->where('available_time', '<=', time())
  70. ->order('id', 'asc')
  71. ->find();
  72. return $job ? (object)$job : null;
  73. }
  74. /**
  75. * 标记任务正在执行.
  76. *
  77. * @param string $id
  78. * @return void
  79. */
  80. protected function markJobAsReserved($id)
  81. {
  82. $this->db->name($this->options['table'])->where('id', $id)->update([
  83. 'reserved' => 1,
  84. 'reserve_time' => time()
  85. ]);
  86. }
  87. /**
  88. * 重新发布超时的任务
  89. *
  90. * @param string $queue
  91. * @return void
  92. */
  93. protected function releaseJobsThatHaveBeenReservedTooLong($queue)
  94. {
  95. $expired = time() - $this->options['expire'];
  96. $this->db->name($this->options['table'])
  97. ->where('queue', $this->getQueue($queue))
  98. ->where('reserved', 1)
  99. ->where('reserve_time', '<=', $expired)
  100. ->update([
  101. 'reserved' => 0,
  102. 'reserve_time' => null,
  103. 'attempts' => ['exp', 'attempts + 1']
  104. ]);
  105. }
  106. }