# OpenSearch Data Ingestion and Processing

## Capabilities Overview

| Sub-capability | Calling Mode | Description |
|----------------|--------------|-------------|
| Split Document | Synchronous | Splits long documents into smaller chunks for embedding and retrieval in RAG pipelines. Supports HTML, Markdown, and plain text with rich content extraction. |
| Parse Document | Async Task | Extracts structured content from raw documents (PDF, DOC, PPT, etc.) via asynchronous tasks, returning text, tables, images, and logical hierarchy in Markdown or HTML. |
| Document Split | Synchronous | Divides documents into smaller chunks for processing or vectorization, supporting sentence-level chunking and token-based sizing. |
| Create Document Analyze Task | Async Task | Submits a document for asynchronous content parsing and structural analysis, returning a task ID for polling results. |
| Process Data | Synchronous | Executes data processing operations using built-in table-valued functions (TVFs) and SQL statements for ranking, sorting, and unpacking multi-value fields. |
| Configure Data Source | Synchronous | Sets up and configures external data sources (RDS, MaxCompute, PolarDB) for ingestion into OpenSearch applications. |
| Push Documents Bulk | Synchronous | Performs bulk document pushes (ADD/UPDATE/DELETE) as part of data ingestion workflows. |
| List ODPS Projects | Synchronous | Retrieves available MaxCompute (ODPS) projects configured as data sources for integration. |

## API Calling Patterns

### Authentication
**Primary Method**: Bearer Token  
- **Header Format**: `Authorization: Bearer <your_api_key>`  
- **Environment Variable**: `DASHSCOPE_API_KEY` for Bearer-token services, or `ALIBABA_CLOUD_ACCESS_KEY_ID` with `ALIBABA_CLOUD_ACCESS_KEY_SECRET` for AccessKey-based services  
- **Note**: Some ingestion APIs (e.g., bulk push) use AccessKey ID/Secret via environment variables instead. Always check the specific endpoint's auth requirements.
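
As a rough illustration of the Bearer-token scheme above, the following sketch builds the request headers from an explicit key or from `DASHSCOPE_API_KEY`. The helper name `build_auth_headers` is hypothetical and not part of any SDK.

```python
import os


def build_auth_headers(api_key=None):
    """Return HTTP headers for Bearer-token endpoints.

    `build_auth_headers` is a hypothetical helper. It falls back to the
    DASHSCOPE_API_KEY environment variable when no key is passed in.
    """
    key = api_key or os.environ.get("DASHSCOPE_API_KEY")
    if not key:
        raise ValueError("No API key given and DASHSCOPE_API_KEY is not set")
    return {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
```

Reading the key from the environment keeps it out of source control; endpoints that use AccessKey ID/Secret need a different signing scheme that this sketch does not cover.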

### Service Endpoint
Base URL pattern:  
`https://{region}.opensearch.aliyuncs.com`  

Common regions:  
- `cn-hangzhou`  
- `cn-shanghai`  
- `cn-beijing`  

For document processing services, endpoints often follow:  
`{host}/v3/openapi/workspaces/{workspace_name}/{service_type}/{service_id}`
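
The path pattern above can be assembled with a small helper. This is a sketch only: `build_service_url` is a hypothetical name, and the `service_type` value used in the usage line (`document-split`) is an assumption to be checked against your service's documentation.

```python
from urllib.parse import quote


def build_service_url(host, workspace_name, service_type, service_id):
    """Fill in {host}/v3/openapi/workspaces/{workspace_name}/{service_type}/{service_id}."""
    host = host.rstrip("/")
    # Default to HTTPS when the host is given without a scheme
    if not host.startswith(("http://", "https://")):
        host = "https://" + host
    # Percent-encode each path segment so unusual names cannot break the URL
    segments = [quote(part, safe="") for part in (workspace_name, service_type, service_id)]
    return f"{host}/v3/openapi/workspaces/" + "/".join(segments)


# Assumed example values; replace with your own endpoint and service
url = build_service_url(
    "example.opensearch.aliyuncs.com", "default", "document-split", "ops-document-split-001"
)
```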

### Synchronous Calls
Used for: document splitting, TVF execution, data source listing, bulk ingestion.  
- Send a single POST/GET request with parameters in JSON body or query string.  
- Receive immediate JSON response with results.  
- Example: Document chunking returns `chunks[]` directly in the response.
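
For readers not using the SDK, a synchronous call can be sketched with the standard library alone. The helper below only prepares the request and does not send it; the function name and the URL passed to it are illustrative assumptions.

```python
import json
import urllib.request


def build_sync_request(url, api_key, payload):
    """Prepare (but do not send) a synchronous POST with a JSON body."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Hypothetical URL and key, shown only to illustrate the call shape
req = build_sync_request(
    "https://example.invalid/split",
    "my-key",
    {"document": {"content": "Hello", "content_type": "text"}},
)
```

Passing `req` to `urllib.request.urlopen` would send it; the JSON body of the reply can then be decoded with `json.loads`.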

### Async Task Pattern
Used for: document parsing (PDF/DOC analysis).  
1. **Submit Task**: POST to `/async/` endpoint with document URL or Base64 content.  
2. **Receive Task ID**: Response includes `task_id` for tracking.  
3. **Poll Status**: Repeatedly GET `/async/task-status?task_id={id}` until status is `SUCCESS` or `FAILED`.  
4. **Retrieve Result**: Final response contains parsed `content`, `page_num`, and usage metrics.  
- **Polling Interval**: Recommended 5-second delay between status checks.  
- **Timeout**: Tasks may expire after several hours; implement retry logic for long-running jobs.
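
The submit-then-poll steps above can be sketched as a generic loop with a deadline. `poll_task` and its `fetch_status` callback are hypothetical names; the callback stands in for whatever client call returns the task status, and the one-hour default timeout is an arbitrary assumption.

```python
import time


def poll_task(fetch_status, task_id, interval=5.0, timeout=3600.0,
              sleep=time.sleep, clock=time.monotonic):
    """Poll until the task reports SUCCESS or FAILED, or the deadline passes.

    fetch_status(task_id) must return a dict with a "status" key.
    """
    deadline = clock() + timeout
    while True:
        result = fetch_status(task_id)
        if result.get("status") in ("SUCCESS", "FAILED"):
            return result
        if clock() >= deadline:
            raise TimeoutError(f"task {task_id} did not finish within {timeout} seconds")
        # Wait between checks, following the recommended 5-second interval
        sleep(interval)
```

Injecting `sleep` and `clock` makes the loop easy to exercise in tests without real delays.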

## Parameter Reference

### Split Document / Document Split

| Parameter | Type | Required | Default | Constraints | Description |
|-----------|------|----------|---------|-------------|-------------|
| document.content | string | true | - | max 8 MB | The text to split. Escape special JSON characters (`\`, `"`, etc.). |
| document.content_type | string | false | text | one of: html, markdown, text | Format of the input content. |
| strategy.max_chunk_size | int | false | 300 | ≥1 | Maximum chunk size in tokens (using `ops-text-embedding-001` tokenizer). |
| strategy.need_sentence | boolean | false | false | - | Return sentence-level chunks (doubles token usage). |
| strategy.compute_type | string | false | token | only 'token' | Method for measuring chunk length. |
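
A minimal sketch of client-side validation against the constraints in this table. `build_split_request` is a hypothetical helper, and treating "8 MB" as 8 × 1024 × 1024 bytes of UTF-8 is an assumption.

```python
import json

MAX_CONTENT_BYTES = 8 * 1024 * 1024  # assumption: 8 MB measured in UTF-8 bytes
ALLOWED_CONTENT_TYPES = {"html", "markdown", "text"}


def build_split_request(content, content_type="text", max_chunk_size=300, need_sentence=False):
    """Validate inputs against the table above and return the request payload."""
    if content_type not in ALLOWED_CONTENT_TYPES:
        raise ValueError(f"unsupported content_type: {content_type!r}")
    if max_chunk_size < 1:
        raise ValueError("max_chunk_size must be at least 1")
    if len(content.encode("utf-8")) > MAX_CONTENT_BYTES:
        raise ValueError("content exceeds the 8 MB limit")
    return {
        "document": {"content": content, "content_type": content_type},
        "strategy": {
            "max_chunk_size": max_chunk_size,
            "need_sentence": need_sentence,
            "compute_type": "token",  # the only value listed as supported
        },
    }


# json.dumps takes care of escaping quotes and backslashes in the content
body = json.dumps(build_split_request('He said "hello".', content_type="markdown"))
```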

### Parse Document / Create Document Analyze Task

| Parameter | Type | Required | Default | Constraints | Description |
|-----------|------|----------|---------|-------------|-------------|
| document.url | string | false* | - | - | Publicly downloadable document URL (HTTP/HTTPS). |
| document.content | string | false* | - | max 8 MB | Base64-encoded document content. |
| document.file_type | string | false | inferred | one of: txt, pdf, html, doc, docx, ppt, pptx | Document format (required if not inferable from filename). |
| output.image_storage | string | false | base64 | one of: base64, url | How images are stored in output (`url` links expire in 3 days). |
| strategy.enable_semantic | boolean | false | false | - | Enable semantic hierarchy extraction (increases latency and cost). |

*Either `document.url` or `document.content` must be provided.
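
The either/or rule can be enforced before submitting a task. The sketch below is illustrative: `build_parse_request` is a hypothetical helper, and rejecting requests that supply both `url` and content is an assumption, since the table only states that one of them must be present.

```python
import base64

ALLOWED_FILE_TYPES = {"txt", "pdf", "html", "doc", "docx", "ppt", "pptx"}
ALLOWED_IMAGE_STORAGE = {"base64", "url"}


def build_parse_request(url=None, raw_bytes=None, file_type=None,
                        image_storage="base64", enable_semantic=False):
    """Build a parse-task payload from either a URL or raw file bytes."""
    # Assumption: exactly one source is accepted per request
    if (url is None) == (raw_bytes is None):
        raise ValueError("provide exactly one of url or raw_bytes")
    if file_type is not None and file_type not in ALLOWED_FILE_TYPES:
        raise ValueError(f"unsupported file_type: {file_type!r}")
    if image_storage not in ALLOWED_IMAGE_STORAGE:
        raise ValueError(f"unsupported image_storage: {image_storage!r}")

    document = {}
    if url is not None:
        document["url"] = url
    else:
        # Raw bytes are sent as Base64 text in document.content
        document["content"] = base64.b64encode(raw_bytes).decode("ascii")
    if file_type is not None:
        document["file_type"] = file_type

    return {
        "document": document,
        "output": {"image_storage": image_storage},
        "strategy": {"enable_semantic": enable_semantic},
    }
```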

### Process Data (TVFs)

| Parameter | Type | Required | Default | Constraints | Description |
|-----------|------|----------|---------|-------------|-------------|
| sql | string | true | - | Must be enclosed in `()` | SQL statement to process (e.g., `(SELECT brand, size FROM phone)`). |
| sort_key | string | true | - | Prefix with `+` (ascending, the default) or `-` (descending) | Field(s) to sort by (e.g., `-size` for descending). |
| reserved_count | integer | true | - | Negative = all records | Number of records to reserve per group. |
| unpack_keys | string | true | - | Comma-separated | Fields to unpack (for `unpackMultiValue` TVF). |
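
To show how these parameters fit together, here is a sketch that assembles a `rankTvf` statement as a string. `rank_tvf` is a hypothetical helper, the first argument (the grouping field) is inferred from the SQL example later in this document, and the function does no escaping, so it is only suitable for trusted, hard-coded inputs.

```python
def rank_tvf(group_key, sort_key, reserved_count, inner_sql):
    """Build a rankTvf query; the inner SQL is wrapped in parentheses if needed."""
    inner = inner_sql.strip()
    if not (inner.startswith("(") and inner.endswith(")")):
        inner = f"({inner})"
    # All scalar arguments are passed as quoted strings, as in the SQL example
    return (
        "select * from table ("
        f"rankTvf('{group_key}','{sort_key}','{reserved_count}', {inner})"
        ")"
    )


# Keep the single largest phone per brand
query = rank_tvf("brand", "-size", 1, "SELECT brand, size FROM phone")
```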

### Push Documents Bulk

| Parameter | Type | Required | Default | Constraints | Description |
|-----------|------|----------|---------|-------------|-------------|
| cmd | string | true | - | one of: ADD, UPDATE, DELETE | Operation to perform on the document. |
| fields | object | true | - | Must include primary key | Document fields (e.g., `{ "id": 1, "describe": "text" }`). |
| timestamp | integer | false | - | Unix ms | Custom update order for same primary key. |
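
A bulk request body is a JSON array of such command objects. The sketch below builds one under stated assumptions: `make_bulk_item` is a hypothetical helper, and the primary key is assumed to be named `id`, as in the example fields above.

```python
import json

VALID_CMDS = {"ADD", "UPDATE", "DELETE"}


def make_bulk_item(cmd, fields, primary_key="id", timestamp=None):
    """Return one bulk command after checking the constraints in the table."""
    if cmd not in VALID_CMDS:
        raise ValueError(f"cmd must be one of {sorted(VALID_CMDS)}")
    if primary_key not in fields:
        raise ValueError(f"fields must include the primary key {primary_key!r}")
    item = {"cmd": cmd, "fields": dict(fields)}
    if timestamp is not None:
        item["timestamp"] = int(timestamp)  # Unix time in milliseconds
    return item


body = json.dumps([
    make_bulk_item("ADD", {"id": 1, "describe": "123456"}),
    make_bulk_item("DELETE", {"id": 2}, timestamp=1700000000000),
])
```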

## Code Examples

### Document Chunking - Python - All Regions

```python
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest

if __name__ == '__main__':
    config = Config(
        bearer_token='<your-api-key>',
        endpoint='<your-api-endpoint>',
        protocol='http')
    client = Client(config=config)

    request = GetDocumentSplitRequest().from_map({
        "document": {
            "content": "Your document text goes here.",
            "content_type": "text"
        },
        "strategy": {
            "max_chunk_size": 300,
            "need_sentence": False
        }
    })

    response = client.get_document_split("default", "ops-document-split-001", request)
    print(response)
```

### Parse Document (Async) - Python - All Regions

```python
import time
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import (
    CreateDocumentAnalyzeTaskRequestDocument,
    CreateDocumentAnalyzeTaskRequestOutput,
    CreateDocumentAnalyzeTaskRequest,
    GetDocumentAnalyzeTaskStatusRequest
)

config = Config(
    bearer_token="<your-api-key>",
    endpoint="<your-api-endpoint>",
    protocol="http"
)
client = Client(config=config)

# Submit task with URL
document = CreateDocumentAnalyzeTaskRequestDocument(
    url="https://example.com/sample.pdf",
    file_type="pdf"
)
output = CreateDocumentAnalyzeTaskRequestOutput(image_storage="url")
request = CreateDocumentAnalyzeTaskRequest(document=document, output=output)
response = client.create_document_analyze_task("default", "ops-document-analyze-001", request)
task_id = response.body.result.task_id

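# Poll until the task leaves PENDING; the 10-minute cap is an arbitrary example value
request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
deadline = time.monotonic() + 600
while True:
    response = client.get_document_analyze_task_status("default", "ops-document-analyze-001", request)
    status = response.body.result.status
    if status == "PENDING":
        if time.monotonic() > deadline:
            print("Timed out waiting for task:", task_id)
            break
        time.sleep(5)  # recommended polling interval
    elif status == "SUCCESS":
        # Print only the first 1000 characters of the parsed content
        print("Content:", response.body.result.data.content[:1000])
        break
    else:
        # Any other status is treated as a failure
        print("Error:", response.body.result)
        break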
```

### Bulk Document Push - Go - All Regions

```go
package main

import (
    "fmt"
    "os"
    util "github.com/alibabacloud-go/tea-utils/service"
    "github.com/alibabacloud-go/tea/tea"
    opensearch "main/client"
)

func main() {
    config := &opensearch.Config{
        Endpoint:         tea.String("<Endpoint>"),
        AccessKeyId:     tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")),
        AccessKeySecret: tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")),
    }

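    client, err := opensearch.NewClient(config)
    if err != nil {
        // Stop early if the client cannot be created
        fmt.Println("failed to create client:", err)
        return
    }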

    document1st := map[string]interface{}{
        "cmd": "ADD",
        "fields": map[string]interface{}{
            "id":       1,
            "describe": "123456",
        },
    }

    requestBody := []interface{}{document1st}

    appName := "<appName>"
    tableName := "<tableName>"

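    response, err := client.Request(
        tea.String("POST"),
        tea.String("/v3/openapi/apps/"+appName+"/"+tableName+"/actions/bulk"),
        nil, nil, requestBody, &util.RuntimeOptions{})
    if err != nil {
        // Surface request failures instead of printing an empty response
        fmt.Println("bulk push failed:", err)
        return
    }

    fmt.Println(response)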
}
```

### Execute TVF (SQL) - SQL - All Regions

```sql
-- Rank products by size within each brand
select * from table (
  rankTvf('brand','-size','1', (SELECT brand, size FROM phone))
) 
order by brand 
limit 100;

-- Unpack multi-value fields (e.g., comma-separated tags)
select * from table (
  unpackMultiValue('tags', (SELECT id, tags FROM products))
);
```

### List ODPS Projects - Bash - All Regions

```bash
curl -X GET 'https://openapi.aliyun.com/v4/openapi/assist/data-sources/odps/projects' \
-H 'Authorization: Bearer <your-access-token>' \
-H 'Content-Type: application/json'
```

### Configure RDS Data Source - JSON - All Regions

```json
{
  "tableName": "news",
  "type": "rds",
  "fields": [
    { "id": "id" },
    { "caption": "caption" },
    { "content": "content" }
  ],
  "keyField": "id",
  "parameters": {
    "instanceId": "rm-abc",
    "dbName": "my_db",
    "dbTableName": "my_table",
    "dbUser": "my_user",
    "dbPassword": "my-password",
    "autoSync": true
  }
}
```

### Parse Local File (Base64) - Python - All Regions

```python
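import base64
import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import (
    CreateDocumentAnalyzeTaskRequestDocument,
    CreateDocumentAnalyzeTaskRequestOutput,
    CreateDocumentAnalyzeTaskRequest
)

# Client setup, same as in the async parsing example
config = Config(
    bearer_token="<your-api-key>",
    endpoint="<your-api-endpoint>",
    protocol="http"
)
client = Client(config=config)

# Read the local file and Base64-encode it for document.content
file_path = "path/to/sample.pdf"
with open(file_path, "rb") as f:
    encoded = base64.b64encode(f.read()).decode()

document = CreateDocumentAnalyzeTaskRequestDocument(
    content=encoded,
    file_name=os.path.basename(file_path)  # lets the service infer the file type
)
output = CreateDocumentAnalyzeTaskRequestOutput(image_storage="url")
request = CreateDocumentAnalyzeTaskRequest(document=document, output=output)

response = client.create_document_analyze_task("default", "ops-document-analyze-001", request)
print("Task ID:", response.body.result.task_id)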

## Response Format

```json
{
  "request_id": "47EA146B-****-448C-A1D5-50B89D7EA434",
  "latency": 161,
  "usage": {
    "token_count": 800
  },
  "result": {
    "chunks": [
      {
        "content": "Product benefits\nIndustry algorithm edition\n...",
        "meta": {
          "parent_id": "dee776dda3ff4b078bccf989a6bd****",
          "id": "27eea7c6b2874cb7a5bf6c71afbf****",
          "type": "text"
        }
      }
    ],
    "nodes": [...],
    "rich_texts": []
  }
}
```

**Key Fields**:  
- `request_id` — Unique identifier for the API request  
- `latency` — Processing time in milliseconds  
- `usage.token_count` — Number of tokens consumed (for billing)  
- `result.chunks[].content` — Extracted text chunk  
- `result.chunks[].meta.id` — Unique ID for the chunk  
- `result.rich_texts[].content` — Non-text elements (e.g., tables)  
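
As an illustration of reading these fields, the sketch below pulls the key values out of a response shaped like the sample above. `summarize_split_response` is a hypothetical helper, and the sample data is made up for the example.

```python
import json


def summarize_split_response(text):
    """Extract the key fields from a document-split response body."""
    body = json.loads(text)
    result = body.get("result", {})
    return {
        "request_id": body.get("request_id"),
        "token_count": body.get("usage", {}).get("token_count"),
        # Pair each chunk ID with its text
        "chunks": [(c["meta"]["id"], c["content"]) for c in result.get("chunks", [])],
        "rich_texts": [r.get("content") for r in result.get("rich_texts", [])],
    }


# Made-up sample in the same shape as the response shown above
sample = json.dumps({
    "request_id": "req-1",
    "latency": 161,
    "usage": {"token_count": 800},
    "result": {
        "chunks": [{"content": "Product benefits", "meta": {"id": "c1", "type": "text"}}],
        "nodes": [],
        "rich_texts": [],
    },
})
summary = summarize_split_response(sample)
```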

## Error Handling

| Error Code | Description | Recommended Action |
|------------|-------------|-------------------|
| InvalidParameter | Request contains invalid parameters (e.g., JSON parse error, unsupported file type). | Validate input encoding, escape special characters, and ensure parameters match constraints. |
| BadRequest.TaskNotExist | Task ID is invalid or expired. | Verify the task ID and resubmit if necessary. |
| 413 | Request entity too large (>8 MB). | Split the document into smaller segments before calling the API. |
| 400 | Bad Request (missing/invalid parameters). | Check that required fields (e.g., `document.content`) are present and that optional ones (`content_type`, `max_chunk_size`) use valid values. |
| 401 | Authentication failed. | Verify API key validity and correct header format (`Authorization: Bearer ...`). |
| 429 | Rate limit exceeded. | Implement exponential backoff or request quota increase via ticket. |

### Rate Limits & Retry
- **Document Splitting**: 2 QPS (combined for Alibaba Cloud account and RAM users)  
- **Document Parsing**: 10 QPS for `ops-document-analyze-001` (submit ticket to increase)  
- **Bulk Ingestion**: 100 QPS per AccessKey  
- **Retry Strategy**: For 429 errors, use exponential backoff (start with 1s delay, double each retry). For async tasks, poll status every 5 seconds.  
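
The backoff strategy described above might look like the following sketch. `RateLimitError` and `call_with_backoff` are hypothetical names; in real code the exception would be whatever your HTTP client or SDK raises for a 429 response, and the retry cap of 5 is an arbitrary choice.

```python
import time


class RateLimitError(Exception):
    """Stand-in for the error raised on an HTTP 429 response."""


def call_with_backoff(func, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying on RateLimitError with a doubling delay."""
    delay = base_delay
    for attempt in range(max_retries + 1):
        try:
            return func()
        except RateLimitError:
            if attempt == max_retries:
                raise  # give up after the final retry
            sleep(delay)
            delay *= 2  # 1s, 2s, 4s, ...
```

Adding random jitter to each delay is a common refinement when many clients retry at once; it is left out here to keep the sketch short.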

## Environment Requirements
- **Python SDK**: `pip install "alibabacloud_searchplat20240529>=1.0.0" "alibabacloud_tea_openapi>=0.3.0"` (quote the specifiers so the shell does not treat `>=` as a redirect)  
- **Java SDK**: `com.aliyun:searchplat20240529`, version 1.0.0 or later  
- **Go SDK**: `github.com/alibabacloud-go/tea/tea >= 1.0.0`  
- **Environment Variables**:  
  - `export DASHSCOPE_API_KEY=your_api_key` (for document processing)  
  - `export ALIBABA_CLOUD_ACCESS_KEY_ID=your_id` and `export ALIBABA_CLOUD_ACCESS_KEY_SECRET=your_secret` (for bulk ingestion)  

## FAQ

Q: How do I handle large documents (>8 MB) for parsing or splitting?  
A: Split the document into smaller segments (e.g., by page or section) before sending API requests. The 8 MB limit applies to the entire request body.
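
For plain text, one simple way to stay under a byte budget is sketched below. `split_utf8` is a hypothetical helper that cuts on character boundaries only; it does not respect pages, sections, or sentences, and the budget should leave headroom for the rest of the request body.

```python
def split_utf8(text, max_bytes):
    """Split text into pieces whose UTF-8 encoding is at most max_bytes each."""
    if max_bytes < 4:
        raise ValueError("max_bytes must be at least 4 (one character can take 4 bytes)")
    segments = []
    start = 0  # index of the first character in the current segment
    size = 0   # UTF-8 byte size of the current segment
    for i, ch in enumerate(text):
        n = len(ch.encode("utf-8"))
        if size + n > max_bytes:
            # Close the current segment before it would exceed the budget
            segments.append(text[start:i])
            start, size = i, 0
        size += n
    if start < len(text):
        segments.append(text[start:])
    return segments
```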

Q: Why am I getting "Invalid UTF-8 start byte" errors?  
A: Ensure your document content is properly encoded in UTF-8 and that special JSON characters (`\`, `"`, etc.) are escaped. Most JSON libraries handle escaping automatically.
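
A short sketch of letting the standard `json` module handle escaping and then encoding the body as UTF-8. The payload shape follows the split-request parameters in this document; the sample text is made up.

```python
import json

# Text containing quotes, a backslash, a newline, and a non-ASCII character
raw = 'He said "hi" and used a \\ backslash.\nSecond line: café'

payload = {"document": {"content": raw, "content_type": "text"}}

# json.dumps escapes the special characters; encode() produces valid UTF-8 bytes
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
```

When reading source files, opening them with an explicit `encoding="utf-8"` helps catch mis-encoded input before it reaches the API.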

Q: How do I choose between synchronous and asynchronous parsing?  
A: Use synchronous for small documents (<1 MB) needing immediate results. Use asynchronous for larger files (PDFs, PPTs) to avoid timeouts and manage long-running tasks.

Q: What’s the difference between `split_document` and `document_split`?  
A: They are functionally identical—both split text into chunks. The naming variation comes from different SDK versions; use the one matching your SDK.

Q: How are tokens counted for billing in document splitting?  
A: Tokens are calculated using the `ops-text-embedding-001` tokenizer. Enabling `need_sentence=true` doubles token usage as it generates additional sentence-level chunks.

## Pricing & Billing

### Billing Model
- **Document Splitting**: Billed per token (input content length)  
- **Document Parsing**: Billed per token (with additional charges for semantic hierarchy extraction)  
- **Data Ingestion/TVFs**: Billed per request  

### Price Reference

| Service ID | Input Price | Output Price |
|------|-------------|-------------|
| ops-document-split-001 | 0.0001 /tokens | 0.0001 /tokens |
| ops-document-analyze-001 | 0.002 /tokens | 0.002 /tokens |
| ops-document-analyze-002 | 0.003 /tokens | 0.003 /tokens |

### Free Tier
- Document Splitting: No free tier  
- Document Parsing: 100 tokens free monthly (varies by service) 
- Bulk Ingestion: 10,000 free requests monthly  

### Usage Limits
- **QPS**: 2–100 depending on service (see Rate Limits section)  
- **Request Size**: Max 8 MB per request  
- **Concurrent Tasks**: Max 10 async tasks per workspace  

### Billing Notes
- Semantic hierarchy extraction (`enable_semantic=true`) incurs extra charges based on `semantic_token_count` in the response.  
- Async tasks are billed only upon successful completion.  
- Token counts for splitting use the `ops-text-embedding-001` tokenizer—verify chunk sizes with this model.