​ redis消息队列不是专业的消息队列,没有非常多的高级特性,没有ack保证,适合用于对消息没有很高的可靠性要求的场景。

1.list实现异步消息队列

​ 使用rpush和lpush实现消息入队列。rpop和lpop实现消息出队列。

2.阻塞读:防止队列空时无限循环

​ 如果队列空了,客户端会陷入pop的死循环中,会拉高客户端CPU的消耗,也会是redis的QPS拉高,如果这样的客户端有几十个,redis的慢查询可能也会增多。

QPS:每秒查询率,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准,在因特网上,作为域名系统服务器的机器的性能经常用每秒查询率来衡量。

a.sleep使读操作慢下来

​ 使用sleep,当队列为空时,让线程sleep一会。可以让客户端的CPU消耗降下来,Redis的QPS也能降下来。

b.阻塞读(brpop,blpop)

​ 使用睡眠会导致消息的延时增大。如果只有一个消费者,那么延时就是1秒。如果有多个消费者,这个延迟会降低,因为每个消费者的睡眠时间的叉开的。

​ 可以使用brpop/blpop来解决。b指blocking。

​ 可以在队列中无数据时,进入休眠状态,一旦数据到来,则立刻醒过来,消息的延迟几乎为零。

​ blpop和brpop可以传入参数timeout(秒),timeout秒后没拿到,则返回None。timeout默认为0,为0表示一直等待。

​ 当timeout为0时,线程会一直阻塞在那里,Redis的客户端连接就成了闲置连接,如果闲置太久,服务器一般会主动断开连接,减少闲置资源占用,这时blpop和brpop就会抛出异常,这个异常需要我们主动去捕获,再重新获取。

3.延时队列实现

​ 延时队列可以使用zset实现。将消息序列化成一个字符串作为zset的value,消息的到期处理时间作为score,然后用多个线程轮询zset获取到期的任务。因为使用多线程,需要考虑并发争抢问题,避免一个任务别多次执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# -*- coding: utf-8
import json
import redis
import time


def msg_delay(msg):
value = json.dumps(msg)
score = str(time.time() + 5) # 假定消息5秒后过期
# 消息存入队列
client.zadd('msg_queue', {value: score})


def msg_loop():
while True:
# 每次只取一条消息
values = client.zrangebyscore('msg_queue', 0, time.time(), start=0, num=1)
if not values:
time.sleep(1)
print("没有消息,睡眠一秒")
continue
value = values[0]
is_success = client.zrem('msg_queue', value)
# 如果消除消息成功 表示抢到了消息
if is_success:
print('消息删除成功')
msg = json.loads(value)
msg_handle(msg)


def msg_handle(msg):
"""
消息处理
:param msg:
:return:
"""
# 任务处理增加异常捕获 避免个别任务处理失败导致整个线程挂掉
try:
print('拿到消息,执行处理逻辑')
print(msg)
except:
print('捕获异常')


if __name__ == '__main__':
client = redis.StrictRedis(password='Donghuan@2019')
msg_delay({'msg_test': 'msg'})
msg_loop()

zrem保证是否抢到任务