<동시성 병렬성>

키워드 :: Concurrency (동시성)

동시성
1. CPU 가용성 극대화 위해 병렬성의 단점 및 어려움을 소프트웨어 레벨에서 해결하기 위한 방법
2. 싱글코어에 멀티스레드 패턴으로 작업 처리
3. 동시 작업에 있어 일정량 처리 후 다음 작업
4. 제어권 주고 받으며 처리, 병렬적은 아님

 

동시성
논리적, 동시 실행 패턴, 싱글코어, 멀티코어에서 실행 가능, 한 개의 작업 공유 처리,  디버깅 매우 어려움, Mutex, deadlock

병렬성
물리적, 물리적으로 동시싫생, 멀티코어에서 구현 가능, 주로 별개의 작업 처리, 디버깅 어려움, OpenMp, MPI, CUDA


<Blocking vs Non-Blocking IO>

키워드 :: Blocking IO, Non-Blocking IO, Sync, Aync

Blocking IO
1. 시스템 콜 요청시 -> 커널 IO 작업 완료시까지 응답 대기
2. 제어권(IO작업) -> 커널 소유 -> 응답(Response) 전까지 대기 (Block) -> 다른 작업 수행 불가

Non-Blocking IO
1. 시스템 콜 요청시 -> 커널 IO 작업 완료 여부 상관없이 즉시 응답
2. 제어권(IO작업) -> 유저 프로세스 -> 다른 작업 수행 가능(지속) -> 주기적으로 시스템 콜 : IO 작업 완료 여부 확인

Async
1. IO 작업 완료 여부에 대한 Noty는 커널(호출되는 함수) -> 유저 프로세스(호출하는 함수)

Sync
1. IO 작업 완료 여부에 대한 Noty는 유저 프로세스(호출하는 함수) -> 커널(호출되는 함수)


 

<멀티프로세싱 vs 스레딩 vs AsyncIO>

키워드 :: CPU Bound, I/O Bound, AsyncIO

CPU Bound
1. 프로세스 진행 -> CPI 속도에 의해 제한(결정) -> 행렬 곱, 고속 연산, 압축 파일, 집합 연산 등
2. CPU 연산 위주 작업

I/O Bound
1. 파일쓰기, 디스크 작업, 네트워크 통신, 시리얼 포트 송수신 -> 작업에 의해서 병목이 결정
2. CPU성능 지표가 수행시간 단축으로 크게 영향을 끼치지 않음

메모리 바인딩, 캐시 바운딩

작업 목적에 따라서 적절한 동시성 라이브러리 선택이 중요

멀티프로세싱
- 멀티프로세스
- 고가용성(CPU)
- 10개 부엌, 10명 요리사, 10개 요리

스레딩
- 싱글(멀티)프로세스
- 멀티 스레드
- 1개 부엌, 10명 요리사, 10개 요리

AsyncIO
- 싱글 프로세스
- 싱글 스레드
- 1개 부엌, 1명 요리사, 10개 요리


<I/O Bound> - Synchronous

키워드 :: I/O Bound, requests

requests-> 없을 수 있으나 없으면 에러, 혹은 no-module-name-request // 빌트인 패키지가 아니라 외부적으로 설치해야함
어떤 서버에 접속해서 관련 정보를 가져와 처리하는 편의성 제공 -- 크롤링 할 때 많이 

$ cd \python_ex/Scripts
$ activate
$ pip install requests
$ pip list
import requests
import time

# 실행함수 (다운로드)
def request_site(url, session):
    # 세션 확인
    print(session)
    print(session.headers)
    
    with session.get(url) as response:
        print(f'[Read Contents : {len(response.content)}, Status Code : {response.status_code}] from {url}')

# 실행함수 (요청)
def request_all_sites(urls):
    with requests.Session() as session:
        for url in urls:
            request_site(url, session)

def main():
    #테스트 URL
    urls = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
        "https:realpython.com"
    ] * 3
    
    # 실행 시간 측정
    start_time = time.time()
    
    # 실행
    request_all_sites(urls)
    
    # 실행 시간 종료
    duration = time.time() - start_time
    
    print()
    
    # 결과 출력
    print(f'Download {len(urls)} sites in {duration} seconds')

if __name__ == "__main__":
    main()

<I/O Bound> - threading vs asyncio vs multiprocessing

# threading 예제

import concurrent.futures
import threading
# pip install requests
import requests
import time

# 각 스레드에 생성되는 객체(독립적)
thread_local = threading.local()

# 세션 제공
def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

# 실행함수1(다운로드)
def request_site(url):
    # 세션 획득
    session = get_session()

    # 세션 확인
    print(session)
    # print(session.headers)

    with session.get(url) as response:
        print(f"[Read Contents : {len(response.content)}, Status Code : {response.status_code}] from {url}")
        

# 실행함수2
def request_all_site(urls):
    # 멀티스레드 실행
    # 반드시 max_worker 개수 조절 후 session 객체 확인
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        executor.map(request_site, urls)

def main():
    # 테스트 URLS
    urls = [
            "https://www.jython.org",
            "http://olympus.realpython.org/dice",
            "https://realpython.com/"
    ] * 3
    
    # 실행시간 측정
    start_time = time.time()

    # 실행
    request_all_site(urls)

    # 실행 시간 종료
    duration = time.time() - start_time

    print()
    
    # 결과 출력
    print(f"Downloaded {len(urls)} sites in {duration} seconds")

if __name__ == "__main__":
    main()
# MultiProcessing 예제

import multiprocessing
# pip install requests
import requests
import time

# 각 프로세스 메모리 영역에 생성되는 객체(독립적)
# 함수 실행 할 때 마다 객체 생성은 좋지 않음. -> 각 프로세스마다 할당
session = None

# 세션 제공
def set_global_session():
    global session
    if not session:
        session = requests.Session()

# 실행함수1(다운로드)
def request_site(url):
    # 세션 확인
    # print(session)
    # print(session.headers)

    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"[{name} -> Read Contents : {len(response.content)}, Status Code : {response.status_code}] from {url}")
        

# 실행함수2
def request_all_site(urls):
    # 멀티프로세싱 실행
    # 반드시 processes 개수 조절 후 session 객체 및 실행 시간 확인
    # 생략시 자동으로 설정
    with multiprocessing.Pool(initializer=set_global_session, processes=4) as pool:
        # pool 병렬화(함수 실행)
        pool.map(request_site, urls)

def main():
    # 테스트 URLS
    urls = [
            "https://www.jython.org",
            "http://olympus.realpython.org/dice",
            "https://realpython.com/"
    ] * 3
    
    # 실행시간 측정
    start_time = time.time()

    # 실행
    request_all_site(urls)

    # 실행 시간 종료
    duration = time.time() - start_time

    print()
    
    # 결과 출력
    print(f"Downloaded {len(urls)} sites in {duration} seconds")

if __name__ == "__main__":
    main()

 

<CPU Bound> - Synchronous

키워드 :: CPU Bound

CPU Bound - CPU만을 사용해서 작업하는거

import time

# 실행함수1(계산)
def cpu_bound(number):
    return sum(i * i for i in range(number))

# 실행함수2
def find_sums(numbers):
    result = []
    for number in numbers:
        result.append(cpu_bound(number))

    return result

def main():
    numbers = [3_000_000 + x for x in range(30)]

    # 확인
    # print(numbers)
   
    # 실행시간 측정
    start_time = time.time()

    # 실행
    total = find_sums(numbers)

    print()
    
    # 결과 출력
    print(f"Total list : {total}")
    print(f"Sum : {sum(total)}")

    # 실행 시간 종료
    duration = time.time() - start_time

    print()

    # 수행 시간
    print(f"Duration : {duration} seconds")

if __name__ == "__main__":
    main()

<CPU Bound> - Multiprocessing

from multiprocessing import current_process, Array, freeze_support, Process, Manager
import time
import os


# 실행함수(계산)
def cpu_bound(number, total_list):

    process_id = os.getpid()
    process_name = current_process().name

    # Process 정보 출력
    print(f"Process ID: {process_id}, Process Name: {process_name}")

    total_list.append(sum(i * i for i in range(number)))


def main():
    # 계산 값
    numbers = [3_000_000 + x for x in range(30)]

    # 확인
    # print(numbers)

    # 프로세스 리스트  선언
    processes = list()

    # 프로세스 공유 매니저
    manager = Manager()

    # 리스트 획득(프로세스 공유)
    total_list = manager.list()

    # 실행시간 측정
    start_time = time.time()

    # 프로세스 생성 및 실행
    for i in numbers: # 1 ~ 100 적절히 조절
        # 생성
        t = Process(name=str(i), target=cpu_bound, args=(i,total_list,))

        # 배열에 담기
        processes.append(t)

        # 시작
        t.start()

    # Join
    for process in processes:
        process.join()

    print()
    
    # 결과 출력
    print(f"Total list : {total_list}")
    print(f"Sum : {sum(total_list)}")

    # 실행 시간 종료
    duration = time.time() - start_time

    print()

    # 수행 시간
    print(f"Duration : {duration} seconds")

if __name__ == "__main__":
    # 윈도우 예외시 
    # freeze_support()
    
    # 메인 함수 실행
    main()
복사했습니다!