从零实现一个 Redis 客户端(二)

从 Call 到命令端

在第一个文章中,我们介绍了实现一个 Call 的客户端基本模型,但只是 Call 怎么能满足需求呢?比如在 redis-py 中,一个完整的客户端应该是这样的:

1
2
client = redis.StrictRedis()
client.setex("key", 10, "value")

接下来作为一个程序的客户端,需要去做的就是封装出一个 Redis Client。比如 setex 方法:

1
2
3
4
5
6
7
8
def setex(self, key, seconds, value):
"""Set the value and expiration of a key.

:raises TypeError: if seconds is neither int
"""if not isinstance(seconds, int):
raise TypeError("milliseconds argument must be int")
fut = self._conn.execute(b'SETEX', key, seconds, value)
return wait_ok(fut)

剩下的就是一个个方法逐个完善。

什么是连接池

我们会看到,无论那个数据库客户端,总是会有连接池机制。那么连接池是什么呢?我们为什么需要连接池呢?

首先,我们都知道,对连接而言,创建是必要重型的操作。比如说,TCP 连接,接下来之后是登录认证等等过程,最后才会执行命令。这也就是我们通常计算库性能时,很多时候会把建立连接的时候去掉。但是这就出现了一个问题,当一个连接被占用时,其他的操作仍旧是不能够完成操作了,只能等待前一个操作完成。但是假如我们一次性创建一堆连接呢?从一堆连接中找到空闲的连接,使用完成后释放成空闲的状态,这就是线程池的本质。因为减少了每次创建连接的过程,所以对性能提升也非常有帮助。

从单连接到连接池

首先,还是创建一个 RedisPool 类,用于管理 Redis 的连接池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class RedisPool:
"""Redis connections pool."""

def __init__(self, address, db=0, password=None, encoding=None,
*, minsize, maxsize, commands_factory, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._address = address
self._db = db
self._password = password
self._encoding = encoding
self._minsize = minsize
self._factory = commands_factory
self._loop = loop # 连接池数组
self._pool = collections.deque(maxlen=maxsize)
self._used = set()
self._acquiring = 0
self._cond = asyncio.Condition(loop=loop)

def _create_new_connection(self):
return create_redis(self._address,
db=self._db,
password=self._password,
encoding=self._encoding,
commands_factory=self._factory,
loop=self._loop)

接下来,就需要创建大量的连接了:

1
2
3
4
5
6
7
8
9
10
11
12
async def create_pool(self, *, override_min):
# todo: drop closed connections first
# 判断是否达到了连接池数量限制
while not self._pool and self.size < self.maxsize:
self._acquiring += 1
try:
conn = await self._create_new_connection()
self._pool.append(conn)
finally:
self._acquiring -= 1
# connection may be closed at yeild point
self._drop_closed()

那么怎么从这些连接中抽取连接并且进行连接呢:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@asyncio.coroutine
def acquire(self):
"""Acquires a connection from free pool.
Creates new connection if needed.
"""
with await self._cond:
while True:
await self._fill_free(override_min=True)
if self.freesize:
conn = self._pool.popleft()
assert not conn.closed, conn
assert conn not in self._used, (conn, self._used)
self._used.add(conn)
return conn
else:
await self._cond.wait()

接下来就是使用完成后进行释放即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def release(self, conn):
"""Returns used connection back into pool.
When returned connection has db index that differs from one in pool
the connection will be closed and dropped.
When queue of free connections is full the connection will be dropped.
"""assert conn in self._used,"Invalid connection, maybe from other pool"self._used.remove(conn)
if not conn.closed:
if conn.in_transaction:
logger.warning("Connection %r in transaction, closing it.",
conn)
conn.close()
elif conn.db == self.db:
if self.maxsize and self.freesize < self.maxsize:
self._pool.append(conn)
else:
# consider this connection as old and close it.
conn.close()
else:
conn.close()
# FIXME: check event loop is not closed
asyncio.async(self._wakeup(), loop=self._loop)

至此,你已经可以实现一个基本的 Redis 客户端了,还在犹豫什么?快自己动手吧!

注: 文中 Redis 库参考了 aio-lib/aioredis 库。