AWS Sagemaker 파이프라인은 머신러닝에 필요할 것으로 예상되는 표준화된 단계들이 일관된 형태로 상호작용할 수 있는지를 고민했다. TFX(from2)와 매우 유사하다. AWS Sagemaker 파이프라인은 이 모든 작업이 클라우드에서 작동하는 경우만을 처리할 수 있을 뿐이다.

import boto3
import sagemaker

region = **boto3**.Session().region_name
role = **sagemaker**.get_execution_role()
sagemaker_session = **sagemaker**.session.Session()
# 파이프라인의 입출력을 워크플로우 파라미터 객체로 정의한다.
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

# 아래 정의된 변수들은 파이프라인 구성 요소들을 정의할 때 사용된다.
**preprocessing_instance_count** = ParameterInteger(
    name="PreprocessingInstanceCount",
    default_value=1)
**preprocessing_instance_type** = ParameterString(
    name="PreprocessingInstanceType",
    default_value="ml.m5.xlarge")
**training_instance_type** = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge")
**training_instance_count** = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1)
**input_data** = ParameterString(
    name="InputData",
    default_value=input_data_uri) # S3 경로

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# 파이프라인에서 실질적으로 실행될 대상 정의
# 이 경우에는 sklearn 모델 입력 전처리를 위한 'Processor'
sklearn_preprocessor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=**preprocessing_instance_type**,
    instance_count=**preprocessing_instance_count**,
    base_job_name="sklearn-fraud-preprocess",
    role=role,
)

# 파이프라인의 특정 노드, 즉 Step 정의
# 이 경우에는 전처리를 의미하는 'Processing'
step_process = ProcessingStep(
    name="FraudScratchPreprocess",
    processor=sklearn_preprocessor,
    inputs=[
				ProcessingInput(source=**input_data**, destination='/opt/ml/processing/input'),        
		],
    outputs=[
				ProcessingOutput(output_name="train", source='/opt/ml/processing/output/train'),
				ProcessingOutput(output_name="test", source='/opt/ml/processing/output/test')
		],
    job_arguments=["--split_rate", "0.2"],
    code="src/preprocessing.py",
)
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="sagemaker-pipeline",
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,        
        training_instance_count,                
        input_data,
    ],
    steps=[
				step_process, 
				step_train, 
				step_create_model, 
				step_deploy
		],
)
# 파이프라인 정의를 Sagemaker 파이프라인에 제출하여 파이프라인을 생성
# 파이프라인이 이미 존재하면 파이프라인 정의를 업데이트
# Sagemaker Studio 에서도 파이프라인을 확인할 수 있음
pipeline.upsert(role_arn=role)

# 파이프라인 샐행
execution = pipeline.start()

# 파이프라인 진행상황 확인
execution.describe()

# 실행 완료 대기
execution.wait()

# 실행 결과를 담은 리스트 반환
# steps_li[-1] 와 같은 방법으로 값을 가져올 수 있어 경로 추출에 유용하게 사용 가능
steps_li = execution.list_steps()

파이프라인 API 이용 비용이 따로 들지는 않지만, Sagemaker Studio를 사용하거나 파이프라인에서 사용하는 인스턴스 혹은 서버리스 비용(참고1,참고2:서버리스를 사용할 수 있음)이 청구된다.


parse me : 언젠가 이 글에 쓰이면 좋을 것 같은 재료들.

  1. None

from : 과거의 어떤 생각이 이 생각을 만들었는가?

  1. ba2.3_1.1. title: TFX는 표준화된 다양한 단계들이 어떻게 일관된 형태로 상호작용할 수 있는지를 고민하고 메타데이터스토어, 아티팩트, 컴포넌트라는 이름으로 표준화했다.
  2. a1.2.a9.1.1.2. title: AWS 람다(Lambda)를 이용해 간단한 함수를 배포한다. 꼭 이 함수가 HTTP API를 위해 존재할 필요는 없다. 이 함수는 서비스가 될수도 있고, 클라우드의 자원을 제어하는 스크립트일수도 있고, 다른 AWS 서비스의 기능으로 동작할수도 있고, 한낱 매크로일 수도 있다.

supplementary : 어떤 새로운 생각이 이 문서에 작성된 생각을 뒷받침하는가?

  1. None

opposite : 어떤 새로운 생각이 이 문서에 작성된 생각과 대조되는가?

  1. None