Overview
SageMaker Pipelines lets you define a directed acyclic graph (DAG) of ML steps that SageMaker executes, tracks, and makes reproducible. This project builds a complete pipeline over a retail sales dataset: raw data in S3 goes in, predictions come out, with every intermediate artefact versioned and auditable.
The four main steps in the pipeline (Step 4 also adds a helper step, SplitTestCSV, that branches off after preprocessing):
PreprocessData → TrainModel → CreateInferenceModel → BatchInference
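Pipelines derives execution order from step dependencies, not from the order of the steps list. As a conceptual sketch only (this is not how SageMaker schedules steps internally), Python's standard graphlib can compute the same ordering. The dependency map is an assumption that mirrors the wiring described in Step 4, including the SplitTestCSV helper. execution_waves is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# step -> set of steps it depends on (assumed to mirror the pipeline's wiring)
DEPENDENCIES = {
    "PreprocessData": set(),
    "TrainModel": {"PreprocessData"},
    "SplitTestCSV": {"PreprocessData"},
    "CreateInferenceModel": {"TrainModel"},
    "BatchInference": {"CreateInferenceModel", "SplitTestCSV"},
}


def execution_waves(deps):
    """Group steps into waves; steps within one wave do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for i, wave in enumerate(execution_waves(DEPENDENCIES), start=1):
    print(i, wave)
```

Grouping into waves is a simplification: a real scheduler starts each step as soon as its own dependencies finish, so CreateInferenceModel does not have to wait for SplitTestCSV.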
The Dataset
Walmart retail sales data with three source tables (features, sales, stores). The target is weekly sales per store — a regression problem.
Step 1: Preprocessing with ProcessingStep
A ScriptProcessor runs a custom Python script inside a scikit-learn container. It merges the three tables, engineers time features, one-hot encodes categoricals, splits into train/validation/test, and writes artefacts to S3.
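```python
import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

pipeline_session = PipelineSession()
role = sagemaker.get_execution_role()  # IAM role ARN; works inside SageMaker Studio/notebooks

processor = ScriptProcessor(
    image_uri="246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3",
    command=["python3"],
    role=role,
    instance_count=1,
    instance_type="ml.t3.medium",
    sagemaker_session=pipeline_session,
)

processing_step = ProcessingStep(
    name="PreprocessData",
    processor=processor,
    code="s3://my-bucket/pipelines/scripts/processing_script.py",
    inputs=[
        ProcessingInput(source="s3://my-bucket/raw/features.csv",
                        destination="/opt/ml/processing/features"),
        ProcessingInput(source="s3://my-bucket/raw/sales.csv",
                        destination="/opt/ml/processing/sales"),
        # The script merges all three tables, so stores.csv must be mounted too
        # (path assumed to follow the same layout as the two inputs above).
        ProcessingInput(source="s3://my-bucket/raw/stores.csv",
                        destination="/opt/ml/processing/stores"),
    ],
    outputs=[
        ProcessingOutput(output_name="train",
                         source="/opt/ml/processing/output/train",
                         destination="s3://my-bucket/pipelines/data/train"),
        ProcessingOutput(output_name="validation",
                         source="/opt/ml/processing/output/validation",
                         destination="s3://my-bucket/pipelines/data/validation"),
        ProcessingOutput(output_name="test",
                         source="/opt/ml/processing/output/test",
                         destination="s3://my-bucket/pipelines/data/test"),
    ],
)
```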
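The processing script itself isn't shown. A minimal sketch of what processing_script.py could look like, using pandas (available in the scikit-learn container). The column names (Store, Dept, Date, Weekly_Sales, IsHoliday, Type) are assumptions based on the common Kaggle Walmart schema, the stores mount path assumes a third ProcessingInput, and the random 70/15/15 split is an illustrative choice, not necessarily the author's.

```python
import os

import pandas as pd

BASE_DIR = "/opt/ml/processing"  # mount root used by the ProcessingInputs/Outputs
TARGET = "Weekly_Sales"          # assumed target column name


def build_dataset(features: pd.DataFrame, sales: pd.DataFrame,
                  stores: pd.DataFrame) -> pd.DataFrame:
    """Merge the three tables, add time features, one-hot encode, put the target first."""
    # IsHoliday appears in both sales and features; keep the copy from sales.
    df = sales.merge(features.drop(columns=["IsHoliday"], errors="ignore"),
                     on=["Store", "Date"], how="left")
    df = df.merge(stores, on="Store", how="left")

    # Time features derived from the weekly date; the raw date string is dropped.
    dates = pd.to_datetime(df["Date"])
    df["Year"] = dates.dt.year
    df["Month"] = dates.dt.month
    df["Week"] = dates.dt.isocalendar().week.astype(int)
    df = df.drop(columns=["Date"])

    df["IsHoliday"] = df["IsHoliday"].astype(int)
    df = pd.get_dummies(df, columns=["Type"], dtype=int)  # store type -> 0/1 columns
    df = df.fillna(0)  # e.g. sparse markdown columns; a simplification

    # Built-in XGBoost expects the target in the first CSV column.
    return df[[TARGET] + [c for c in df.columns if c != TARGET]]


def split(df: pd.DataFrame, val_frac: float = 0.15, test_frac: float = 0.15,
          seed: int = 42):
    """Random train/validation/test split (fractions and seed are assumptions)."""
    shuffled = df.sample(frac=1.0, random_state=seed).reset_index(drop=True)
    n_test = int(len(shuffled) * test_frac)
    n_val = int(len(shuffled) * val_frac)
    test = shuffled.iloc[:n_test]
    validation = shuffled.iloc[n_test:n_test + n_val]
    train = shuffled.iloc[n_test + n_val:]
    return train, validation, test


def main(base: str = BASE_DIR) -> None:
    features = pd.read_csv(f"{base}/features/features.csv")
    sales = pd.read_csv(f"{base}/sales/sales.csv")
    stores = pd.read_csv(f"{base}/stores/stores.csv")
    df = build_dataset(features, sales, stores)
    for name, part in zip(("train", "validation", "test"), split(df)):
        out_dir = f"{base}/output/{name}"
        os.makedirs(out_dir, exist_ok=True)
        # No header and no index: the CSV layout built-in XGBoost expects.
        part.to_csv(f"{out_dir}/{name}.csv", header=False, index=False)


if __name__ == "__main__" and os.path.isdir(f"{BASE_DIR}/features"):
    # Only run inside a Processing container where the inputs are mounted.
    main()
```

Because the target is weekly sales over time, a chronological split (train on earlier weeks, test on later ones) avoids leaking future information and may be preferable to the random split shown here.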
Step 2: Training with TrainingStep
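Uses SageMaker’s built-in XGBoost container. Managed Spot training keeps cost down. Both max_run and max_wait are in seconds, and for Spot jobs max_wait must be at least max_run: the difference is the time budget for waiting on Spot capacity and recovering from interruptions.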
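To make the units and the constraint concrete: the SDK forwards max_run and max_wait into the StoppingCondition of the CreateTrainingJob request as MaxRuntimeInSeconds and MaxWaitTimeInSeconds. The helper below is a hypothetical pre-flight check, not an SDK function.

```python
from typing import Dict, Optional


def spot_stopping_condition(max_run: int, max_wait: Optional[int]) -> Dict[str, int]:
    """Build a StoppingCondition dict, enforcing max_wait >= max_run (both in seconds)."""
    if max_run <= 0:
        raise ValueError("max_run must be a positive number of seconds")
    condition = {"MaxRuntimeInSeconds": max_run}
    if max_wait is not None:
        # Managed Spot: the total budget (training plus waiting for capacity)
        # cannot be smaller than the training-time budget.
        if max_wait < max_run:
            raise ValueError(f"max_wait ({max_wait}s) must be >= max_run ({max_run}s)")
        condition["MaxWaitTimeInSeconds"] = max_wait
    return condition


print(spot_stopping_condition(150, 480))
```

With the values used here, the job has 150 seconds of training time and 480 − 150 = 330 seconds of slack for Spot waits and interruptions.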
```python
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

estimator = Estimator(
    image_uri="246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-xgboost:1.5-1",
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    use_spot_instances=True,
    max_run=150,   # seconds of training time
    max_wait=480,  # seconds, including time waiting for Spot capacity; must be >= max_run
    hyperparameters={
        "objective": "reg:squarederror",
        "max_depth": "10",
        "eta": "0.1",
        "num_round": "200",
    },
    sagemaker_session=pipeline_session,
)

training_step = TrainingStep(
    name="TrainModel",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig
            .Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",  # must be explicit: built-in XGBoost defaults to libsvm
        ),
        "validation": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig
            .Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
```
Key lesson: always set content_type="text/csv" explicitly. The built-in XGBoost container assumes libsvm input by default, so omitting it causes a cryptic validation error at runtime. CSV training data must also have the target in the first column and no header row.
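Layout mistakes in the training CSV only surface as errors inside the training job. The built-in XGBoost container expects CSV training data without a header row and with the target in the first column, so a quick local check can catch problems before upload. A minimal sketch; validate_xgboost_csv is a hypothetical helper, not part of the SageMaker SDK, and it deliberately treats any non-numeric field (including an empty one) as an error.

```python
import csv
from typing import Iterable


def validate_xgboost_csv(lines: Iterable[str]) -> int:
    """Return the number of feature columns, or raise ValueError if the layout is wrong.

    Assumed layout: no header row, every field numeric, target in column 0.
    """
    width = None
    for lineno, row in enumerate(csv.reader(lines), start=1):
        if not row:
            continue  # ignore blank lines
        for value in row:
            try:
                float(value)
            except ValueError:
                raise ValueError(
                    f"line {lineno}: non-numeric field {value!r} "
                    "(header row or unencoded categorical?)"
                ) from None
        if width is None:
            width = len(row)
        elif len(row) != width:
            raise ValueError(f"line {lineno}: expected {width} columns, got {len(row)}")
    if width is None:
        raise ValueError("no data rows")
    return width - 1  # column 0 is the target
```

A file can be passed directly, e.g. `with open("train.csv", newline="") as f: validate_xgboost_csv(f)` (the file name is illustrative).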
Step 3: Model Creation with ModelStep
Registers the training artefact (model.tar.gz in S3) as a SageMaker Model resource by pairing it with the XGBoost inference image. This is the bridge between training and inference: the Model is what Batch Transform runs.
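```python
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

model = Model(
    image_uri="246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-xgboost:1.5-1",
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session,
)

create_model_step = ModelStep(
    name="CreateInferenceModel",
    # With a PipelineSession, model.create() returns step arguments instead of calling the API.
    step_args=model.create(instance_type="ml.m5.large"),
)
```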
Step 4: Batch Inference with TransformStep
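The test set (48 MB) was too large for a single transform request: the default max_payload is 6 MB, and because no split_type is set, each input file is sent as one request. Solution: add a parallel SplitTestCSV processing step that chunks the test file, raise max_payload to 50 MB, then run TransformStep over the directory of chunks. (Setting split_type="Line" on TransformInput would instead let Batch Transform split the file into records itself.)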
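```python
from sagemaker.inputs import TransformInput
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=create_model_step.properties.ModelName,
    instance_type="ml.m5.large",
    instance_count=1,
    output_path="s3://my-bucket/pipelines/data/predictions",
    strategy="SingleRecord",
    assemble_with="Line",
    max_payload=50,  # MB per request; the default is 6
    sagemaker_session=pipeline_session,
)

transform_step = TransformStep(
    name="BatchInference",
    transformer=transformer,
    inputs=TransformInput(
        data="s3://my-bucket/pipelines/data/test/split/",
        content_type="text/csv",
    ),
    # The input is a literal S3 URI, so Pipelines cannot infer a dependency on
    # the SplitTestCSV step (not shown); declare it explicitly by step name.
    depends_on=["SplitTestCSV"],
)
```
SplitTestCSV reads the test output of PreprocessData, so it runs in parallel with TrainModel. SageMaker Pipelines infers dependencies from property references: BatchInference waits for CreateInferenceModel (and therefore TrainModel) because model_name comes from create_model_step.properties. The transform input, however, is a plain S3 string, so the dependency on SplitTestCSV has to be declared with depends_on; otherwise BatchInference could start before the chunks exist. SplitTestCSV must also be included in the pipeline's steps list.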
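The SplitTestCSV script isn't shown. A minimal standard-library sketch of the chunking it performs, assuming a header-less CSV with one record per line and no quoted fields; split_csv, its part-NNNNN.csv naming, and the drop_first_column option are hypothetical.

```python
import os
from typing import List


def split_csv(src_path: str, out_dir: str, max_bytes: int,
              drop_first_column: bool = False) -> List[str]:
    """Split a header-less CSV into line-aligned chunk files of at most max_bytes each."""
    os.makedirs(out_dir, exist_ok=True)
    paths: List[str] = []
    chunk: List[str] = []
    chunk_bytes = 0

    def flush() -> None:
        nonlocal chunk, chunk_bytes
        path = os.path.join(out_dir, f"part-{len(paths):05d}.csv")
        with open(path, "w", encoding="utf-8", newline="") as out:
            out.writelines(chunk)
        paths.append(path)
        chunk, chunk_bytes = [], 0

    with open(src_path, encoding="utf-8", newline="") as src:
        for line in src:
            if drop_first_column:
                # Naive split: assumes no quoted fields containing commas.
                line = line.split(",", 1)[1] if "," in line else "\n"
            size = len(line.encode("utf-8"))
            if size > max_bytes:
                raise ValueError(f"a single record ({size} bytes) exceeds max_bytes")
            # Start a new chunk rather than splitting a record across files.
            if chunk and chunk_bytes + size > max_bytes:
                flush()
            chunk.append(line)
            chunk_bytes += size

    if chunk:
        flush()
    return paths
```

For a 48 MB file and a 6 MB limit this yields at least eight chunks; targeting a chunk size below the configured max_payload leaves headroom. drop_first_column covers the case where the test split keeps the target in column 0 (the XGBoost training layout), since the model expects features only at inference time.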
Assembling and Running the Pipeline
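```python
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="SalesPredictionPipeline",
    # The SplitTestCSV step (not shown) must be added to this list as well,
    # since BatchInference depends on it.
    steps=[processing_step, training_step, create_model_step, transform_step],
    sagemaker_session=pipeline_session,
)

pipeline.upsert(role_arn=role)  # creates the pipeline or updates the existing definition
execution = pipeline.start()
```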
The pipeline is visible in SageMaker Studio under Pipelines, with per-step logs, timing, and artefact lineage.
Results
- Full pipeline runs end-to-end without manual intervention
- Spot instances on training cut compute cost by ~70% vs on-demand
- Pipeline is idempotent: upsert updates the definition without creating duplicates
- All artefacts (processed data, model, predictions) versioned in S3 with pipeline execution ID
Tech Stack
- Orchestration — AWS SageMaker Pipelines
- Modelling — SageMaker built-in XGBoost
- Processing — SageMaker Processing Jobs (scikit-learn container)
- Inference — SageMaker Batch Transform
- Storage — AWS S3
- Language — Python (sagemaker SDK)