<프로세스 vs 스레드>
프로세스
운영체제에서 할당받는 자원의 단위 / CPU 동작 시간, 공간 = 독립적
코드 영역과 데이터 영역이 독립적 -> 스택영역, 힙영역
최소 1개의 스레드 보유
파이프, 소켓, 파일 등을 사용해서 프로세스간 통신
context switching cost -> 실행시 약간의 idle 타임이 있을 수 있어 코스트가 높다 ==> 프로세스끼리의 데이터 교환
스레드
프로세스 내 실행 흐름 단위
프로세스의 자원을 사용
스택만 별도로 사용, 나머지는 공유 (데이터, 코드, 힙) => 즉 메모리 공유
한 스레드가 다른 스레드에 영향을 끼칠 수 있음 -> 고로 동기화 문제 주의
멀티스레드
한 개의 단일 어플리케이션 -> 여러 스레드로 작업후 나중에 작업처리
시스템 자원 소모 감소, 통신부담 감소 / 디버깅은 어려움 / 자원 공유 문제 (교착상태) / 프로세스에 영향
멀티프로세스
한 개의 단일 어플리케이션 -> 여러 프로세스로 구성 후 작업 처리
한 개의 프로세스 문제 발생 확산 없음
캐시 체인지 / cost 비용 높음(오버헤드) -> 복잡한 통신 방식을 사용하기 때문
<Python Gil>
키워드 :: CPython, 메모리관리, Gil 사용 이유
Gil = Global Interpreter Lock : 단일 스레드만이 파이썬 오브젝트에 접근하게 하는
python 고유의 멀티스레드 프로그램을 만 들 때 이걸 알아야함 -> 스레드를 사용할 때 제한이 있기 때문
python의 실행 원리 -> py파일을 CPython이 해석하면서 byte 코드로 바뀜
실행시 여러 스레드를 사용할 경우에는 파이썬 오브젝에 단 하나의 스레드만 접근할 수 있게 제한을 걸어둠
싱글스레드로 만들 때가 속도가 제일 빨라서 자체적으로 잠궈둠, 스레드 세이프를 위해서
<기초 스레드 : basic thread>
키워드 :: 기초 스레드 생성 예제, 메인 스레드 vs 서브(차일드) 스레드
import logging
import threading
import time
# 스레드 실행 함수
def thread_func(name):
logging.info("Sub-Thread %s: starting", name) #출력함
time.sleep(3)
logging.info("Sub-Thread %s: finishing", name) #출력함
# 메인 영역 - 메인 스레드의 흐름을 타는 시작점
if __name__ == "__main__":
#Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
logging.info("Main-Thread: before creating thread") #출력함
#함수 인자 확인 , 타겟이랑 인수 받아서 별도의 함수 실행 가능
x = threading.Thread(target=thread_func, arg=('First',))
logging.info("Main-Thread: before running thread") #출력함
#서브 스레드 시작
x.start()
logging.info("Main-Thread: wait for the thread to finish") #출력함
logging.info("Main-Thread: all done") #출력함
# join 명령어 : 자식스레드의 작업을 대기, 완료시까지 대기하는 명령어
<Daemon Thread>
키워드 :: daemon thread, join
백그라운드 실행 / 스레드 안에서 새롭게 스레드를 만들어 실행하기 때문
데몬스레드는 자기를 생성한 스레드 종료시 같이 종료 // 예시로 문서작성 자동저장
메인 스레드 : 문서 작성 영역, 데몬 스레드 : 자동저장 -> 같이 메인스레드를 보조하는 역할로 씀
일반 스레드는 작업 종료시까지 실행, 그러나 데몬 스레드는 그게 아님
import logging
import threading
import time
# 스레드 실행 함수
def thread_func(name, d):
logging.info("Sub-Thread %s: starting", name) #출력함
for i in d:
print(i)
logging.info("Sub-Thread %s: finishing", name) #출력함
# 메인 영역 - 메인 스레드의 흐름을 타는 시작점
if __name__ == "__main__":
#Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
logging.info("Main-Thread: before creating thread") #출력함
#함수 인자 확인 , 타겟이랑 인수 받아서 별도의 함수 실행 가능
# 디폴트는 daemon 이 false, daemon=True 옵션을 넣어줘야 스레드가 종료될 때 같이 종료
x = threading.Thread(target=thread_func, arg=('First',range(20000)), daemon=True)
y = threading.Thread(target=thread_func, arg=('Second',range(10000)), daemon=True)
logging.info("Main-Thread: before running thread") #출력함
#서브 스레드 시작
x.start()
y.start()
# DaemonThread -> 데몬스레드인지 확인시켜주는 메소드
print(x.isDaemon())
logging.info("Main-Thread: wait for the thread to finish") #출력함
logging.info("Main-Thread: all done") #출력함
데몬스레드에 join을 쓰면 끝까지 돌아감 -> 고로 안 쓰는 것이 좋음
<Thread Pool Executor>
키워드 :: 여러 스레드 생성할 때 편하게 할 수 있는 방법, concurrent의 futures
스레드를 많이 생성할 때, 편하게 사용할 수 있도록 Future 패키지 제공 -> Concurrent Future 패키지
해당 패키지 활용시 코드 가독성이 좋으면서 라이프 사이클도 쉽게 관리할 수 있음
-> 이걸 가능하게 해주는게 Pool Executor
그룹스레드
ㄴ python 3.2 이상 표준 라이브러리
ㄴ concurrent의 futures
ㄴ with 사용으로 생성, 소멸 라이프 사이클 관리 용이
ㄴ 디버깅하기가 난해함
ㄴ 대기중인 작업 -> Queue -> 완료 상태 조사 -> 결과 또는 예외 -> 캡슐화(단일화)로 가져옴
import logging
from concurrent.futures import ThreadPoolExecutor
import time
def task(name):
logging.info('Sub-Thread %s: starting', name)
result = 0
for i in range(10001):
result = result + i
logging.info('Sub-Thread %s: finishing result: %d', name, result)
return result
def main():
#Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
logging.info('Main-Thread : before creating and running thread')
#실행방법1 : max_workers : 작업 개수가 넘어가면 직접 설정이 유리
excutor = ThreadPoolExecutor(max_workers=3)
task1 = excutor.submit(task, ('First',))
task2 = excutor.submit(task, ('Second',))
print()
print(task1.result())
executor.shutdown()
#실행방법2
with ThreadPoolExecutor(max_workers=3) as excutor:
taske = excutor.map(task, ['First', 'Second'])
#결과 확인
print(list(task))
if __name__ == '__main__':
main
<Daedlock> *중요
키워드 :: Lock, DaedLock, Race Condition, Thread synchronization
1) 세마포어 (Semaphore) : 프로세스간 공유된 자원에 접근시 문제 발생 가능성 // 한 개의 프로세스만 접근 처리 고안
2) 뮤텍스 (Mutex) : 공유된 자원의 데이터를 여러 스레드가 접근하는 것을 막는 것 // 경쟁 상태 예방
3) Lock : 상호 배제를 위한 잠금 처리
4) 데드락 : 프로세스가 자원을 획득하지 못해 다음 처리를 못하는 무한 대기 상황 (교착상태)
5) Thread Synchronization(스레드 동기화) 를 통해서 안정적으로 동작하게 처리 (동기화 메소드, 동기화 블록)
6) 세마포어와 뮤텍스의 차이
ㄴ 모두 병렬 프로그래밍 환경에서 상호배제를 위해 사용
ㄴ 뮤텍스 개체는 단일 스레드가 리소스 또는 중요 섹션을 소비 허용
ㄴ 세마포어는 리소스에 대한 제한된 수의 동시 액세스를 허용
# 동기화가 안 되는 예제
import logging
from concurrent.futures import ThreadPoolExecutor
import time
#동시에 여러 스레드가 접근해서 어떤 작업을 하는 오브젝트
class FakeDataStore:
#공유 변수(value)
def __init__(self):
self.valud = 0
#변수 업데이트 함수 - 이 함수 실행을 위해서는 stack 영역 필요
#이유 : 함수를 호출하고 시작점이나 돌아갈 주소값을 알아야하고, 인자 값들을 갖고 가야하기 때문
def update(self, n):
logging.info('Thread %s: starting update', n)
#뮤텍스 & Lock등 동기화 (Thread synchronization 필요)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.info('Thread %s: finshing update', n)
if __name__ == "__main__":
#Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
#클래스 인스턴스화
store = FakeDataStore()
logging.info('Testing update. Starting value is %d', store.value)
#with Context 시작
with ThreadPoolExecutor(max_wokers=2) as executor:
for n in ['First', 'Second', 'Third']:
executor.submit(store.update, n)
logging.info('Testing update. Starting value is %d', store.value)
stack 영역 -> 스레드별로 함수를 호출할 경우, 함수 내에서 나만의 변수를 선언해서 계산을 해야할 경우 필요함
그래서 각각 가지고 있음
# 키를 주는 동기화 방법
import logging
from concurrent.futures import ThreadPoolExecutor
import time
import threading
#동시에 여러 스레드가 접근해서 어떤 작업을 하는 오브젝트
class FakeDataStore:
#공유 변수(value)
def __init__(self):
self.valud = 0
self._lock = threading.Lock()
#변수 업데이트 함수 - 이 함수 실행을 위해서는 stack 영역 필요
#이유 : 함수를 호출하고 시작점이나 돌아갈 주소값을 알아야하고, 인자 값들을 갖고 가야하기 때문
def update(self, n):
logging.info('Thread %s: starting update', n)
#뮤텍스 & Lock등 동기화 (Thread synchronization 필요)
# Lock 획득 방법1
#self._lock.acquire()
#logging.info('Thread %s has Lock', n)
#local_copy = self.value
#local_copy += 1
#time.sleep(0.1)
#self.value = local_copy
#logging.info('Thread %s about to release lock', n)
#Lock 반환
#self._lock.release()
# Lock 획득 방법 2 - acquire, release 를 알아서 해줌
with self._lock:
logging.info('Thread %s has Lock', n)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.info('Thread %s about to release lock', n)
logging.info('Thread %s: finshing update', n)
if __name__ == "__main__":
#Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
#클래스 인스턴스화
store = FakeDataStore()
logging.info('Testing update. Starting value is %d', store.value)
#with Context 시작
with ThreadPoolExecutor(max_wokers=2) as executor:
for n in ['First', 'Second', 'Third']:
executor.submit(store.update, n)
logging.info('Testing update. Starting value is %d', store.value)
<Prod and Cons>
키워드 :: 생산자 소비자 패턴
Prod, Cons = 생산자, 소비자
생산자 소비자 패턴
1. 멀티스레드 디자인 패턴의 정석
2. 서버측 프로그래밍의 핵심
3. 주로 허리역할
Python 이벤트 객체
1. Flag 초기값 (0)
2. .Set() -> 1, Clear -> 0, Wait(1 -> 리턴, 0 -> 대기), isSet() -> 현 플래그 상태
import concurrent.futures
import logging
import queue
import random
import threading
import time
#생산자
def producer():
#네트워크 대기 상태라 가정(서버)
while not event.is_set():
message = random.randint(1, 11)
logging.info9('Producer got message: %s', message)
queue.put(message)
logging.info('Producer recived event Exiting')
#소비자
def consumer():
#응답받고 소비하는 것으로 가정 or DB 저장
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info(
'Consumer storing message: %s (size=%d)', message, queue.qsize()
)
logging.info('Producer recived event Exiting')
if __name__ == "__main__":
#Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
#사이즈 중요
pipeliine = queue.Queue(maxsize=10)
#이벤트 플래그 초기값 0
event = threading.Event()
#with Context 시작
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(producer, pipeline, event)
#실행시간 조정
time.sleep(0.1)
logging.info('Main : about to set event')
#프로그램 종료
event.set()
'▶ InfoSecurity > 병렬프로그래밍' 카테고리의 다른 글
섹션 3. Concurrency, CPU Bound vs I/O Bound (2) | 2023.12.18 |
---|---|
섹션 2. 병렬성과 멀티프로세싱 (1) | 2023.12.18 |