삼성전자 주가데이터 읽기
read_parquet
주가 데이터를 parquet 포맷으로 저장했습니다. 이제 parquet 데이터를 활용해야 하는데 가공하려면 파일을 읽을 줄 알아야 하겠죠? 먼저 하나의 데이터를 읽어보겠습니다. price.ipynb 파일은 데이터 수집용으로 두고, read.ipynb 파일을 새로 만들어 코드를 작성하겠습니다.
import pandas as pd
pd.read_parquet('data/000050.parquet')
Python
복사
read_parquet 얼마나 빠른가?
time 내장 라이브러리를 사용하면 현재 시간을 얻을 수 있습니다. 시작 시간과 종료 시간의 차이를 계산하면 실행 시간을 측정할 수 있죠. 이를 통해 데이터를 매우 빠르게 읽어오는 것을 확인할 수 있습니다.
종가데이터 전체 읽기
이제 전종목 주가데이터를 비교하고 활용하기 위해 각 파일에서 종가(Close)를 추출하고, 컬럼명을 종목코드로 바꾼 뒤 날짜순으로 모든 종목을 통합하려고 합니다. 일반적으로 여러 파일의 데이터를 가져올 때는 폴더 내부의 파일들을 for문으로 순회하는 방식을 사용할 수 있습니다. 그 외에도 pyarrow라는 라이브러리를 사용하면 특정 경로에 있는 모든 parquet 파일들을 한 번에 효율적으로 읽어올 수 있습니다. 몇가지 예시로 속도를 체험해보겠습니다.
코드에 대한 자세한 설명은 성능 확인 후에 진행하도록 하겠습니다. 우선 코드를 눈으로 살펴보세요!
for문으로 순차적으로 읽기
os 라이브러리를 사용하여 data 폴더의 모든 parquet 파일을 리스트에 담아 for문으로 순회했습니다. 각 파일을 read_parquet으로 읽어 데이터프레임으로 변환한 뒤, close 컬럼만 분리하고 해당 컬럼명을 파일명(종목코드)으로 변경했습니다. 마지막으로 날짜를 기준으로 모든 데이터를 병합했으며, 전체 처리 시간은 13.32초가 소요되었습니다.
pyarrow.dataset
pyarrow의 dataset은 경로에 있는 모든 parquet 파일을 데이터베이스처럼 사용할 수 있습니다. 한 번에 아주 빠르게 불러오는 것이 가능하죠. 다만 이번에는 종가만 불러와서 컬럼에 종목코드를 삽입하고 날짜를 기준으로 합치는 과정이 필요해서 pyarrow의 기능을 완벽하게 활용하기는 어렵습니다. 그럼에도 pyarrow의 table을 이용하면 실행속도를 개선 할 수 있습니다. 우선 실행 시간을 볼까요?
pyarrow의 table 방식은 필요한 컬럼만 선택적으로 호출할 수 있어 메모리 사용량을 최소화하고 데이터 처리 속도를 크게 향상시킬 수 있습니다.
pyarrow의 dataset에 경로와 format을 지정하면 모든 파일을 한 번에 불러올 수 있습니다. 파일들을 파편으로 인식해 병합하거나 묶어서 사용할 수 있는 장점이 있지만, 종목코드를 데이터 구분용으로 추가해야 하는 제약 때문에 이 기능을 충분히 활용하기 어렵습니다. 이러한 한계를 극복하기 위해 함수화를 통한 더 효율적인 처리 방법을 살펴보겠습니다.
병렬처리
dataset과 pyarrow를 활용하여 속도를 향상시켰지만, 병렬 처리를 적용하면 처리 속도를 더욱 획기적으로 개선할 수 있습니다.
파이썬의 병렬처리란 여러 작업을 동시에 실행하는 방식입니다. 일반적으로 코드는 순차적으로 한 번에 하나의 작업만 처리하지만, 병렬처리를 사용하면 CPU의 여러 코어를 활용하여 동시에 여러 작업을 수행할 수 있습니다.
예를 들어, 여러 파일을 읽어야 할 때 순차적으로 한 파일씩 처리하는 대신, 여러 파일을 동시에 읽어 처리 시간을 크게 단축할 수 있습니다. 이는 특히 대용량 데이터 처리나 복잡한 연산이 필요한 경우에 매우 효과적입니다.
코드설명
이 코드에는 이전 학습노트에서 다루지 않은 새로운 내용이 포함되어 있습니다. 특히 pyarrow와 병렬처리를 위한 ThreadPoolExecutor는 이번에 처음 소개되는 개념입니다. 이러한 내용을 완벽하게 이해하고 습득하기 위해서는 직접 코드를 실행해보는 것이 가장 효과적인 방법입니다. 따라서 위에서 학습한 기본 읽기부터 순회, dataset, 그리고 최종 코드까지 모든 내용을 담은 ipynb 파일을 첨부했으니, 코드를 단계별로 나누어 하나씩 실행해보시기 바랍니다.
이해를 돕기 위해 단계별로 나누어 하나씩 실행해보세요
1.
변수 출력
2.
for문의 코드를 한 줄씩 주석 처리하면서 각 단계의 출력 결과 확인
이렇게 하면 원본 데이터가 어떻게 변화되어 가는지 쉽게 파악할 수 있습니다.
import pyarrow.dataset as ds # pyarrow의 dataset 모듈 임포트
import pandas as pd # 데이터 처리를 위한 pandas 라이브러리
import time # 실행 시간 측정을 위한 time 모듈
from concurrent.futures import ThreadPoolExecutor # 병렬 처리를 위한 ThreadPoolExecutor
# (변수)parquet 파일들이 저장된 폴더 경로 지정
dataFolder = "data"
# 지정된 경로의 모든 parquet 파일을 하나의 데이터셋으로 로드
dataset = ds.dataset(dataFolder, format="parquet")
def processFragment(fragment):
"""각각의 parquet 파일(fragment)을 처리하는 함수
fragment: 개별 parquet 파일을 나타내는 객체(파일에서 불러온 데이터)
df: 가공된 pandas DataFrame
"""
# 파일 경로에서 종목코드 추출 (예: "data/000050.parquet" -> "000050")
code = fragment.path.split('/')[-1].split('.')[0]
# 필요한 컬럼(Date, Close)만 선택하여 테이블로 변환
# 메모리 사용량 최적화를 위해 필요한 컬럼만 로드
table = fragment.to_table(columns=['Date', 'Close'])
# Close 컬럼명을 해당 종목코드로 변경
# 예: 'Close' -> '000050'
table = table.rename_columns(['Date', code])
# pyarrow 테이블을 pandas DataFrame으로 변환하고 Date를 인덱스로 설정
df = table.to_pandas().set_index('Date')
return df
# 병렬 처리 시작
startTime = time.time() # 시작 시간 기록
# ThreadPoolExecutor를 사용하여 병렬 처리 실행
with ThreadPoolExecutor() as executor:
# executor.map()을 사용하여 각 파일을 동시에 처리
# dataset.get_fragments()로 모든 parquet 파일 목록을 가져옴
# processFragment 함수를 각 파일에 병렬로 적용
dfs = list(executor.map(processFragment, dataset.get_fragments()))
# 모든 데이터프레임을 날짜 기준으로 열방향(axis=1)으로 병합
# 결과적으로 날짜 × 종목코드 형태의 데이터프레임이 생성됨
result = pd.concat(dfs, axis=1)
# 처리 완료 시간 기록 및 소요 시간 출력
endTime = time.time()
print(f"데이터 처리 실행 시간: {endTime - startTime:.2f}초")
result # 최종 결과 데이터프레임 반환
Python
복사
read.ipynb 및 코드 분해 예시
노트에 나눠서 실행해보세요!!