Dev

Python의 asyncio를 직접 만들어보자 (3)

October 18, 2018

Python의 asyncio를 직접 만들어보자 (3)

실은 여기서부터가 실제 튜토리얼의 내용이다. 이전 포스트 2개는 이 포스트를 이해/진행하기 위한 내용이었다.

동시성을 보장하지 않는 소켓 서버

먼저 동시성을 보장하지 않는 소켓 서버 부터 작성해보자.

import socket
    
    
def algorithm(value):
    return value * 2
    
    
def handler(client):
    while True:
        req = client.recv(100)  # size of bytes chuck
        if not req:
            client.send('@ close connection\n'.encode())
            client.close()
            return
    
        try:
            value = int(req)
        except ValueError:
            client.send('@ enter integer\n'.encode())
            client.close()
            return
    
        resp = algorithm(value)
        client.send(f'> {resp}\n'.encode())
    
    
def server(host, port):
    sock = socket.socket()
    # SOL_SOCKET + SO_REUSEADDR: reuse already used address
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port,))
        # 5 is max number of queued connections
    # https://docs.python.org/2/library/socket.html#socket.socket.listen
    sock.listen(5)
    
    while True:
        client, client_address = sock.accept()
        client.send('@ connect\n'.encode())
        handler(client)
    
    
if __name__ == '__main__':
    server('localhost', 30303)

왜 동시성을 보장하지 않을까?

  • 첫 번 째 클라이언트가 accpet()을 통해 접속하고 recv()의 무한 루프를 통해서 계속 서버와 통신을 하고 있다고 하면,
  • Python 런타임의 싱글 쓰레드는 항상 첫 번 째 클라이언트 recv()에서 블락되고 있고 루프를 빠져나오지 못하므로 ,
  • 두 번 째 클라이언트가 접속을 시도해도 accept()을 받아주지 못하기 때문에,

동시성 보장이 되지 않는다. 위의 accept(), recv(), send() 등의 함수는 모두 파일 입출력을 시도하는 함수들인데, 각각의 파일이 읽기/쓰기가 가능해질 때 까지 쓰레드를 블락하고 자원을 낭비한다.

async/await 키워드를 이용한 동시성 구현

그럼 이제 여기에 Python의 async(== 제너레이터), await(== yield from: 반복 next() 혹은 send(None) 호출을 통한 제너레이터 소모) 프로그래밍을 입혀서 동시성을 구현해보자.

가장 중요한 것은 제너레이터와 select 시스템 콜 등으로 대표되는 OS의 I/O Multiplexer이다.

import enum
import select
import socket
    
    
class Action(enum.Enum):
    READ = enum.auto()
    WRITE = enum.auto()
    
    
class Can:
    def __init__(self, action, sock):
        self.action = action
        self.sock = sock
    
    def __await__(self):
        yield self.action, self.sock
    
    
class AsyncSocket:
    def __init__(self, sock, *args, **kwargs):
        self.sock = sock
        super().__init__(*args, **kwargs)
    
    async def accept(self):
        await Can(Action.READ, self.sock)
        return self.sock.accept()
    
    async def recv(self, length):
        await Can(Action.READ, self.sock)
        return self.sock.recv(length)
    
    async def send(self, data):
        if isinstance(data, str):
            data = data.encode()
    
        await Can(Action.WRITE, self.sock)
        self.sock.send(data)
    
    async def close(self):
        self.sock.close()
    
    
class TaskManager:
    current_task = None
    tasks = []
    wait_read = {}
    wait_write = {}
    
    def add_task(self, task):
        self.tasks.append(task)
    
    def run(self):
        while any((self.tasks, self.wait_read, self.wait_write)):
            while not self.tasks:
                readables, writables, _ = select.select(self.wait_read, self.wait_write, [])
                for sock in readables:
                    task = self.wait_read.pop(sock)
                    self.add_task(task)
    
                for sock in writables:
                    task = self.wait_write.pop(sock)
                    self.add_task(task)
    
            self.current_task = self.tasks.pop(0)
    
            try:
                action, sock = self.current_task.send(None)
            except StopIteration:
                continue
            
            if action is Action.READ:
                self.wait_read[sock] = self.current_task
            elif action is Action.WRITE:
                self.wait_write[sock] = self.current_task
            else:
                raise RuntimeError(f'wrong action {action}')
    
    
task_manager = TaskManager()
    
    
async def algorithm(value):
    return value * 2
    
    
async def handler(aclient):
    while True:
        req = await aclient.recv(100)  # size of bytes chuck
        if not req:
            await aclient.send('@ close connection\n'.encode())
            await aclient.close()
            return
    
        try:
            value = int(req)
        except ValueError:
            await aclient.send('@ enter integer\n'.encode())
            await aclient.close()
            return
    
        resp = await algorithm(value)
        await aclient.send(f'> {resp}\n'.encode())
    
    
async def server(host, port):
    sock = socket.socket()
    # SOL_SOCKET + SO_REUSEADDR: reuse already used address
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port,))
    # 5 is max number of queued connections
    # https://docs.python.org/2/library/socket.html#socket.socket.listen
    sock.listen(5)
    asock = AsyncSocket(sock)
    
    while True:
        client, client_address = await asock.accept()
        aclient = AsyncSocket(client)
        await aclient.send('@ connect\n')
        task_manager.add_task(handler(aclient))
    
    
if __name__ == '__main__':
    task_manager.add_task(server('localhost', 30303))
    task_manager.run()

한 번에 이해하기가 꽤나 어렵다. 그래도 쉽게 말해보면,

  1. 파일 읽기/쓰기가 필요한 제너레이터안에서 입출력을 시도하기 직전에 이벤트 루프로 제어권을 넘긴다.
    • 제어권을 넘기는 부분: await Can()
    • 이벤트 루프 TaskManager.run()
  2. 이벤트 루프는 방금 그 제너레이터와 감시가 필요한 파일을 읽기/쓰기 대기 큐에 넣고,
    • 읽기/쓰기 대기 큐: TaskManager.wait_read, TaskManager.wait_write
  3. 당장 실행 가능한(파일 입출력이 필요 없거나, 파일이 바로 읽기/쓰기가 가능한 상태가 된 제너레이터) 제너레이터들이 들어있는 큐를 모두 실행하면서 비운다.
    • 당장 실행 가능한 대기 큐: TaskManager.tasks
  4. 읽기/쓰기 대기 큐에 들어갔던 파일이 OS의 I/O Multiplexer에 의해 입출력 가능 상태가 되면, 당장 실행 가능한 대기 큐에 제너레이터를 넣어준다.
  5. 1~4를 반복한다.

장점

콜백의 못생김/위험함 문제를 모두 해결하고 동기적 코드 형태와 95% 이상 유사한 형태의 코드 작성만으로 싱글 쓰레드 동시성 구현이 가능하다!

실제 asyncio 패키지와의 비교

  • CanFuture
  • async def(제네레이터)는 Task
  • TaskManager.run()은 이벤트 루프

라고 생각할 수 있다.

김준기님 발표에 있던 보너스 숙제

  • sleep 구현해보기
  • select 대신 다른 멀티플렉서를 사용해보고 그 이유 알아보기
  • Javascript의 async, await와는 어떻게 다른지 알아보기 (특히 Promise에 집중해서)