123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- <?php
- // +----------------------------------------------------------------------
- // | ThinkCMF [ WE CAN DO IT MORE SIMPLE ]
- // +----------------------------------------------------------------------
- // | Copyright (c) 2013-2018 http://www.thinkcmf.com All rights reserved.
- // +----------------------------------------------------------------------
- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
- // +---------------------------------------------------------------------
- // | Author: 老猫 <catmat@thinkcmf.com>
- // +----------------------------------------------------------------------
- /*
- * 数据库表
- CREATE TABLE `cmf_queue_jobs` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `queue` varchar(255) NOT NULL,
- `payload` longtext NOT NULL,
- `attempts` tinyint(3) unsigned NOT NULL,
- `reserved` tinyint(3) unsigned NOT NULL,
- `reserve_time` int(10) unsigned DEFAULT NULL,
- `available_time` int(10) unsigned NOT NULL,
- `create_time` int(10) unsigned NOT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- */
- namespace cmf\queue\connector;
- use think\queue\connector\Database as DataBaseConnector;
/**
 * Database queue connector that stores jobs in the ThinkCMF `queue_jobs` table.
 *
 * Overrides the storage-level helpers of the base think-queue database
 * connector so they use this table's column names (`reserve_time`,
 * `available_time`, `create_time`).
 */
class Database extends DataBaseConnector
{
    /**
     * Connector options.
     *
     * - expire:  seconds a job may stay reserved before it is released again
     * - default: queue name; presumably the fallback used by getQueue() (defined in the parent — confirm)
     * - table:   table name handed to the query builder's name() method
     * - dsn:     not read in this class; presumably connection settings consumed by the parent (confirm)
     *
     * @var array
     */
    protected $options = [
        'expire'  => 60,
        'default' => 'default',
        'table'   => 'queue_jobs',
        'dsn'     => [],
    ];

    /**
     * Push a raw payload to the database with a given delay.
     *
     * A numeric delay is a number of seconds counted from now. A date object
     * is taken as the absolute moment at which the job becomes available.
     *
     * @param \DateTimeInterface|int $delay
     * @param string|null            $queue
     * @param string                 $payload
     * @param int                    $attempts
     * @return mixed whatever the query builder's insert() returns
     */
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
    {
        // Read the clock once so available_time and create_time are based on
        // the same instant.
        $now = time();

        // Adding a date object to an integer is an error (TypeError on PHP 8),
        // so date delays are converted to their timestamp instead.
        if ($delay instanceof \DateTimeInterface) {
            $availableTime = $delay->getTimestamp();
        } else {
            $availableTime = $now + $delay;
        }

        return $this->db->name($this->options['table'])->insert([
            'queue'          => $this->getQueue($queue),
            'payload'        => $payload,
            'attempts'       => $attempts,
            'reserved'       => 0,
            'reserve_time'   => null,
            'available_time' => $availableTime,
            'create_time'    => $now,
        ]);
    }

    /**
     * Fetch the next job that is ready to run on the given queue.
     *
     * Opens a transaction and performs a locking read of the oldest (lowest id)
     * unreserved job whose available_time has been reached.
     *
     * NOTE(review): the transaction started here is neither committed nor
     * rolled back in this method; presumably the caller (the parent's pop())
     * commits it — confirm.
     *
     * @param string|null $queue
     * @return \StdClass|null the job row as an object, or null when nothing is available
     */
    protected function getNextAvailableJob($queue)
    {
        $this->db->startTrans();

        $row = $this->db->name($this->options['table'])
            ->lock(true)
            ->where('queue', $this->getQueue($queue))
            ->where('reserved', 0)
            ->where('available_time', '<=', time())
            ->order('id', 'asc')
            ->find();

        if (!$row) {
            return null;
        }

        return (object) $row;
    }

    /**
     * Mark a job as currently being processed.
     *
     * Sets the reserved flag and records the reservation time.
     *
     * @param string $id
     * @return void
     */
    protected function markJobAsReserved($id)
    {
        $reservation = [
            'reserved'     => 1,
            'reserve_time' => time(),
        ];

        $this->db->name($this->options['table'])
            ->where('id', $id)
            ->update($reservation);
    }

    /**
     * Release jobs that have been reserved for longer than the expire option.
     *
     * Such jobs are made available again and their attempt counter is increased by one.
     *
     * @param string $queue
     * @return void
     */
    protected function releaseJobsThatHaveBeenReservedTooLong($queue)
    {
        // Any reservation made at or before this timestamp is considered expired.
        $deadline = time() - $this->options['expire'];

        $this->db->name($this->options['table'])
            ->where('queue', $this->getQueue($queue))
            ->where('reserved', 1)
            ->where('reserve_time', '<=', $deadline)
            ->update([
                'reserved'     => 0,
                'reserve_time' => null,
                // Expression update: increment the column in SQL.
                'attempts'     => ['exp', 'attempts + 1'],
            ]);
    }
}
|