# pai-pipeline

Part of **PAI**

<!-- intent-backlink:auto -->

> 💡 **Path Selection**: This skill is one implementation path for [Train a machine learning model](../../intent/pai-train-model/SKILL.md). If you're unsure which path to take, check the routing skill first.

# Platform for AI (PAI) Pipeline & Workflow Management

## Capabilities Overview

| Sub-capability | Calling Mode | Description |
|----------------|--------------|-------------|
| Manage Pipelines | Synchronous | Create, delete, get, list, and update ML pipelines. |
| Manage Pipeline Runs | Synchronous | Create, delete, get, list, start, terminate, rerun, and update pipeline runs. |
| Monitor Pipeline Nodes | Synchronous | Get node information, logs, outputs, and status for pipeline runs. |
| Configure Pipeline Permissions | Synchronous | Manage RAM authorization for pipeline operations. |
| Configure Pipeline Manifests | Synchronous | Set parameters for PAI-Flow pipeline manifests. |
| List Operations by Function | Synchronous | Access a functional listing of pipeline job operations. |

## API Calling Modes

### Authentication
The primary authentication method is Bearer Token authentication.

- Use the header: `Authorization: Bearer <your_api_key>`
- Store your API key in the environment variable: `DASHSCOPE_API_KEY`
- Other authentication methods may exist, but Bearer token authentication is the recommended approach for PAI Pipeline APIs and the one used in every example in this skill.
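
This minimal Python sketch builds these headers. It assumes the key is exported as `DASHSCOPE_API_KEY`. The helper name `auth_headers` is hypothetical and not part of any PAI SDK.

```python
import os


def auth_headers(env_var: str = "DASHSCOPE_API_KEY") -> dict:
    """Return request headers carrying a Bearer token read from the environment."""
    api_key = os.environ.get(env_var)
    if not api_key:
        # Fail early with a clear message instead of sending an empty token.
        raise RuntimeError(f"environment variable {env_var} is not set")
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
```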

### Service Endpoint
The examples in this skill use the following endpoint patterns, where `{Operation}` is the API operation name (for example, `CreatePipeline`):

`https://api.aliyun.com/api/PAIFlow/2021-02-02/{Operation}` (for China regions)  
`https://api.alibabacloud.com/api/PAIFlow/2021-02-02/{Operation}` (for international regions)

Common regions include:
- `cn-hangzhou`
- `cn-shanghai`
- `cn-beijing`
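
In these patterns only `{Operation}` varies, so building a URL is plain string formatting. This is a sketch under that assumption. The `site` switch and the `operation_url` name are illustrative only.

```python
# Base URLs from the endpoint patterns above; only {Operation} varies.
BASE_URLS = {
    "china": "https://api.aliyun.com/api/PAIFlow/2021-02-02",
    "international": "https://api.alibabacloud.com/api/PAIFlow/2021-02-02",
}


def operation_url(operation: str, site: str = "china") -> str:
    """Return the URL for an operation name such as 'CreatePipeline'."""
    if site not in BASE_URLS:
        raise ValueError(f"site must be one of {sorted(BASE_URLS)}, got {site!r}")
    return f"{BASE_URLS[site]}/{operation}"
```

For example, `operation_url("GetPipelineRun")` yields the URL used in the Python `GetPipelineRun` example in this skill.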

### Synchronous API Pattern
All PAI Pipeline & Workflow Management APIs follow a synchronous calling pattern:

1. **Send Request**: Make an HTTP request (GET, POST, PUT, or DELETE) to the appropriate endpoint with required parameters and the `Authorization: Bearer` header.
2. **Receive Immediate Response**: The server processes the request and returns a JSON response immediately.
3. **Handle Response**: Parse the JSON response to extract results (e.g., `PipelineId`, `PipelineRunId`) or handle errors.
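4. **No Polling Required for the Call Itself**: Unlike asynchronous APIs, the response contains the final outcome of the API operation. A pipeline run, however, keeps executing after `CreatePipelineRun` or `StartPipelineRun` returns. Track its progress with `GetPipelineRun` (the `Status` field) and `ListPipelineRunNodeStatus`.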
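
Each API call returns its result immediately, but a pipeline run keeps executing after the call that starts it. Its progress is reported through the `Status` field, with values such as `Running`, `Succeeded`, and `Failed`. The minimal polling sketch below makes two assumptions: `get_status` wraps a `GetPipelineRun` call and returns that field, and `Succeeded`, `Failed`, and `Terminated` are the final states. `wait_for_run` is a hypothetical helper, not an SDK function.

```python
import time
from typing import Callable

# Assumption: a run whose Status is one of these values has stopped executing.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Terminated"}


def wait_for_run(get_status: Callable[[], str], interval_s: float = 10.0,
                 timeout_s: float = 3600.0) -> str:
    """Call get_status() until it returns a terminal status, then return that status."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still {status!r} after {timeout_s} seconds")
        time.sleep(interval_s)
```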

## Parameter Reference

### Manage Pipelines

| Parameter | Type | Required | Default | Constraints | Description |
|-----------|------|----------|---------|-------------|-------------|
| WorkspaceId | string | true | - | - | The workspace ID. You can call ListWorkspaces to obtain the workspace ID. |
| Manifest | string | true | - | - | The pipeline definition. For more information, see the sample pipeline definition. |
| PipelineId | string | true | - | - | The pipeline ID. You can call ListPipelines to obtain the ID of the pipeline. |
| PageNumber | integer | false | 1 | - | The page number. Pages start from page 1. |
| PageSize | integer | false | - | - | The number of entries per page. |
| PipelineIdentifier | string | false | - | - | The pipeline identifier. |
| PipelineProvider | string | false | - | - | The pipeline provider: `pai` if the pipeline is provided by PAI, or the ID of your Alibaba Cloud account if you created the pipeline with that account. |
| PipelineVersion | string | false | - | - | The pipeline version. |
| FuzzyMatching | boolean | false | false | one of: false, true | Specifies whether to use fuzzy matching. |
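
The `PageNumber` and `PageSize` parameters, together with the `TotalCount` response field, are enough to walk a full listing. This sketch assumes a caller-supplied `fetch_page(page_number, page_size)` function that performs the list call (for example, `ListPipelines`) and returns the parsed JSON. The key of the array holding the items is not documented here, so it is passed in as `items_key`; check it against a real response. `iter_pages` is a hypothetical helper.

```python
from typing import Callable, Iterator


def iter_pages(fetch_page: Callable[[int, int], dict], items_key: str,
               page_size: int = 10) -> Iterator[dict]:
    """Yield every item of a paginated list operation, page by page."""
    page_number = 1  # pages start from page 1
    seen = 0
    while True:
        page = fetch_page(page_number, page_size)
        items = page.get(items_key, [])
        yield from items
        seen += len(items)
        total = page.get("TotalCount")
        # Stop on an empty page, or once TotalCount items have been seen.
        if not items or (total is not None and seen >= total):
            return
        page_number += 1
```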

### Manage Pipeline Runs

| Parameter | Type | Required | Default | Constraints | Description |
|-----------|------|----------|---------|-------------|-------------|
| PipelineRunId | string | true | - | - | The ID of the pipeline job. |
| WorkspaceId | string | true | - | - | The workspace ID. |
| PipelineId | string | false | - | - | The pipeline ID. You must configure PipelineId or PipelineManifest. |
| PipelineManifest | string | false | - | - | The pipeline definition. You must configure PipelineId or PipelineManifest. |
| Name | string | false | - | - | The name of the pipeline job. If you leave this parameter empty, the system automatically generates a name. |
| Arguments | string | false | - | - | The arguments (parameter values) passed to the pipeline job. |
| NoConfirmRequired | boolean | false | true | one of: true, false | Specifies whether to start the pipeline job immediately after it is created. |
| SourceType | string | false | UNKNOWN | one of: UNKNOWN, SDK, DESIGNER, M6 | The type of the pipeline job source. |
| SourceId | string | false | - | - | The source ID. |
| Options | string | false | - | JSON format | The options used to create the pipeline job, passed as a JSON-encoded string (for example, `{"mlflow":{"experimentId":"exp-1jdk***"}}`). |
| Accessibility | string | false | PUBLIC | one of: PUBLIC, PRIVATE | The pipeline accessibility. |
| Verbose | boolean | false | false | - | Specifies whether to return detailed information. |
| ManifestType | string | false | Raw | one of: Raw, Frozen, Rendered, Expanded, Runtime | The type of the manifest to be returned. |
| Status | string | false | - | one of: Initialized, Running, Succeeded, Failed, Terminated, Unknown, Skipped, Terminating | The status of the pipeline jobs to be filtered. |
| SortBy | string | false | - | one of: PipelineId, UserId, ParentUserId, StartedAt, FinishedAt, WorkflowServiceId, Duration, GmtCreateTime, GmtModifiedTime | The field for sorting. |
| Order | string | false | DESC | one of: ASC, DESC | The sorting order. |
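
The table's constraints can be checked locally before calling `CreatePipelineRun`. The body must include `PipelineId` or `PipelineManifest`, `SourceType` must be one of the listed values, and `Options` is a string that holds JSON. The following sketch enforces only those constraints. The helper name `create_run_body` is hypothetical, and the server remains the authority on validation.

```python
import json
from typing import Optional

# Allowed SourceType values, from the parameter table above.
SOURCE_TYPES = {"UNKNOWN", "SDK", "DESIGNER", "M6"}


def create_run_body(workspace_id: str, pipeline_id: Optional[str] = None,
                    pipeline_manifest: Optional[str] = None,
                    options: Optional[dict] = None, source_type: str = "UNKNOWN",
                    **extra) -> dict:
    """Build a CreatePipelineRun request body, enforcing the documented constraints."""
    if not (pipeline_id or pipeline_manifest):
        raise ValueError("configure PipelineId or PipelineManifest")
    if source_type not in SOURCE_TYPES:
        raise ValueError(f"SourceType must be one of {sorted(SOURCE_TYPES)}")
    body = {"WorkspaceId": workspace_id, "SourceType": source_type}
    if pipeline_id:
        body["PipelineId"] = pipeline_id
    if pipeline_manifest:
        body["PipelineManifest"] = pipeline_manifest
    if options is not None:
        # Options is a string parameter that holds JSON, so serialize the dict here.
        body["Options"] = json.dumps(options, separators=(",", ":"))
    body.update(extra)  # other fields passed through as-is, e.g. Name, NoConfirmRequired
    return body
```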

### Monitor Pipeline Nodes

| Parameter | Type | Required | Default | Constraints | Description |
|-----------|------|----------|---------|-------------|-------------|
| PipelineRunId | string | true | - | - | The ID of the pipeline job. |
| NodeId | string | true | - | - | The ID of a node in the pipeline job. |
| Depth | integer | false | 2 | - | The depth at which information about the node is queried. |
| Type | string | false | Logical | one of: Logical, Physical | The node type. |
| Offset | integer | false | 0 | - | The offset from which the data is returned. |
| FromTimeInSeconds | integer | false | - | - | The beginning of the time range to query. Unit: seconds. |
| ToTimeInSeconds | integer | false | - | - | The end of the time range to query. Unit: seconds. |
| Keyword | string | false | - | - | The search keyword. Fuzzy match is supported. |
| Reverse | boolean | false | false | - | Specifies whether the logs are sorted in descending order. |
| OutputType | string | false | - | one of: Model, DataSet, Metrics, ModelEvaluation | The output type of the pipeline node. |
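
This sketch builds parameters for `ListPipelineRunNodeLogs` that cover a recent time window. It rests on three assumptions that the table above does not confirm: `FromTimeInSeconds` and `ToTimeInSeconds` are Unix epoch seconds, booleans are sent as lower-case strings, and all values travel as query parameters. `node_log_params` is a hypothetical helper.

```python
import time
from typing import Optional


def node_log_params(pipeline_run_id: str, node_id: str, minutes: int = 30,
                    keyword: Optional[str] = None, now: Optional[int] = None) -> dict:
    """Query parameters for ListPipelineRunNodeLogs covering the last `minutes` minutes."""
    # Assumption: the time range is expressed as Unix epoch seconds.
    now = int(time.time()) if now is None else now
    params = {
        "PipelineRunId": pipeline_run_id,
        "NodeId": node_id,
        "FromTimeInSeconds": now - minutes * 60,
        "ToTimeInSeconds": now,
        "Offset": 0,
        "Reverse": "true",  # assumption: booleans travel as lower-case strings
    }
    if keyword:
        params["Keyword"] = keyword  # fuzzy-matched by the server
    return params
```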

### Configure Pipeline Manifests

| Parameter | Type | Required | Default | Constraints | Description |
|-----------|------|----------|---------|-------------|-------------|
| apiVersion | String | true | core/v1 | Must be 'core/v1' | The version of the manifest schema. |
| identifier | String | true | N/A | - | The unique identifier of the pipeline. |
| version | String | true | N/A | - | The version of the pipeline. |
| provider | String | true | N/A | Your Alibaba Cloud account ID, or `pai` | The provider of the pipeline. Use your account ID for pipelines you create; nodes provided by PAI use `pai`. |
| name | String | false | N/A | - | The name of the pipeline. |
| displayName | String | false | N/A | - | The display name of the pipeline. |
| guid | String | false | N/A | - | The unique identifier of a node of the pipeline. Configure only for temporary nodes. |
| inputs | Object | false | N/A | - | The inputs of the pipeline. |
| outputs | Object | false | N/A | - | The outputs of the pipeline. |
| pipelines | List<Object> | false | N/A | - | The nodes in the pipeline. |
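
In the example manifests later in this skill, `identifier`, `version`, and `provider` sit under `metadata`, while `inputs`, `outputs`, and `pipelines` sit under `spec`. The function below is a minimal local pre-check. It assumes the manifest has already been parsed into a Python dict, for example with a YAML library. It checks only the required fields from the table, recursing into nested nodes, and does not replace server-side validation. `check_manifest` is a hypothetical helper.

```python
def check_manifest(manifest: dict) -> list:
    """Return a list of problems found in a manifest dict (empty means none found)."""
    problems = []
    if manifest.get("apiVersion") != "core/v1":
        problems.append("apiVersion must be 'core/v1'")
    metadata = manifest.get("metadata", {})
    for field in ("identifier", "version", "provider"):
        if not metadata.get(field):
            problems.append(f"metadata.{field} is required")
    # Nested nodes under spec.pipelines are manifests themselves, so check them too.
    for i, node in enumerate(manifest.get("spec", {}).get("pipelines", [])):
        problems += [f"pipelines[{i}]: {p}" for p in check_manifest(node)]
    return problems
```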

## Code Examples

### Create a Pipeline - curl - all

```bash
curl -X POST https://api.aliyun.com/api/PAIFlow/2021-02-02/CreatePipeline \
-H "Authorization: Bearer $DASHSCOPE_API_KEY" \
-H "Content-Type: application/json" \
-d '{
  "WorkspaceId": "72***",
  "Manifest": "apiVersion: \"core/v1\"\nmetadata:\n  provider: \"166233998075****\"\n  version: \"v1\"\n  identifier: \"my_pipeline\"\n  name: \"source-transform\"\nspec:\n  inputs:\n    parameters:\n    - name: \"execution_maxcompute\"\n      value:\n        spec:\n          endpoint: \"http://service.cn.maxcompute.aliyun-inc.com/api\"\n          odpsProject: \"test_i****\"\n      type: \"Map\"\n  pipelines:\n  - apiVersion: \"core/v1\"\n    metadata:\n      provider: \"pai\"\n      version: \"v1\"\n      identifier: \"data_source\"\n      name: \"data-source\"\n      displayName: \"Read Table-1\"\n    spec:\n      arguments:\n        parameters:\n        - name: \"inputTableName\"\n          value: \"pai_online_project.wumai_data\"\n        - name: \"execution\"\n          from: \"{{inputs.parameters.execution_maxcompute}}\"\n  - apiVersion: \"core/v1\"\n    metadata:\n      provider: \"pai\"\n      version: \"v1\"\n      identifier: \"type_transform\"\n      name: \"type-transform\"\n      displayName: \"Data Type Conversion-1\"\n    spec:\n      arguments:\n        artifacts:\n        - name: \"inputTable\"\n          from: \"{{pipelines.data_source.outputs.artifacts.outputTable}}\"\n        parameters:\n        - name: \"cols_to_double\"\n          value: \"time,hour,pm2,pm10,so2,co,no2\"\n        - name: \"execution\"\n          from: \"{{inputs.parameters.execution_maxcompute}}\"\n      dependencies:\n      - \"data_source\""
}'
```
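
Escaping a YAML manifest into a JSON string by hand, as in the curl example above, is error-prone. In this sketch, `json.dumps` escapes the quotes and newlines instead. The manifest text is abbreviated for illustration, and `create_pipeline_body` is a hypothetical helper.

```python
import json


def create_pipeline_body(workspace_id: str, manifest_yaml: str) -> str:
    """Return a CreatePipeline JSON body; json.dumps escapes quotes and newlines."""
    return json.dumps({"WorkspaceId": workspace_id, "Manifest": manifest_yaml})


# Abbreviated manifest for illustration; a real one also needs its spec section.
MANIFEST = """\
apiVersion: "core/v1"
metadata:
  provider: "166233998075****"
  version: "v1"
  identifier: "my_pipeline"
  name: "source-transform"
"""

if __name__ == "__main__":
    print(create_pipeline_body("72***", MANIFEST))
```

Because the printed body contains no literal newlines, it can be piped straight into curl's `-d @-` option, which reads the request body from standard input. For example: `python make_body.py | curl -X POST ... -d @-`, where `make_body.py` is a hypothetical file name.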

### Create a Pipeline Run - curl - all

```bash
curl -X POST https://api.aliyun.com/api/PAIFlow/2021-02-02/CreatePipelineRun \
-H "Authorization: Bearer $DASHSCOPE_API_KEY" \
-H "Content-Type: application/json" \
-d '{
  "WorkspaceId": "84***",
  "PipelineId": "flow-rer7y***",
  "Name": "testName",
  "NoConfirmRequired": true,
  "SourceType": "UNKNOWN",
  "SourceId": "experiment-ybpy***",
  "Options": "{\"mlflow\":{\"experimentId\":\"exp-1jdk***\"}}",
  "Accessibility": "PUBLIC"
}'
```

### Get Pipeline Run Details - python - all

```python
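import os

import requests

url = "https://api.aliyun.com/api/PAIFlow/2021-02-02/GetPipelineRun"
params = {
    "PipelineRunId": "flow-j94y**********lf7a"
}
headers = {
    # Python does not expand "$DASHSCOPE_API_KEY" inside a string literal; read it from the environment.
    "Authorization": f"Bearer {os.environ['DASHSCOPE_API_KEY']}",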
    "Content-Type": "application/json"
}

response = requests.get(url, params=params, headers=headers)
print(response.json())
```

### Delete a Pipeline Run - python - all

```python
import os

import requests

url = "https://api.aliyun.com/api/PAIFlow/2021-02-02/DeletePipelineRun"
params = {
    "PipelineRunId": "flow-hynm2bv8wqhp5esfxq"
}
headers = {
    # Read the API key from the environment instead of hard-coding it.
    "Authorization": f"Bearer {os.environ['DASHSCOPE_API_KEY']}"
}

response = requests.delete(url, params=params, headers=headers)
print(response.json())
```

### Start a Pipeline Run - curl - all

```bash
curl -X PUT 'https://api.aliyun.com/api/PAIFlow/2021-02-02/pipelineruns/flow-rbvg5wzljzjhc9ks92/start' \
-H "Authorization: Bearer $DASHSCOPE_API_KEY" \
-H 'Content-Type: application/json'
```

### Single-Node Pipeline Manifest - yaml - all

```yaml
apiVersion: core/v1
metadata:
  provider: '13266******76250'
  version: v1
  identifier: echo
spec:
  inputs:
    parameters:
    - name: outputTableName
      type: String
      value: pai_temp_outputTable
  outputs:
    artifacts:
    - name: outputTable
      metadata:
        type:
          DataSet:
            locationType: MaxComputeTable
      desc: SQL Script Output Port
  container:
    image: 'registry.cn-shanghai.aliyuncs.com/paiflow-core/max-compute-executor:v1.1.4'
    command:
    - sh
    - -c
    args:
    - |
      mkdir -p /pai/outputs/artifacts/outputTable/
      echo '{"metadata":{"type":{"DataSet":{"locationType":"MaxComputeTable"}}}}' > /pai/outputs/artifacts/outputTable/metadata
      echo '{"location": {"endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
      "project": "wyl_t******t2", "table":  "{{inputs.parameters.outputTableName}}"}}' > /pai/outputs/artifacts/outputTable/value
```
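
The container declares its output by writing two JSON files, `metadata` and `value`, under `/pai/outputs/artifacts/outputTable/`. If the image provides a Python interpreter, the same files can be produced with a JSON library, which rules out unbalanced braces. That is an assumption: the contents of the `max-compute-executor` image are not documented here. `write_table_output` is a hypothetical helper.

```python
import json
from pathlib import Path


def write_table_output(artifacts_dir, endpoint, project, table):
    """Write the metadata/value files that declare a MaxCompute table output artifact."""
    artifact = Path(artifacts_dir) / "outputTable"
    artifact.mkdir(parents=True, exist_ok=True)
    metadata = {"metadata": {"type": {"DataSet": {"locationType": "MaxComputeTable"}}}}
    value = {"location": {"endpoint": endpoint, "project": project, "table": table}}
    # json.dumps always emits balanced braces, unlike hand-written echo strings.
    (artifact / "metadata").write_text(json.dumps(metadata))
    (artifact / "value").write_text(json.dumps(value))
    return artifact
```

Inside the container, `artifacts_dir` would be `/pai/outputs/artifacts`.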

### DAG Pipeline Manifest - yaml - all

```yaml
apiVersion: "core/v1"
metadata:
  provider: "11577*******4901"
  version: "v1"
  identifier: "job-root-pipeline-identifier"
spec:
  inputs:
    parameters:
    - name: "execution_maxcompute"
      type: "Map"
  pipelines:
  - apiVersion: "core/v1"
    metadata:
      provider: "pai"
      version: "v1"
      identifier: "data_source"
      name: "data_source"
      displayName: "Read data source"
    spec:
      arguments:
        parameters:
        - name: "inputTableName"
          value: "pai_online_project.wumai_data"
        - name: "execution"
          from: "{{inputs.parameters.execution_maxcompute}}"
  - apiVersion: "core/v1"
    metadata:
      provider: "pai"
      version: "v1"
      identifier: "type_transform"
      name: "type_transform"
      displayName: "Type conversion"
    spec:
      arguments:
        artifacts:
        - name: "inputTable"
          from: "{{pipelines.data_source.outputs.artifacts.outputTable}}"
        parameters:
        - name: "cols_to_double"
          value: "time,hour,pm2,pm10,so2,co,no2"
        - name: "default_int_value"
          value: "0"
        - name: "reserveOldFeat"
          value: "false"
        - name: "execution"
          from: "{{inputs.parameters.execution_maxcompute}}"
      dependencies:
      - "data_source"
  - apiVersion: "core/v1"
    metadata:
      provider: "pai"
      version: "v1"
      identifier: "sql"
      name: "sql"
      displayName: "SQL script"
    spec:
      arguments:
        artifacts:
        - name: "inputTable1"
          from: "{{pipelines.type_transform.outputs.artifacts.outputTable}}"
        parameters:
        - name: "sql"
          value: "select time,hour,(case when pm2>200 then 1 else 0 end),pm10,so2,co,no2\n\tfrom ${t1}"
        - name: "execution"
          from: "{{inputs.parameters.execution_maxcompute}}"
      dependencies:
      - "type_transform"
```
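
Each node's `dependencies` list names the upstream nodes it depends on, so the manifest describes a directed acyclic graph. This sketch uses Python's standard `graphlib` module (Python 3.9+) to derive an order consistent with those edges. It illustrates the dependency semantics only and is not PAI-Flow's actual scheduler.

```python
from graphlib import TopologicalSorter

# Node identifiers mapped to their `dependencies` lists, copied from the DAG manifest above.
dependencies = {
    "data_source": [],
    "type_transform": ["data_source"],
    "sql": ["type_transform"],
}

# static_order() yields each node only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['data_source', 'type_transform', 'sql']
```

A cycle in the `dependencies` lists makes `static_order()` raise `graphlib.CycleError`, which is a quick way to catch a miswired manifest locally.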

## Response Format

```json
{
  "RequestId": "DA869D1B-035A-43B2-ACC1-C56681******",
  "PipelineId": "pipeline-hynm2bv8**********"
}
```

**Key Fields** (the fields returned depend on the operation; not every field appears in every response):
- `RequestId` — Unique identifier for the API request, useful for troubleshooting
- `PipelineId` — Unique identifier of the created or retrieved pipeline
- `PipelineRunId` — Unique identifier of a pipeline run/job
- `Status` — Current execution status of a pipeline run (e.g., Running, Succeeded, Failed)
- `Manifest` — Full pipeline definition in YAML/JSON format
- `TotalCount` — Total number of items matching a list query (for pagination)

## Error Handling

| Error Code | Description | Recommended Action |
|-------------------|---------------------------|----------------------------------------|
| 400 | Bad Request: The request parameters are invalid or missing required fields. | Validate all required parameters and ensure they meet constraints (e.g., correct format, valid values). |
| 401 | Unauthorized: The API key or authentication credentials are invalid or missing. | Verify that the `DASHSCOPE_API_KEY` environment variable is set and the `Authorization: Bearer` header is correctly formatted. |
| 403 | Forbidden: The caller does not have sufficient permissions for the requested operation in the specified workspace. | Check RAM permissions; ensure the account has the required permission for the target workspace (for example, `paiflow:CreatePipeline` for pipeline creation). |
| 404 | Not Found: The specified resource (for example, the workspace, pipeline, or pipeline run) does not exist. | Confirm that the IDs are correct, for example by calling `ListWorkspaces` for the `WorkspaceId` or `ListPipelines` for the `PipelineId`. |
| 429 | Too Many Requests: The request rate exceeds the allowed limit. | Retry with exponential backoff and reduce request frequency to stay under 100 QPS. |
| 500 | Internal Server Error: An unexpected error occurred on the server side. | Retry with exponential backoff; if the error persists, contact Alibaba Cloud support with the `RequestId`. |
| InvalidParameter.PipelineInUse | The specified pipeline is subscribed by users as a shared pipeline or referenced by other pipelines as a sub-pipeline, and cannot be deleted. | Remove all references/subscriptions to the pipeline before attempting deletion. |

### Rate Limits & Retry
- **QPS Limit**: 100 queries per second per account/user
- **Retry Strategy**: Use exponential backoff (e.g., wait 1s, 2s, 4s, 8s) on 429 errors
- **Header Handling**: Respect the `Retry-After` header if present in error responses
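
This sketch implements the retry policy above. It assumes your HTTP client's errors are translated into the hypothetical `ApiHttpError`, for example from `urllib.error.HTTPError` by reading its `code` and a numeric `Retry-After` header. It retries only 429 and 5xx responses; treating every 5xx like 500 is an assumption. Other errors are raised immediately. The `sleep` parameter is injectable so the policy can be tested without waiting.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class ApiHttpError(Exception):
    """Hypothetical error type: an HTTP status code plus an optional Retry-After in seconds."""

    def __init__(self, code: int, retry_after: Optional[float] = None):
        super().__init__(f"HTTP {code}")
        self.code = code
        self.retry_after = retry_after


def call_with_retry(send: Callable[[], T], max_attempts: int = 5,
                    base_delay_s: float = 1.0, sleep=time.sleep) -> T:
    """Call send(), retrying 429 and 5xx errors with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return send()
        except ApiHttpError as err:
            retryable = err.code == 429 or 500 <= err.code < 600
            if not retryable or attempt == max_attempts - 1:
                raise
            # Honor Retry-After when present; otherwise wait 1 s, 2 s, 4 s, 8 s, ...
            delay = err.retry_after if err.retry_after is not None else base_delay_s * 2 ** attempt
            sleep(delay)
```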

## Environment Requirements

- Set your API key as an environment variable: `export DASHSCOPE_API_KEY=your_key_here`
- For Python examples, install the `requests` library: `pip install requests`

## FAQ

Q: How do I obtain a WorkspaceId?
A: Call the `ListWorkspaces` API (not covered in this skill) or find it in the PAI console under Workspace settings. Most pipeline operations require a valid WorkspaceId.

Q: What's the difference between PipelineId and PipelineRunId?
A: `PipelineId` identifies a pipeline template/definition, while `PipelineRunId` identifies a specific execution instance of that pipeline. You create a pipeline once, but can run it many times (each run gets a unique PipelineRunId).

Q: Can I update a pipeline that's currently running?
A: No. You can only update the pipeline definition (via `UpdatePipeline`) when no runs are active. However, you can update the name of a pipeline run (via `UpdatePipelineRun`) even while it's executing.

Q: How do I debug a failed pipeline run?
A: First, call `GetPipelineRun` to check the overall status and error message. Then, use `ListPipelineRunNodeStatus` and `ListPipelineRunNodeLogs` to inspect individual node statuses and logs for detailed error information.

Q: Are pipeline manifests validated before creation?
A: Yes. The `CreatePipeline` and `CreatePipelineRun` APIs validate the manifest structure. If validation fails, you'll receive a 400 error with details about the invalid fields or missing requirements.

## Pricing & Billing

### Billing Model
Billing is based on a per-request model. Each API call (regardless of success or failure) counts as one request.

### Price Reference

| Tier | Input Price | Output Price | Other Fees |
|------|-------------|--------------|------------|
| CreatePipeline | 0.001 / | - | - |
| DeletePipeline | 0.001 / | - | - |
| GetPipeline | 0.0001 / | 0.0001 / | - |
| ListPipelines | 0.001 / | 0.001 / | - |
| UpdatePipeline | 0.001 / | 0.001 / | - |
| CreatePipelineRun | 0.001 / | 0.001 / | - |
| DeletePipelineRun | 0.001 / | - | - |
| GetPipelineRun | 0.0001 / | 0.0001 / | - |
| ListPipelineRuns | 0.001 / | 0.001 / | - |
| RerunPipelineRun | 0.001 / | - | - |
| StartPipelineRun | 0.001 / | 0.001 / | - |
| TerminatePipelineRun | 0.001 / | - | - |
| UpdatePipelineRun | 0.001 / | 0.001 / | - |
| GetPipelineRunNode | 0.001 / | 0.001 / | - |
| ListPipelineRunNodeLogs | 0.001 / | 0.001 / | - |
| ListPipelineRunNodeOutputs | 0.0001 / | 0.0001 / | - |
| ListPipelineRunNodeStatus | 0.0001 / | 0.0001 / | - |
| PAI-Flow pipeline run | 0.002 / | 0.002 / | Additional costs for compute resources (DLC jobs, MaxCompute) |

### Free Tier
- CreatePipeline: 100 free calls per month
- DeletePipeline: 1000 free calls per month
- GetPipeline: 1000 free calls per month
- ListPipelines: 1000 free calls per month
- UpdatePipeline: 100 free calls per month
- Most pipeline run operations: 1000 free calls per month
- Node monitoring operations: 1000 free calls per month

### Usage Limits
- Rate limit: 100 QPS per account/user across all operations
- Request body size: Up to 100KB for pipeline run creation
- Pagination: Default page size is 10 for list operations

### Billing Notes
- All API calls are billed per request, including failed requests (except where explicitly noted)
- Pipeline execution (runs) incurs separate charges beyond API calls, especially when using compute resources like DLC jobs or MaxCompute
- Free tier quotas reset monthly
- Compute resource usage (e.g., GPU instances, storage) is billed separately based on duration and instance type