AWS Sagemaker 파이프라인은 머신러닝에 필요할 것으로 예상되는 표준화된 단계들이 일관된 형태로 상호작용할 수 있는지를 고민했다. TFX(from2)와 매우 유사하다. AWS Sagemaker 파이프라인은 이 모든 작업이 클라우드에서 작동하는 경우만을 처리할 수 있을 뿐이다.
import boto3
import sagemaker
region = **boto3**.Session().region_name
role = **sagemaker**.get_execution_role()
sagemaker_session = **sagemaker**.session.Session()
# 파이프라인의 입출력을 워크플로우 파라미터 객체로 정의한다.
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
# 아래 정의된 변수들은 파이프라인 구성 요소들을 정의할 때 사용된다.
**preprocessing_instance_count** = ParameterInteger(
name="PreprocessingInstanceCount",
default_value=1)
**preprocessing_instance_type** = ParameterString(
name="PreprocessingInstanceType",
default_value="ml.m5.xlarge")
**training_instance_type** = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.xlarge")
**training_instance_count** = ParameterInteger(
name="TrainingInstanceCount",
default_value=1)
**input_data** = ParameterString(
name="InputData",
default_value=input_data_uri) # S3 경로
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
# 파이프라인에서 실질적으로 실행될 대상 정의
# 이 경우에는 sklearn 모델 입력 전처리를 위한 'Processor'
sklearn_preprocessor = SKLearnProcessor(
framework_version="0.23-1",
instance_type=**preprocessing_instance_type**,
instance_count=**preprocessing_instance_count**,
base_job_name="sklearn-fraud-preprocess",
role=role,
)
# 파이프라인의 특정 노드, 즉 Step 정의
# 이 경우에는 전처리를 의미하는 'Processing'
step_process = ProcessingStep(
name="FraudScratchPreprocess",
processor=sklearn_preprocessor,
inputs=[
ProcessingInput(source=**input_data**, destination='/opt/ml/processing/input'),
],
outputs=[
ProcessingOutput(output_name="train", source='/opt/ml/processing/output/train'),
ProcessingOutput(output_name="test", source='/opt/ml/processing/output/test')
],
job_arguments=["--split_rate", "0.2"],
code="src/preprocessing.py",
)
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
name="sagemaker-pipeline",
parameters=[
processing_instance_type,
processing_instance_count,
training_instance_type,
training_instance_count,
input_data,
],
steps=[
step_process,
step_train,
step_create_model,
step_deploy
],
)
# 파이프라인 정의를 Sagemaker 파이프라인에 제출하여 파이프라인을 생성
# 파이프라인이 이미 존재하면 파이프라인 정의를 업데이트
# Sagemaker Studio 에서도 파이프라인을 확인할 수 있음
pipeline.upsert(role_arn=role)
# 파이프라인 샐행
execution = pipeline.start()
# 파이프라인 진행상황 확인
execution.describe()
# 실행 완료 대기
execution.wait()
# 실행 결과를 담은 리스트 반환
# steps_li[-1] 와 같은 방법으로 값을 가져올 수 있어 경로 추출에 유용하게 사용 가능
steps_li = execution.list_steps()
파이프라인 API 이용 비용이 따로 들지는 않지만, Sagemaker Studio를 사용하거나 파이프라인에서 사용하는 인스턴스 혹은 서버리스 비용(참고1,참고2:서버리스를 사용할 수 있음)이 청구된다.
parse me : 언젠가 이 글에 쓰이면 좋을 것 같은 재료들.
None
from : 과거의 어떤 생각이 이 생각을 만들었는가?
supplementary : 어떤 새로운 생각이 이 문서에 작성된 생각을 뒷받침하는가?
None
opposite : 어떤 새로운 생각이 이 문서에 작성된 생각과 대조되는가?
None