<프로세스 vs 스레드>

프로세스
운영체제에서 할당받는 자원의 단위 / CPU 동작 시간, 공간 = 독립적
코드 영역과 데이터 영역이 독립적 -> 스택영역, 힙영역
최소 1개의 스레드 보유
파이프, 소켓, 파일 등을 사용해서 프로세스간 통신
context switching cost -> 실행시 약간의 idle 타임이 있을 수 있어 코스트가 높다 ==> 프로세스끼리의 데이터 교환

스레드
프로세스 내 실행 흐름 단위
프로세스의 자원을 사용
스택만 별도로 사용, 나머지는 공유 (데이터, 코드, 힙) => 즉 메모리 공유
한 스레드가 다른 스레드에 영향을 끼칠 수 있음 -> 고로 동기화 문제 주의

멀티스레드
한 개의 단일 어플리케이션 -> 여러 스레드로 작업후 나중에 작업처리
시스템 자원 소모 감소, 통신부담 감소 / 디버깅은 어려움 / 자원 공유 문제 (교착상태) / 프로세스에 영향

멀티프로세스
한 개의 단일 어플리케이션 -> 여러 프로세스로 구성 후 작업 처리
한 개의 프로세스 문제 발생 확산 없음
캐시 체인지 / cost 비용 높음(오버헤드) -> 복잡한 통신 방식을 사용하기 때문


<Python Gil>

키워드 :: CPython, 메모리관리, Gil 사용 이유

Gil = Global Interpreter Lock : 단일 스레드만이 파이썬 오브젝트에 접근하게 하는

python 고유의 멀티스레드 프로그램을 만 들 때 이걸 알아야함 -> 스레드를 사용할 때 제한이 있기 때문

python의 실행 원리 -> py파일을 CPython이 해석하면서 byte 코드로 바뀜

실행시 여러 스레드를 사용할 경우에는 파이썬 오브젝에 단 하나의 스레드만 접근할 수 있게 제한을 걸어둠

싱글스레드로 만들 때가 속도가 제일 빨라서 자체적으로 잠궈둠, 스레드 세이프를 위해서


<기초 스레드 : basic thread>

키워드 :: 기초 스레드 생성 예제, 메인 스레드 vs 서브(차일드) 스레드

import logging
import threading
import time

# 스레드 실행 함수
def thread_func(name):
	logging.info("Sub-Thread %s: starting", name) #출력함
    time.sleep(3)
    logging.info("Sub-Thread %s: finishing", name) #출력함

# 메인 영역 - 메인 스레드의 흐름을 타는 시작점
if __name__ == "__main__":
	#Logging format 설정
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
    logging.info("Main-Thread: before creating thread") #출력함
    
    #함수 인자 확인 , 타겟이랑 인수 받아서 별도의 함수 실행 가능
    x = threading.Thread(target=thread_func, arg=('First',))
    
    logging.info("Main-Thread: before running thread") #출력함
    
    #서브 스레드 시작
    x.start()
    
    logging.info("Main-Thread: wait for the thread to finish") #출력함
    logging.info("Main-Thread: all done") #출력함

# join 명령어 : 자식스레드의 작업을 대기, 완료시까지 대기하는 명령어


<Daemon Thread>

키워드 :: daemon thread, join

백그라운드 실행 / 스레드 안에서 새롭게 스레드를 만들어 실행하기 때문

데몬스레드는 자기를 생성한 스레드  종료시 같이 종료 // 예시로 문서작성 자동저장

메인 스레드 : 문서 작성 영역, 데몬 스레드 : 자동저장 -> 같이 메인스레드를 보조하는 역할로 씀

일반 스레드는 작업 종료시까지 실행, 그러나 데몬 스레드는 그게 아님

import logging
import threading
import time

# 스레드 실행 함수
def thread_func(name, d):
	logging.info("Sub-Thread %s: starting", name) #출력함
    for i in d:
    print(i)
    logging.info("Sub-Thread %s: finishing", name) #출력함

# 메인 영역 - 메인 스레드의 흐름을 타는 시작점
if __name__ == "__main__":
	#Logging format 설정
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
    logging.info("Main-Thread: before creating thread") #출력함
    
    #함수 인자 확인 , 타겟이랑 인수 받아서 별도의 함수 실행 가능
    # 디폴트는 daemon 이 false, daemon=True 옵션을 넣어줘야 스레드가 종료될 때 같이 종료
    x = threading.Thread(target=thread_func, arg=('First',range(20000)), daemon=True)
    y = threading.Thread(target=thread_func, arg=('Second',range(10000)), daemon=True)
    
    logging.info("Main-Thread: before running thread") #출력함
    
    #서브 스레드 시작
    x.start()
    y.start()
    
    # DaemonThread -> 데몬스레드인지 확인시켜주는 메소드
    print(x.isDaemon())
    
    logging.info("Main-Thread: wait for the thread to finish") #출력함
    logging.info("Main-Thread: all done") #출력함

데몬스레드에 join을 쓰면 끝까지 돌아감 -> 고로 안 쓰는 것이 좋음


<Thread Pool Executor>

키워드 :: 여러 스레드 생성할 때 편하게 할 수 있는 방법, concurrent의 futures

스레드를 많이 생성할 때, 편하게 사용할 수 있도록 Future 패키지 제공 -> Concurrent Future 패키지
해당 패키지 활용시 코드 가독성이 좋으면서 라이프 사이클도 쉽게 관리할 수 있음
-> 이걸 가능하게 해주는게 Pool Executor

그룹스레드
ㄴ python 3.2 이상 표준 라이브러리
ㄴ concurrent의 futures
ㄴ with 사용으로 생성, 소멸 라이프 사이클 관리 용이
ㄴ 디버깅하기가 난해함
ㄴ 대기중인 작업 -> Queue  -> 완료 상태 조사 -> 결과 또는 예외 -> 캡슐화(단일화)로 가져옴 

import logging
from concurrent.futures import ThreadPoolExecutor
import time

def task(name):
	logging.info('Sub-Thread %s: starting', name)
    
    result = 0
    for i in range(10001):
    	result = result + i
      
    logging.info('Sub-Thread %s: finishing result: %d', name, result)
    
    return result

def main():
	#Logging format 설정
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
    
    logging.info('Main-Thread : before creating and running thread')
    
    #실행방법1 : max_workers : 작업 개수가 넘어가면 직접 설정이 유리
    excutor = ThreadPoolExecutor(max_workers=3)
    
    task1 = excutor.submit(task, ('First',))
    task2 = excutor.submit(task, ('Second',))
    
    print()
    print(task1.result())
    
    executor.shutdown()
    
    #실행방법2
    with ThreadPoolExecutor(max_workers=3) as excutor:
    	taske = excutor.map(task, ['First', 'Second'])
        
        #결과 확인
        print(list(task))
  
if __name__ == '__main__':
	main

<Daedlock> *중요

키워드 :: Lock, DaedLock, Race Condition, Thread synchronization 

1) 세마포어 (Semaphore) : 프로세스간 공유된 자원에 접근시 문제 발생 가능성 // 한 개의 프로세스만 접근 처리 고안
2) 뮤텍스 (Mutex) : 공유된 자원의 데이터를 여러 스레드가 접근하는 것을 막는 것 // 경쟁 상태 예방
3) Lock : 상호 배제를 위한 잠금 처리
4) 데드락 : 프로세스가 자원을 획득하지 못해 다음 처리를 못하는 무한 대기 상황 (교착상태)
5) Thread Synchronization(스레드 동기화) 를 통해서 안정적으로 동작하게 처리 (동기화 메소드, 동기화 블록)
6) 세마포어와 뮤텍스의 차이
ㄴ 모두 병렬 프로그래밍 환경에서 상호배제를 위해 사용
ㄴ 뮤텍스 개체는 단일 스레드가 리소스 또는 중요 섹션을 소비 허용
ㄴ 세마포어는 리소스에 대한 제한된 수의 동시 액세스를 허용

# 동기화가 안 되는 예제

import logging
from concurrent.futures import ThreadPoolExecutor
import time

#동시에 여러 스레드가 접근해서 어떤 작업을 하는 오브젝트
class FakeDataStore:
	#공유 변수(value)
    def __init__(self):
    	self.valud = 0
        
    #변수 업데이트 함수 - 이 함수 실행을 위해서는 stack 영역 필요
    #이유 : 함수를 호출하고 시작점이나 돌아갈 주소값을 알아야하고, 인자 값들을 갖고 가야하기 때문
    def update(self, n):
    	logging.info('Thread %s: starting update', n)
        
        #뮤텍스 & Lock등 동기화 (Thread synchronization 필요)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        
        logging.info('Thread %s: finshing update', n)

if __name__ == "__main__":
	#Logging format 설정
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
    
    #클래스 인스턴스화
    store = FakeDataStore()
    
    logging.info('Testing update. Starting value is %d', store.value)
    
    #with Context 시작
    with ThreadPoolExecutor(max_wokers=2) as executor:
    	for n in ['First', 'Second', 'Third']:
        	executor.submit(store.update, n)
    logging.info('Testing update. Starting value is %d', store.value)

stack 영역 -> 스레드별로 함수를 호출할 경우, 함수 내에서 나만의 변수를 선언해서 계산을 해야할 경우 필요함
그래서 각각 가지고 있음

# 키를 주는 동기화 방법

import logging
from concurrent.futures import ThreadPoolExecutor
import time
import threading

#동시에 여러 스레드가 접근해서 어떤 작업을 하는 오브젝트
class FakeDataStore:
	#공유 변수(value)
    def __init__(self):
    	self.valud = 0
        self._lock = threading.Lock()
        
    #변수 업데이트 함수 - 이 함수 실행을 위해서는 stack 영역 필요
    #이유 : 함수를 호출하고 시작점이나 돌아갈 주소값을 알아야하고, 인자 값들을 갖고 가야하기 때문
    def update(self, n):
    	logging.info('Thread %s: starting update', n)
        
        #뮤텍스 & Lock등 동기화 (Thread synchronization 필요)
        
        # Lock 획득 방법1
        #self._lock.acquire()
        #logging.info('Thread %s has Lock', n)
        
        #local_copy = self.value
        #local_copy += 1
        #time.sleep(0.1)
        #self.value = local_copy
        
        #logging.info('Thread %s about to release lock', n)
        
        #Lock 반환
        #self._lock.release()
        
        # Lock 획득 방법 2 - acquire, release 를 알아서 해줌
        with self._lock:
        	logging.info('Thread %s has Lock', n)
            
        	local_copy = self.value
        	local_copy += 1
        	time.sleep(0.1)
        	self.value = local_copy
            
            logging.info('Thread %s about to release lock', n)
        
        logging.info('Thread %s: finshing update', n)

if __name__ == "__main__":
	#Logging format 설정
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
    
    #클래스 인스턴스화
    store = FakeDataStore()
    
    logging.info('Testing update. Starting value is %d', store.value)
    
    #with Context 시작
    with ThreadPoolExecutor(max_wokers=2) as executor:
    	for n in ['First', 'Second', 'Third']:
        	executor.submit(store.update, n)
    logging.info('Testing update. Starting value is %d', store.value)

<Prod and Cons>

키워드 :: 생산자 소비자 패턴

Prod, Cons = 생산자, 소비자

생산자 소비자 패턴
1. 멀티스레드 디자인 패턴의 정석
2. 서버측 프로그래밍의 핵심
3. 주로 허리역할

Python 이벤트 객체
1. Flag 초기값 (0)
2. .Set() -> 1, Clear -> 0, Wait(1 -> 리턴, 0 -> 대기), isSet() -> 현 플래그 상태

import concurrent.futures
import logging
import queue
import random
import threading
import time

#생산자
def producer():
	#네트워크 대기 상태라 가정(서버)
    while not event.is_set():
    	message = random.randint(1, 11)
    	logging.info9('Producer got message: %s', message)
        queue.put(message)
     logging.info('Producer recived event Exiting')

#소비자
def consumer():
	#응답받고 소비하는 것으로 가정 or DB 저장
    while not event.is_set() or not queue.empty():
    	message = queue.get()
        logging.info(
        	'Consumer storing message: %s (size=%d)', message, queue.qsize()
        )
     logging.info('Producer recived event Exiting')

if __name__ == "__main__":
	#Logging format 설정
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
    
    #사이즈 중요
    pipeliine = queue.Queue(maxsize=10)
    
    #이벤트 플래그 초기값 0
    event = threading.Event()
    
    #with Context 시작
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    	executor.submit(producer, pipeline, event)
        executor.submit(producer, pipeline, event)
    
    	#실행시간 조정
    	time.sleep(0.1)
    
    	logging.info('Main : about to set event')
    
    	#프로그램 종료
    	event.set()

 

복사했습니다!