<프로세스 vs 스레드, 병렬성>
키워드 :: 프로세스, 스레드, 병렬성
스레드 -> 데이터를 공유하던 방식
프로세서 -> 또 다른 패키지를 지원받아서 공유
멀티프로세싱은 오버헤드가 크기 때문에 운영체제 지식 필요
Parallelism = 병렬성
1. 완전히 동일한 타이밍에 태스크 실행
2. 다양한 파트로 나눠서 실행
3. 멀티프로세싱에서 CPU가 1코어인경우 만족하지않음
4. 딥러닝, 비트코인 채굴 등
프로세스 | 스레드 |
독립된 메모리 | 공유 메모리 |
많은 메모리 필요 | 적은 메모리 |
좀비(데드)프로세스 생성 가능성 | 좀비(데트)스레드 생성 쉽지 않음 |
오버헤드 큼 | 오버헤드 작음 |
생성/소멸 다소 느림 | 생성/소멸 빠름 |
코드 작성 쉬움 / 디버깅 어려움 | 코드 작성 어려움 / 디버깅 어려 |
스레드는 공유 / 프로세스는 공유 X
프로세스 --> 각자 처리하고 마지막에 합치는 작업
<Join, is_alive>
키워드 :: 멀티프로세싱, 프로세싱 스테이트(processing state)
하나의 프로세스 생성 및 단일 실행
from multiprocessing import Process
import time
import logging
def proc_func(name):
print('Sub-Process {}: starting'.format(name))
time.sleep(3)
print('Sub-Process {}: finishing'.format(name))
def main():
#Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
#함수 인자 확인
p = Process(target=proc_func, arg=('First',))
logging.info('Main-Process: before creating Process')
#프로세스
p.start()
logging.info('Main-Process: During Process')
#강제적으로 kill - 프로세스 시간이 오래걸릴 때 : 잘 사용 안 하긴 함
logging.info('Main-Process: Terminated Process')
p.terminate()
logging.info('Main-Process: Joined Process')
p.join()
#프로세스 상태 확인
print(f'Process p is alive: {p.is_alive()}')
#Main
if __name__ == "__main__":
main()
<Naming, Parallel>
키워드 :: Naming, parallel processing
from multiprocessing import Process, current_process
import os
import random
import time
#실행
def square(n):
time.sleep(random.randint(1,3))
process_id = os.getpid()
process_name = current_process().name
#제곱
result = n * n
#정보 출력
print(f'Process ID : {process_id}, Process Name : {process_name}')
print(f'Result of {n} square: {result}')
#메인
if __name__ == '__main__':
#부모 프로세스 아이디
parent_process_id = os.getpgid()
#출력
print(f'Parent process ID {parent_process_id}')
#프로세스 리스트 선언
processes = list()
#프로세스 생성 및 실행
for i in range(1, 10): # 1 ~ 100 적절히 조절
#생성
t = Process(name=str(i), target=square, args=(i,))
#배열에 담기
processes.append(t)
#시작
t.start()
for process in processes:
process.join()
#종료
print(Main-Processing Done!')
<ProcessPoolExecutor>
키워드 :: ProcessPoolExecutor, as_completed, futures, timeout, dict
from concurrent.futures import ProcessPoolExecutor, as_completed
import urllib.request
URLS = [
'http://www.daum.net/',
'http://www.naver.com/',
'http://www.cnn.com/'
]
#실행함수
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
def main():
#프로세스풀 context 영역
with ProcessPoolExecutor(max_workers=S) as executor:
#Future 로드(실행x)
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
#중간확인
print(future_to_url)
#실행
for future in as_completed(future_to_url): #timeout=1
#Key 값이 Future 객체
url = future_to_url[future]
try:
#결과
data = future.result()
except Exception as exc:
#예외처리
print('%r page generated an exception: %s' % (url, exc))
else:
#결과확인
print('%r page is %d bytes' %(url, len(data)))
#메인 시작
if __name__ == '__main___':
main()
<sharing state>
키워드 :: memory sharing, array, value
from multiprocessing import Process, current_process
import os
#프로세스 메모리 공유 예제 (공유X)
#실행 함수
def generate_update_number(v: int):
for _ in range(50):
v += 1
print(current_process().name, "data", v)
def main():
#부모 프로세스 아이디
parent_process_id = os.getpid()
#출력
print(f'Parent process ID {parent_process_id}')
#프로세스 리스트 선언
processes = list()
#프로세스 메모리 공유 변수
share_value = 0
for _ in range(1, 10):
#생성
p = Process(target=generate_update_number, args=(share_value,))
#배열에 담기
processes.append(p)
#실행
p.start()
#join
for p in processes:
p.join()
#최종 프로세스 부모 변수 확인
print('Final Data in parent process', share_value)
if __name__ == '__main__':
main()
from multiprocessing import Process, current_process, Value, Array
import os
#프로세스 메모리 공유 예제 (공유O)
#실행 함수
def generate_update_number(v: int):
for _ in range(50):
v.value += 1
print(current_process().name, "data", v)
def main():
#부모 프로세스 아이디
parent_process_id = os.getpid()
#출력
print(f'Parent process ID {parent_process_id}')
#프로세스 리스트 선언
processes = list()
#프로세스 메모리 공유 변수
#from multiprocess import shared_memory 사용 가능
#from multiprocess import Manager 사용 가능
share_numbers = Array('i', range(50))
share_value = Value('i', 0)
for _ in range(1, 10):
#생성
p = Process(target=generate_update_number, args=(share_value,))
#배열에 담기
processes.append(p)
#실행
p.start()
#join
for p in processes:
p.join()
#최종 프로세스 부모 변수 확인
print('Final Data in parent process', share_value.value)
if __name__ == '__main__':
main()
<Queue, Pipe>
키워드 :: Queue, Pipe, Communication between processes
# 프로세스 통신 구현 Queue / PIPE 인 경우 **로 표시
from multiprocessing import Process, Queue, current_process
import time
import os
def worker(id, baseNum, q):
process_id = os.getpid()
process_name = current_process().name
#누적
sub_total = 0
#계산
for i in range(baseNum):
sub_total += 1
#Produce
q.put(sub_total)
# **conn.send(sub_total)
# **conn.close()
#정보 출력
print(f'Process ID: {process_id}, Process Name: {process_name} ID: {id}')
print(f'Result : {sub_total}')
def main():
# 부모 프로세스 아이디
parent_process_id = os.getpid()
#출력
print(f'Parent process ID{parent_process_id}')
#프로세스 리스트 선언
processes = list()
#시작 시간
start_time = time.time()
#Queue 선언
q = Queue()
# ** Pipe 선언
parent_conn, child_conn = Pipe()
# **for문만 삭제
for i in range(5):
#생성
t = Process(name=str(i), target=worker, args=(i, 10000000, q))
#배열에 담기
processes.append(t)
#시작
t.start()
#Join
for process in processes:
process.join()
#순수 계산 시간
print("--- %s seconds ---" % (time.time() - start_time))
#종료 플래스
q.put('exit')
total = 0
#대기
while True:
tmp = q.get()
if tmp == 'exit':
break
else:
total += tmp
print()
print('Main-Processing Total Count={}'.format(total))
if __name__ == '__main__':
main()
'▶ InfoSecurity > 병렬프로그래밍' 카테고리의 다른 글
섹션 3. Concurrency, CPU Bound vs I/O Bound (2) | 2023.12.18 |
---|---|
섹션 1. Multithreading 멀티스레딩 (0) | 2023.12.15 |