基于 Redis 实现延迟队列

2020-01-18
4分钟阅读时长

前言

之前在一些博客上看到过讲如何实现延迟队列,但是平时没用上也没有动手实现过。

在上次面试的时候,面试官也问过我有没有用过延迟队列,最后凭借着记忆讲了下如何用 Redis 的有序集合实现延迟队列,以及有什么缺点。

纸上得来终觉浅,绝知此事要躬行。

这句诗就是本文的主要目的。

原理

主要用到了 Redis 的三个命令,ZADDZREMZRANGEBYSCORE

ZADD key_name score value

ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

ZREM key value [value ...]

ZADD 用于向有序集合中添加一条或多条数据,score 作为数据处理时间,value 存放数据。

ZRANGEBYSCORE 用于获取有序集合中指定时间范围内的数据,按照时间戳升序排列。

ZREM 用于将一条或多条数据从有序集合中移除。

举个例子:

# 添加测试数据
127.0.0.1:6379> ZADD test 1 a 2 b 3 c 4 d 5 e
(integer) 5

# 获取 2 到 4 之间的数据
127.0.0.1:6379> ZRANGEBYSCORE test 2 4
1) "b"
2) "c"
3) "d"

# 获取一条 2 到 4 之间的数据
127.0.0.1:6379> ZRANGEBYSCORE test 2 4 LIMIT 0 1
1) "b"

# 移除数据 b
127.0.0.1:6379> ZREM test b
(integer) 1

127.0.0.1:6379> ZRANGEBYSCORE test 2 4 LIMIT 0 1
1) "c"

在 Redis 2.4 版本前,ZADD、ZREM 每次只能添加/删除一条数据。

实现

用的是 predis/predis 扩展包操作 Redis。

use Predis\Client;

class RedisDelayQueue
{
    /**
     * @var Client
     */
    private $client = null;

    /**
     * RedisDelayQueue constructor.
     * @param Client $client
     */
    public function __construct(Client $client = null)
    {
        $this->client = $client ?: $this->createClient();
    }

    private function createClient()
    {
        $params = [
            'scheme' => 'tcp',
            'host'   => '127.0.0.1',
            'port'   => 6379,
        ];

        return new Client($params);
    }

    /**
     * 添加数据
     * 
     * @param $queueName
     * @param $data
     * @return int
     */
    public function push($queueName, $data)
    {
        $members = [];
        $data = isset($data['content']) ? [$data] : $data;
        
        foreach($data as $datum) {
            $members[$datum['content']] = $datum['time'];
        }

        return $this->client->zadd($queueName, $members);
    }

    /**
     * 消费数据
     * @param $queueName
     * @param \Closure $callback
     * @param \Closure $catch
     */
    public function consume($queueName, \Closure $callback, \Closure $catch = null)
    {
        $options = [
            'limit' => [
                'offset' => 0,
                'count' => 1,   // 只取一条数据
            ]
        ];

        while (true) {
            // 从集合中获取一条小于当前时间的数据
            $result = $this->client->zrangebyscore($queueName, 0, time(), $options);
            // 没获取到数据就休息一秒
            if (empty($result)) {
                sleep(1);
                continue;
            }

            // 将数据从集合中移除
            if (!$this->client->zrem($queueName, $result[0])) {
                continue;
            }

            try {
                $callback($result[0]);
            } catch(\Exception $e) {
                if (!$catch instanceof \Closure) {
                    echo 'ERROR:' . $e->getMessage().PHP_EOL;
                    continue;
                }

                $catch($e, $result);
            }
        }
    }
}

因为存在多个进程处理同一个队列的情况,就会出现一条数据被多个进程获取到,所以只有当 ZREM 命令移除数据成功时,才算是真正的获取到了数据。

使用

添加数据

require_once './vendor/autoload.php';
require_once './RedisDelayQueue.php';

$queue = new RedisDelayQueue();

// 添加一条数据
$data = [
    'content' => '投递数据的时间是:'.date('Y-m-d H:i:s'),
    'time' => time() + 10,  // 十秒后处理
];

// 添加多条数据
// $data = [
//     [
//         'content' => '投递数据的时间是:'.date('Y-m-d H:i:s'),
//         'time' => time() + 10,  // 十秒后处理
//     ],
// ];

echo $queue->push('test', $data);

消费数据

require_once './vendor/autoload.php';
require_once './RedisDelayQueue.php';

$queue = new RedisDelayQueue();

$queue->consume('test', function ($value) {
    printf("[%s] %s \r\n", date('Y-m-d H:i:s'), $value);
});

// 处理异常情况
$queue->consume('test', function ($value) {
    printf("[%s] %s \r\n", date('Y-m-d H:i:s'), $value);
    // 假装出现异常
    throw new \Exception('数据库连接超时');
}, function ($e, $value) {
    printf("发生异常:%s \r\n", $e->getMessage());
    printf("获取到的数据:%s \r\n", $value);
});

运行结果

php index.php

[2020-01-19 02:29:47] 投递数据的时间是:2020-01-19 02:29:37
[2020-01-19 02:30:19] 投递数据的时间是:2020-01-19 02:30:09

使用 Lua 脚本进行优化

上面提到了,一条数据可能会被多个进程获取到,然后通过 ZREM 移除数据判断是否抢占数据成功,执行 ZRANGEBYSCOREZREM 命令会发出两次请求,那些没有抢到数据的进程就相当于这次数据白获取了。

使用 Lua 脚本进行优化,只发出一次请求,在服务端进行数据抢占操作。

Redis 中执行 Lua 脚本的命令是 EVAL

EVAL script numkeys key [key ...] arg [arg ...]
  • script:Lua 脚本的内容。
  • numkeys:命令的个数(用于区分 key 和 arg)。
  • key:从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key),在 Lua 中通过全局变量 KEYS 数组访问。
  • arg:附加参数,全局变量 ARGV 数组访问。

Redis 版本 >= 2.6.0 才能使用 Lua 脚本。

修改如下:

    public function consume($queueName, \Closure $callback, \Closure $catch = null)
    {
        $script = $this->getLuaScript();

        while (true) {
            // 使用 eval 执行 Lua 脚本
            $result = $this->client->eval($script, 2, $queueName, time());
            // 没获取到数据就休息一秒
            if (empty($result)) {
                sleep(1);
                continue;
            }

            try {
                $callback($result);
            } catch(\Exception $e) {
                if (!$catch instanceof \Closure) {
                    echo 'ERROR:' . $e->getMessage().PHP_EOL;
                    continue;
                }

                $catch($e, $result);
            }
        }
    }

    /**
     * 获取 lua 脚本
     * @return string
     */
    public function getLuaScript()
    {
        return <<<LUA
            local result = redis.call('zrangebyscore', KEYS[1], 0, KEYS[2], 'limit', 0, 1);
            if (table.getn(result) == 0)
            then
                return false;
            end
            if (redis.call('zrem', KEYS[1], result[1]) > 0)
            then
                return result[1];
            else
                return false;
            end
LUA;
    }

最后

需要注意是:基于 Redis 的延迟队列不能 100% 保证可靠性

如果在使用 ZREM 命令将数据从集合中移除后,处理数据时发生了异常,那么这条数据就丢失了,也就是缺少了 ACK 机制,所以在使用时候需要进行权衡,或者使用 RabbitMQ 这些专业的消息中间件。

写到这已经3点50了…溜了溜了


2020.01.20 01:28 更新

增加处理异常的回调参数,可自定义处理抛出的异常。

本文作者:她和她的猫
本文地址https://her-cat.com/posts/2020/01/18/redis-based-delay-queue/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!