前言

之前在一些博客上看到过讲如何实现延迟队列,但是平时没用上也没有动手实现过。

在上次面试的时候,面试官也问过我有没有用过延迟队列,最后凭借着记忆讲了下如何用 Redis 的有序集合实现延迟队列,以及有什么缺点。

纸上得来终觉浅,绝知此事要躬行。

这句诗就是本文的主要目的。

原理

主要用到了 Redis 的三个命令,ZADDZREMZRANGEBYSCORE

1
2
3
4
5
ZADD key_name score value

ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

ZREM key value [value ...]

ZADD 用于向有序集合中添加一条或多条数据,score 作为数据处理时间,value 存放数据。

ZRANGEBYSCORE 用于获取有序集合中指定时间范围内的数据,按照时间戳升序排列。

ZREM 用于将一条或多条数据从有序集合中移除。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 添加测试数据
127.0.0.1:6379> ZADD test 1 a 2 b 3 c 4 d 5 e
(integer) 5

# 获取 2 到 4 之间的数据
127.0.0.1:6379> ZRANGEBYSCORE test 2 4
1) "b"
2) "c"
3) "d"

# 获取一条 2 到 4 之间的数据
127.0.0.1:6379> ZRANGEBYSCORE test 2 4 LIMIT 0 1
1) "b"

# 移除数据 b
127.0.0.1:6379> ZREM test b
(integer) 1

127.0.0.1:6379> ZRANGEBYSCORE test 2 4 LIMIT 0 1
1) "c"

在 Redis 2.4 版本前,ZADD、ZREM 每次只能添加/删除一条数据。

实现

用的是 predis/predis 扩展包操作 Redis。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use Predis\Client;

class RedisDelayQueue
{
/**
* @var Client
*/
private $client = null;

/**
* RedisDelayQueue constructor.
* @param Client $client
*/
public function __construct(Client $client = null)
{
$this->client = $client ?: $this->createClient();
}

private function createClient()
{
$params = [
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
];

return new Client($params);
}

/**
* 添加数据
*
* @param $queueName
* @param $data
* @return int
*/
public function push($queueName, $data)
{
$members = [];
$data = isset($data['content']) ? [$data] : $data;

foreach($data as $datum) {
$members[$datum['content']] = $datum['time'];
}

return $this->client->zadd($queueName, $members);
}

/**
* 消费数据
* @param $queueName
* @param \Closure $callback
* @param \Closure $catch
*/
public function consume($queueName, \Closure $callback, \Closure $catch = null)
{
$options = [
'limit' => [
'offset' => 0,
'count' => 1, // 只取一条数据
]
];

while (true) {
// 从集合中获取一条小于当前时间的数据
$result = $this->client->zrangebyscore($queueName, 0, time(), $options);
// 没获取到数据就休息一秒
if (empty($result)) {
sleep(1);
continue;
}

// 将数据从集合中移除
if (!$this->client->zrem($queueName, $result[0])) {
continue;
}

try {
$callback($result[0]);
} catch(\Exception $e) {
if (!$catch instanceof \Closure) {
echo 'ERROR:' . $e->getMessage().PHP_EOL;
continue;
}

$catch($e, $result);
}
}
}
}

因为存在多个进程处理同一个队列的情况,就会出现一条数据被多个进程获取到,所以只有当 ZREM 命令移除数据成功时,才算是真正的获取到了数据。

使用

添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
require_once './vendor/autoload.php';
require_once './RedisDelayQueue.php';

$queue = new RedisDelayQueue();

// 添加一条数据
$data = [
'content' => '投递数据的时间是:'.date('Y-m-d H:i:s'),
'time' => time() + 10, // 十秒后处理
];

// 添加多条数据
// $data = [
// [
// 'content' => '投递数据的时间是:'.date('Y-m-d H:i:s'),
// 'time' => time() + 10, // 十秒后处理
// ],
// ];

echo $queue->push('test', $data);

消费数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
require_once './vendor/autoload.php';
require_once './RedisDelayQueue.php';

$queue = new RedisDelayQueue();

$queue->consume('test', function ($value) {
printf("[%s] %s \r\n", date('Y-m-d H:i:s'), $value);
});

// 处理异常情况
$queue->consume('test', function ($value) {
printf("[%s] %s \r\n", date('Y-m-d H:i:s'), $value);
// 假装出现异常
throw new \Exception('数据库连接超时');
}, function ($e, $value) {
printf("发生异常:%s \r\n", $e->getMessage());
printf("获取到的数据:%s \r\n", $value);
});

运行结果

1
2
3
4
php index.php

[2020-01-19 02:29:47] 投递数据的时间是:2020-01-19 02:29:37
[2020-01-19 02:30:19] 投递数据的时间是:2020-01-19 02:30:09

使用 Lua 脚本进行优化

上面提到了,一条数据可能会被多个进程获取到,然后通过 ZREM 移除数据判断是否抢占数据成功,执行 ZRANGEBYSCOREZREM 命令会发出两次请求,那些没有抢到数据的进程就相当于这次数据白获取了。

使用 Lua 脚本进行优化,只发出一次请求,在服务端进行数据抢占操作。

Redis 中执行 Lua 脚本的命令是 EVAL

1
EVAL script numkeys key [key ...] arg [arg ...]
  • script:Lua 脚本的内容。
  • numkeys:命令的个数(用于区分 key 和 arg)。
  • key:从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key),在 Lua 中通过全局变量 KEYS 数组访问。
  • arg:附加参数,全局变量 ARGV 数组访问。

Redis 版本 >= 2.6.0 才能使用 Lua 脚本。

修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
    public function consume($queueName, \Closure $callback, \Closure $catch = null)
{
$script = $this->getLuaScript();

while (true) {
// 使用 eval 执行 Lua 脚本
$result = $this->client->eval($script, 2, $queueName, time());
// 没获取到数据就休息一秒
if (empty($result)) {
sleep(1);
continue;
}

try {
$callback($result);
} catch(\Exception $e) {
if (!$catch instanceof \Closure) {
echo 'ERROR:' . $e->getMessage().PHP_EOL;
continue;
}

$catch($e, $result);
}
}
}

/**
* 获取 lua 脚本
* @return string
*/
public function getLuaScript()
{
return <<<LUA
local result = redis.call('zrangebyscore', KEYS[1], 0, KEYS[2], 'limit', 0, 1);
if (table.getn(result) == 0)
then
return false;
end
if (redis.call('zrem', KEYS[1], result[1]) > 0)
then
return result[1];
else
return false;
end
LUA;
}

最后

需要注意是:基于 Redis 的延迟队列不能 100% 保证可靠性

如果在使用 ZREM 命令将数据从集合中移除后,处理数据时发生了异常,那么这条数据就丢失了,也就是缺少了 ACK 机制,所以在使用时候需要进行权衡,或者使用 RabbitMQ 这些专业的消息中间件。

写到这已经3点50了…溜了溜了


2020.01.20 01:28 更新

增加处理异常的回调参数,可自定义处理抛出的异常。