什么是 AIO
AIO 是 Asynchronous Input/Output 的简写,也就是异步 IO。不过在谈什么是 AIO 之前,我们可能要先介绍一下 BIO。那么什么是 BIO 呢?简单的说,BIO 是 Blocking Input/Output,也就是阻塞 IO,他实现的通常是在线程池中找出一个线程处理 IO,在 IO 过程中,其他线程都需要等待 IO 完成后才可以从中选取一个线程占用 IO。这样最大的问题是,当线程数量较多,并且需要大量的 IO 操作时,就会造成一个大量的阻塞,因为实际上每次只有一个线程在处理 IO。
那么如何解决这个时候的问题呢?这时候就提出了 AIO 的概念。通常在 IO 处理过程中也会伴有一些其他的处理操作,假如把所有的操作都浪费在了等待 IO 释放上,线程池中的线程利用率也太低了,因此我们需要一种方式,在申请 IO 处理之后,就去继续做其他的事情,等 IO 操作完成了,然后通知我们已经 OK,我们可以继续处理了。这也就是我们常说的 AIO 的原型。
AIO 的情况也说明了它适用的场景:长连接场景,或者重度的 IO 操作等等的情况。
如果找软件来做案例,我们可以找一个可能大家熟知的:NGINX。正如我们所知,NGINX 采用了 异步、事件驱动的方法来处理连接。这种处理方式无需(像使用传统架构的服务器一样)为每个请求创建额外的专用进程或者线程,而是在一个工作进程中处理多个连接和请求。为此,NGINX 工作在非阻塞的 socket 模式下,并使用了 epoll 和 kqueue 这样有效的方法。
这部分的内容,在 NGINX 引入线程池 性能提升 9 倍 中进行了详细的介绍,包含了 NGINX 的异步应用经验,同时介绍了 NGINX 中引入了阻塞的线程池用于解决某些特定场景问题下的效率。
如何实现 Python 的异步 IO
这篇文章会以最新的 Python 3.5 为基础来介绍实现一个异步的 Python Redis Client。不过在此之前,我们先来看一下,怎么实现 Python 的 aio。
Python 的 aio 官方封装了一个比较合适的基础库 asyncio
。
从一个例子开始简单认识一下如何实现一个异步的 aio client。这里以官方文档中的例子为例:
import asyncio
async def tcp_echo_client(message, loop):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
loop=loop)
print('Send: %r' % message)
writer.write(message.encode())
data = await reader.read(100)
print('Received: %r' % data.decode())
print('Close the socket')
writer.close()
message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()
这里面用到的 Python 3.5 中引入的 async/await
关键字,还有 asyncio
库。这里面 asyncio.open_connection
会返回一个 coroutine,这个可以使用 await 进行一个 aio 的调用,即,在收到返回信号之前,程序可以继续去处理其他的任务。这里面真正核心的就是 EventLoop
,它负责监视发送这些信号,并且返回数据,它可以通过 asyncio.get_event_loop
获取到。然后他会真正返回的数据是一个读取 StreamReader
和写入 StreamWriter
的对象。
接下来,就可以通过这个 reader
和 writer
进行数据的读取和写入。writer
是可以直接写入的,如果是 reader
的话,就需要 aio 的方式等待受到数据后返回。这样看起来更接近于普通的 socket 编程。不过关闭连接时,仅仅需要关闭 writer
就足够了。
从 socket 通讯到 redis 通讯
本质上来说,所有的网络请求都可以看成是 SocketIO 的请求,因此,我们可以把 Redis 的请求当做是一个 socket 的通讯来进行,这样就很方便了。
不过先等一等,那么通讯的数据格式怎么办?没关系,这里我们使用 hiredis-py
来解决协议解析的问题。不过,从库设计的角度来说,我们需要封装一个 RedisConnection 的类出来解决 Redis 的通讯协议。它可能传入的参数包含,一个 StreamReader
、一个 StreamWriter
,一个 EventLoop
,哦,别忘记还有编码 encoding
。其他的我们就用一个 *
来表示好了。
class RedisConnection(object):
'''Redis Connection'''
def __init__(self, reader, writer, *, encoding=None, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._reader = reader
self._writer = writer
self._encoding = encoding
self._loop = loop
self._db = 0
def __repr__(self):
return '<RedisConnection [db:{}]>'.format(self._db)
记得加上 __repr__
用来描述这个对象,这个可是一个好习惯。接下来就需要完善这个类了,比如,我们需要添加一个关闭连接的方法,这需要至少一个参数用于标记连接是否关闭,一个用于执行关闭操作,比如我们需要这样子的:
def close(self):
"""Close connection."""
self._do_close(None)
def _do_close(self, exc):
if self._closed:
return
self._closed = True
self._closing = False
# 关闭写入
self._writer.transport.close()
# 取消读取任务
self._reader_task.cancel()
self._reader_task = None
self._writer = None
self._reader = None
@property
def closed(self):
"""True if connection is closed."""
closed = self._closing or self._closed
if not closed and self._reader and self._reader.at_eof():
self._closing = closed = True
self._loop.call_soon(self._do_close, None)
return closed
连接这类的方法已经处理完了,接下来就应该是执行 Redis 命令了,我们可以叫它 execute
。那他需要几个东西,一个是执行的指令 command
,一个是指令参数 *args
,还有一些其他的,比如编码 encoding
。这里为了节省时间,只是考虑一些 Set 和 Get 的基本操作。哦,不过等等,那么 Redis 的数据结构是什么样子的呢?我们还需要先把它编译成 Redis-server 可以识别的形式,那么需要一个 encode_command
方法。
_converters = {
bytes: lambda val: val,
bytearray: lambda val: val,
str: lambda val: val.encode('utf-8'),
int: lambda val: str(val).encode('utf-8'),
float: lambda val: str(val).encode('utf-8'),
}
def encode_command(*args):
"""Encodes arguments into redis bulk-strings array.
Raises TypeError if any of args not of bytes, str, int or float type.
"""buf = bytearray()
def add(data):
return buf.extend(data + b'\r\n')
add(b'*'+ _bytes_len(args))
for arg in args:
if type(arg) in _converters:
barg = _converters[type(arg)](arg)
add(b'$'+ _bytes_len(barg))
add(barg)
else:
raise TypeError("Argument {!r} expected to be of bytes,""str, int or float type".format(arg))
return buf
这样可以转化为可以识别的形式了,接下来还有一个问题,那么怎么让程序可以等待信号的生效呢?这里介绍一下 asyncio.Future
。这个 asyncio.Future
类是用于封装回调函数的类,包含了一些更加方便使用的方法。通过这个类,可以实现 aio 的通知机制,也就是回调。这个类实例可以通过 await
返回我们需要的结果。不过这样就还需要在项目中添加一些更多的变量,比如所有等待返回的 self._waiters
。
def execute(self, command, *args, encoding=None):
"""Executes redis command and returns Future waiting for the answer.
Raises:
* TypeError if any of args can not be encoded as bytes.
* ReplyError on redis '-ERR' resonses.
* ProtocolError when response can not be decoded meaning connection
is broken.
"""assert self._reader and not self._reader.at_eof(), ("Connection closed or corrupted")
if command is None:
raise TypeError("command must not be None")
if None in set(args):
raise TypeError("args must not contain None")
# 这样小写也没有问题了
command = command.upper().strip()
if encoding is None:
encoding = self._encoding
fut = asyncio.Future(loop=self._loop)
self._writer.write(encode_command(command, *args))
self._waiters.append((fut, encoding, cb))
return fut
现在所有的命令都已经发送到了 redis-server,接下来就需要读取对应的结果了。
async def _read_data(self):
"""Response reader task."""
while not self._reader.at_eof():
try:
data = await self._reader.read(65536)
except asyncio.CancelledError:
break
except Exception as exc:
# XXX: for QUIT command connection error can be received
# before response
logger.error("Exception on data read %r", exc, exc_info=True)
break
self._parser.feed(data)
while True:
try:
obj = self._parser.gets()
except ProtocolError as exc:
# ProtocolError is fatal
# so connection must be closed
self._closing = True
self._loop.call_soon(self._do_close, exc)
if self._in_transaction:
self._transaction_error = exc
return
else:
if obj is False:
break
else:
self._process_data(obj)
self._closing = True
self._loop.call_soon(self._do_close, None)
def _process_data(self, obj):
"""Processes command results."""
waiter, encoding, cb = self._waiters.popleft()
if waiter.done():
logger.debug("Waiter future is already done %r", waiter)
assert waiter.cancelled(), ("waiting future is in wrong state", waiter, obj)
return
if isinstance(obj, RedisError):
waiter.set_exception(obj)
if self._in_transaction:
self._transaction_error = obj
else:
if encoding is not None:
try:
obj = decode(obj, encoding)
except Exception as exc:
waiter.set_exception(exc)
return
waiter.set_result(obj)
if cb is not None:
cb(obj)
有了这些之后,我们就可以简单创建一个连接了:
async def create_connection(address, *, db=None, password=None,
encoding=None, loop=None):
"""Creates redis connection.
Opens connection to Redis server specified by address argument.
Address argument is similar to socket address argument, ie:
* when address is a tuple it represents (host, port) pair;
* when address is a str it represents unix domain socket path.
(no other address formats supported)
Encoding argument can be used to decode byte-replies to strings.
By default no decoding is done.
Return value is RedisConnection instance.
This function is a coroutine.
"""assert isinstance(address, (tuple, list, str)), "tuple or str expected"
if isinstance(address, (list, tuple)):
host, port = address
reader, writer = await asyncio.open_connection(host, port, loop=loop)
else:
reader, writer = await asyncio.open_unix_connection(address, loop=loop)
conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)
try:
if password is not None:
yield from conn.auth(password)
if db is not None:
yield from conn.select(db)
except Exception:
conn.close()
return conn
这样,连接部分的代码基本上已经处理完成了,接下来要做的就是实现基于这个连接的命令执行了,下面的内容会下一个文章中继续介绍,敬请期待。