본문 바로가기
Development/Python

Python Concurrency Programming

by IMCOMKING 2020. 1. 9.

Multi-thread vs Multi-Processing vs AsyncIO

 

Python의 default interpreter인 CPython은 GIL(Global Interpreter Lock)이 걸려 있어서 multi-thread를 구현해도, 여러 thread가 정말로 동시에 실행될 수는 없다. 그래서 대개의 경우 multi-processing을 통해서 병렬성(parallelism)를 구현한다. 그러나 multi-processing은 "almost independent"하게 job을 쪼갤 수 있는 CPU 연산이 heavy한 task에 효과적으로 사용할 수가 있다. 그러나 몇몇 상황에서는 이렇게 효과적으로 job을 나누기 힘든 경우가 존재하기 때문에 multi-processing이 모든 문제를 해결해 줄 수는 없다. 

 

반면에 asyncio의 경우 coroutine과 future라는 개념에 의해 효과적으로 IO가 오래 걸리는 함수를 non-blocking으로 구현하여, 보다 쉽게 동시성을 구현할 수 있다. 기존의 경우 IO가 bottleneck인 task를 multi-thread로 해결했었으므로, 즉 asyncio는 multi-thread를 대체한다고 거칠게 말할 수 있다. 그러나 asyncio는 single thread, single process로 동작하기 때문에 연산량 자체가 heavy한 task에서는 이 또한 적절한 해결책이 아닐 수 있다.

 

즉, 궁극적으로는 asyncio와 multi-processing을 함께 사용하여, IO bound와 CPU연산을 동시에 해결할 수 있다.

 

 

Concurrency Programming

Python에서 asyncio를 이용한 concurrency에 대해 알아본다. 이 글은 아래의 문서를 참고하여 내용을 작성하였다.

 

 

Parallelism은 concurrency의 하위 개념이다. 즉 동시성(concurrency)을 구현하는 방법 중 하나가 병렬화(paralleism)라고 얘기할 수 있을 것이다. 이때 threading은 concurrency와 좀 더 가까운 개념이다.

Concurrency의 핵심은 single cpu에서 동작하는 듯한 절차적인 코딩을 통해서도 동시성을 구현할 수 있다는 것이다. 즉 짜기가 쉽다. 그리고 여기에는 coroutine이라는 핵심 개념이 등장한다.

이러한 concurrency를 설명하는 좋은 비유는 바로 1:24의 다면 체스 게임이다. 한 명의 체스 고수가 24명의 플레이어와 체스를 둔다. 그런데 이때 체스 고수는 5초만에 1수를 두고, 일반 플레이어는 55초에 1 수를 둔다. 그러면 어떤식으로 해야 빠르게 24명을 상대할 수 있을까?

당연히 체스 고수가 5초만에 수를 둔다음, 상대방이 무슨 수를 두는지 기다리지 않고 바로 다음 상대를 찾아가서 5초만에 또 수를 두는 것이다. 이런 방식이 바로 concurrent programming이다. 즉 여러 개의 task를 동시에 수행하고자 할 때, 한 task가 끝나기를 기다리지 않고, 다음 task를 계속해서 처리하는 것이다.

이러한 구현은 multi-thread를 통해서도 할 수도 있지만, 멀티쓰레딩은 구현 난이도가 아주 아주 높은 방식이라 굉장한 버그와 난해함이 존재한다. 반면에 asyncio를 이용하면 첫 구현은 조금 어렵지만, 익숙해질수록 훨씬 직관적이고 더 적은 버그로 구현할 수 있다.

이제 python에서 asyncio를 구현하기 위해 필요한 3가지를 알아볼 것이다. 

 

1. coroutine: non-blocking function 

2. async: coroutine을 구현할 때 

3. await: coroutine을 활용할 때

 

 

구체적으로 들어가기 전에 다음의 예제를 살펴보자.

import asyncio
async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

import time
s = time.perf_counter()
await main()
elapsed = time.perf_counter() - s
print(elapsed)

위 코드를 jupyter notebook에서 실행해보면, One이 연달아 세번 출력되고, 1초후 Two가 연달아 세번 출력된다. 즉 count()라는 함수가 미처 다 종료되기 전에 다음 count()가 실행되는 것이다.

def count():
    print("One")
    time.sleep(1)
    print("Two")

def main():
    [count(), count(), count()]

import time
s = time.perf_counter()
main()
elapsed = time.perf_counter() - s
print(elapsed)

그러나 asyncio를 사용하지 않은 위의 일반적인 코드는 One, Two One, Two, One, Two 이런식으로 3초에 걸쳐 번갈아가면서 결과가 출력된다. 즉 count()라는 함수가 순서대로 실행이 되는 것이다.

Asyncio를 사용하는 것이 아까 말했던 체스의 예시와 동일한 상황이라고 보면 된다.

여기서의 핵심은 바로 await asyncio.sleep(1) 이 non-blocking 함수라는 것이다. 반대로 time.sleep(1)은 blocking 함수이다. 만약 asyncio 버전 코드에서 await asyncio.sleep(1) 대신에 그냥 time.sleep(1)을 사용하면, 그 결과는 아래의 절차적인 코드와 동일하게 실행된다.

즉 쉽게 말하자면, asyncio란 이런 non-blocking(coroutine)함수를 지원하기 위한 interface라고 요약할 수 있다. 그러면 이제 다음의 두 키워드를 이해보자.

async 키워드

이 키워드는 방금 말한 coroutine 함수를 정의할 때 쓰이는 키워드이다. 그래서 항상 async def 함수명() 이런식으로 사용된다.

await 키워드

coroutine 함수를 실행하기 위해서는 반드시 await / asyncio.gather / asyncio.run 셋중 한가지가 필요하다. 그런데 gather와 run은 coroutine을 호출하고 결과물을 가져오는 목적으로 필요하고, await은 coroutine 함수를 활용한 또 다른 coroutine함수를 구현하는 데에 쓰인다.

(참고로 asyncio.sleep(1) 함수 역시 coroutine이다. 이 함수를 await 키워드 없이 그냥 호출해봤자 해당하는 coroutine object만이 return 된다.)

이 키워드로 호출된 coroutine 함수는 일단 먼저 작업을 시작은 하지만, 해당 함수가 완료될 때까지 기다리지 않고, control point를 그 전 단계로 되돌려주어 다음 작업으로 넘어 가도록 해준다. 이후 해당 작업이 완료가 되면 다시 그 control point로 되돌아가서, 다음 코드를 실행하게 된다.

보다 정확하게 말하자면 await은 function의 control을 다시 원래의 event loop로 되돌려주는 역할이다. 즉 await은 event를 실행시키고, control을 돌려준 뒤, 해당 event가 완료될 때 반응하는 callback method의 기능을 동시에 한다고 보면 된다.

 

 

 

Async IO 디자인 패턴

asyncio를 구현하는 경우, 하나의 기능을 여러 겹의 작은 coroutine으로 쪼개고, wrapper를 만들어서 구현하는 경우가 많다. 이를 이용해 함수의 composablity(조합성)를 높일 수 있다.

- native coroutine 패턴 VS generator-based coroutine 패턴

yield는 생산하다라는 의미로, python의 generator를 구현할 때 사용되는 키워드이다. 그런데 coroutine함수를 구현할 때 return 대신, 이 yield 를 이용해서 구현할 수도 있다. 이를 generator-based coroutine이라고 부르며, async 키워드 대신 @asyncio.coroutine과 결합하여 사용된다. 그러나 이 generator-based coroutine은 python 3.10에서 삭제 될 예정이므로 가능한 async/await을 이용한 native coroutine을 이용하도록 하자.

 

 

 

asyncio 패키지

loop.run_in_executor

제대로 asyncio를 구현하려면 시간이 오래걸리는 작업이 반드시 coroutine으로 구현되어야한다. 그래서 기존에 구현된 일반적인 함수를 coroutine으로 바꾸기 위해서는 다음과 같은 함수가 필요하다.

async def coroutine_func(func):
    loop = asyncio.get_event_loop()
    return  await loop.run_in_executor(None, lambda: func())

 

asyncio.gather(coroutines)

전달 받은 여러 개의 coroutines들을 동시에 실행하고, 그 coroutines들에 대한 future를 return한다. 이 후 future.result() 함수를 호출하여 return 된 값을 가져올 수 있다.

Future란 coroutine 함수 실행하고, 해당 함수가 완료되기 전의 상태를 위해서 생긴 개념이다. 미래에 함수가 완료될 때, 그 결과물을 가져오기 위해서 사용되는 것을 future라고 부른다. Future에는 함수가 실행되고나서 완료가 되는 미래 시점에서야 값이 채워지고, 그 전에는 아무 값도 가지고 있지 않다.

asyncio.run(coroutine)

이 함수는 python 3.7에서 추가된 기능으로, 전달 받은 coroutine이 모두 최종 완료될 때까지 무한 loop를 돌면서 실행 결과를 리턴해주는 blocking 함수이다. 아래의 두 코드는 서로 완전히 동일한 기능을 하며, 대부분의 경우 run함수 하나로 처리할 수 있다.

asyncio.run(main())  # Python 3.7+
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

그러나 이런 무한 loop를 아래와 같이 일반적인 while함수로 구현해버리면, control을 coroutine에게 다시 돌려주는 것이 불가능하여 무한루프에 빠져버린다. 따라서 반드시 asyncio 내의 함수를 사용해서 loop를 구현해야한다.

while not future.done():
   pass
future.result()

만약 loop를 직접 마이크로 컨트롤 하고 싶은 경우에는 아래의 코드에서 loop.is_running()함수나 loop.is_closed()함수 등을 이용해 세세한 구현을 할 수도 있다. 그러나 보통의 경우 위의 run()함수로 충분하다.

 

 

 

RuntimeError: This event loop is already running

특히 jupyter를 사용하면서 asyncio.run()함수를 사용하면 자주 발생한다. 쉬운 해결 방법은 다음과 같이 nest_async를 사용하는 것이다.

 

pip install nest_asyncio

import nest_asyncio
nest_asyncio.apply()

 

이후에는 jupyter에서도 asyncio.run() 을 정상적으로 호출할 수 있다.

 

 

 

 

 

 

 

 

 

 

댓글