Python의 asyncio를 직접 만들어보자 (3)
실은 여기서부터가 실제 튜토리얼의 내용이다. 이전 포스트 2개는 이 포스트를 이해/진행하기 위한 내용이었다.
동시성을 보장하지 않는 소켓 서버
먼저 동시성을 보장하지 않는 소켓 서버 부터 작성해보자.
import socket
def algorithm(value):
return value * 2
def handler(client):
while True:
req = client.recv(100) # size of bytes chuck
if not req:
client.send('@ close connection\n'.encode())
client.close()
return
try:
value = int(req)
except ValueError:
client.send('@ enter integer\n'.encode())
client.close()
return
resp = algorithm(value)
client.send(f'> {resp}\n'.encode())
def server(host, port):
sock = socket.socket()
# SOL_SOCKET + SO_REUSEADDR: reuse already used address
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port,))
# 5 is max number of queued connections
# https://docs.python.org/2/library/socket.html#socket.socket.listen
sock.listen(5)
while True:
client, client_address = sock.accept()
client.send('@ connect\n'.encode())
handler(client)
if __name__ == '__main__':
server('localhost', 30303)
왜 동시성을 보장하지 않을까?
- 첫 번 째 클라이언트가
accpet()을 통해 접속하고recv()의 무한 루프를 통해서 계속 서버와 통신을 하고 있다고 하면, - Python 런타임의 싱글 쓰레드는 항상 첫 번 째 클라이언트
recv()에서 블락되고 있고 루프를 빠져나오지 못하므로 , - 두 번 째 클라이언트가 접속을 시도해도
accept()을 받아주지 못하기 때문에,
동시성 보장이 되지 않는다. 위의 accept(), recv(), send() 등의 함수는 모두 파일 입출력을 시도하는 함수들인데, 각각의 파일이 읽기/쓰기가 가능해질 때 까지 쓰레드를 블락하고 자원을 낭비한다.
async/await 키워드를 이용한 동시성 구현
그럼 이제 여기에 Python의 async(== 제너레이터), await(== yield from: 반복 next() 혹은 send(None) 호출을 통한 제너레이터 소모) 프로그래밍을 입혀서 동시성을 구현해보자.
가장 중요한 것은 제너레이터와 select 시스템 콜 등으로 대표되는 OS의 I/O Multiplexer이다.
import enum
import select
import socket
class Action(enum.Enum):
READ = enum.auto()
WRITE = enum.auto()
class Can:
def __init__(self, action, sock):
self.action = action
self.sock = sock
def __await__(self):
yield self.action, self.sock
class AsyncSocket:
def __init__(self, sock, *args, **kwargs):
self.sock = sock
super().__init__(*args, **kwargs)
async def accept(self):
await Can(Action.READ, self.sock)
return self.sock.accept()
async def recv(self, length):
await Can(Action.READ, self.sock)
return self.sock.recv(length)
async def send(self, data):
if isinstance(data, str):
data = data.encode()
await Can(Action.WRITE, self.sock)
self.sock.send(data)
async def close(self):
self.sock.close()
class TaskManager:
current_task = None
tasks = []
wait_read = {}
wait_write = {}
def add_task(self, task):
self.tasks.append(task)
def run(self):
while any((self.tasks, self.wait_read, self.wait_write)):
while not self.tasks:
readables, writables, _ = select.select(self.wait_read, self.wait_write, [])
for sock in readables:
task = self.wait_read.pop(sock)
self.add_task(task)
for sock in writables:
task = self.wait_write.pop(sock)
self.add_task(task)
self.current_task = self.tasks.pop(0)
try:
action, sock = self.current_task.send(None)
except StopIteration:
continue
if action is Action.READ:
self.wait_read[sock] = self.current_task
elif action is Action.WRITE:
self.wait_write[sock] = self.current_task
else:
raise RuntimeError(f'wrong action {action}')
task_manager = TaskManager()
async def algorithm(value):
return value * 2
async def handler(aclient):
while True:
req = await aclient.recv(100) # size of bytes chuck
if not req:
await aclient.send('@ close connection\n'.encode())
await aclient.close()
return
try:
value = int(req)
except ValueError:
await aclient.send('@ enter integer\n'.encode())
await aclient.close()
return
resp = await algorithm(value)
await aclient.send(f'> {resp}\n'.encode())
async def server(host, port):
sock = socket.socket()
# SOL_SOCKET + SO_REUSEADDR: reuse already used address
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port,))
# 5 is max number of queued connections
# https://docs.python.org/2/library/socket.html#socket.socket.listen
sock.listen(5)
asock = AsyncSocket(sock)
while True:
client, client_address = await asock.accept()
aclient = AsyncSocket(client)
await aclient.send('@ connect\n')
task_manager.add_task(handler(aclient))
if __name__ == '__main__':
task_manager.add_task(server('localhost', 30303))
task_manager.run()
한 번에 이해하기가 꽤나 어렵다. 그래도 쉽게 말해보면,
- 파일 읽기/쓰기가 필요한 제너레이터안에서 입출력을 시도하기 직전에 이벤트 루프로 제어권을 넘긴다.
- 제어권을 넘기는 부분:
await Can() - 이벤트 루프
TaskManager.run()
- 제어권을 넘기는 부분:
- 이벤트 루프는 방금 그 제너레이터와 감시가 필요한 파일을 읽기/쓰기 대기 큐에 넣고,
- 읽기/쓰기 대기 큐:
TaskManager.wait_read,TaskManager.wait_write
- 읽기/쓰기 대기 큐:
- 당장 실행 가능한(파일 입출력이 필요 없거나, 파일이 바로 읽기/쓰기가 가능한 상태가 된 제너레이터) 제너레이터들이 들어있는 큐를 모두 실행하면서 비운다.
- 당장 실행 가능한 대기 큐:
TaskManager.tasks
- 당장 실행 가능한 대기 큐:
- 읽기/쓰기 대기 큐에 들어갔던 파일이 OS의 I/O Multiplexer에 의해 입출력 가능 상태가 되면, 당장 실행 가능한 대기 큐에 제너레이터를 넣어준다.
- 1~4를 반복한다.
장점
콜백의 못생김/위험함 문제를 모두 해결하고 동기적 코드 형태와 95% 이상 유사한 형태의 코드 작성만으로 싱글 쓰레드 동시성 구현이 가능하다!
실제 asyncio 패키지와의 비교
Can은Futureasync def(제네레이터)는TaskTaskManager.run()은 이벤트 루프
라고 생각할 수 있다.
김준기님 발표에 있던 보너스 숙제
sleep구현해보기select대신 다른 멀티플렉서를 사용해보고 그 이유 알아보기- Javascript의
async,await와는 어떻게 다른지 알아보기 (특히Promise에 집중해서)
Array