# es-document

Part of **ES**

<!-- intent-backlink:auto -->

> 💡 **Path Selection**: This skill is one implementation path for [Ingest and manage document data in Elasticsearch](../../intent/es-ingest-documents/SKILL.md). If you're unsure which path to take, check the routing skill first.

# Elasticsearch Data Ingestion and Management

## Capabilities Overview

| Sub-capability | Calling Mode | Description |
|--------|----------|------|
| Push Documents | Synchronous | Push documents to Elasticsearch indices. |
| Upload Documents | Synchronous | Upload and push documents to Elasticsearch indices through various methods. |
| Stage and Submit Document Changes | Synchronous | Stage document modifications and submit them as a batch operation. |
| Commit Data | Synchronous | Commit staged data changes to make them persistent in the index. |
| Push Documents Bulk | Synchronous | Perform bulk document push operations for efficient data ingestion. |
| Push Documents to Table | Synchronous | Push documents specifically to table-based storage structures. |
| Manage Data Collection | Synchronous | List, describe, and remove data collections. |
| Manage Data Collection Tasks | Synchronous | Create, list, describe, and remove data collection tasks. |
| Configure Data Source | Synchronous | Set up and configure data sources for Elasticsearch integration. |
| List Data Source Generations | Synchronous | Get a list of data source generations and versions. |
| Modify Data Source Deploy | Synchronous | Update deployment configuration for data sources. |
| List Schemas | Synchronous | Retrieve available schemas for data synchronization operations. |
| Create Data Collection Task | Synchronous | Set up tasks for collecting data from various sources. |
| Manage Data Source Tables | Synchronous | List data source tables and their fields. |
| Validate Data Source | Synchronous | Validate data source connections and configurations. |

## API Calling Patterns

### Authentication
Most operations authenticate with an AccessKey pair read from environment variables.

- **Header format**: `Authorization: Bearer $DASHSCOPE_API_KEY` (for some endpoints), or credentials passed through the SDK configuration
- **Environment variables**:
  - `ALIBABA_CLOUD_ACCESS_KEY_ID`
  - `ALIBABA_CLOUD_ACCESS_KEY_SECRET`
- For authentication through an assumed RAM role, pass the STS token in the `security_token` parameter and set the authentication type to `sts`.
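
The credential rules above can be sketched as a small loader. This is an illustration only: `load_credentials` is a hypothetical helper, and the `ALIBABA_CLOUD_SECURITY_TOKEN` variable name is an assumption that this document does not define. The returned keys mirror the `Config` arguments used in the Python example later on.

```python
import os
from typing import Dict, Mapping, Optional


def load_credentials(env: Mapping[str, str] = os.environ) -> Dict[str, Optional[str]]:
    """Read the AccessKey pair (and an optional STS token) from a mapping."""
    key_id = env.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    key_secret = env.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    missing = [
        name
        for name, value in (
            ("ALIBABA_CLOUD_ACCESS_KEY_ID", key_id),
            ("ALIBABA_CLOUD_ACCESS_KEY_SECRET", key_secret),
        )
        if not value
    ]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    # Assumed variable name for the STS token; adjust to your own setup.
    token = env.get("ALIBABA_CLOUD_SECURITY_TOKEN") or None
    return {
        "access_key_id": key_id,
        "access_key_secret": key_secret,
        "security_token": token,
        # "sts" only when a token is present, otherwise plain AccessKey auth.
        "type": "sts" if token else "access_key",
    }


# Demo with an explicit mapping so no real secrets are needed.
demo = load_credentials({
    "ALIBABA_CLOUD_ACCESS_KEY_ID": "example-id",
    "ALIBABA_CLOUD_ACCESS_KEY_SECRET": "example-secret",
})
print(demo["type"])
```

Passing the mapping as an argument keeps the helper testable; in real use, call it with no arguments so it reads `os.environ`.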

### Service Endpoint
APIs use region-specific endpoints following this pattern:

- **Pattern**: `https://opensearch-{region}.aliyuncs.com`
- **Common regions**: 
  - `cn-hangzhou` (China Hangzhou)
  - `cn-shanghai` (China Shanghai)
  - `ap-southeast-1` (Singapore)

For international regions, use: `https://opensearch-intl.aliyuncs.com`
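
As a small illustration of the regional pattern, the helper below formats an endpoint from a region ID. `build_endpoint` is a hypothetical name, not part of any SDK, and it only reproduces the `opensearch-{region}` pattern listed in this section.

```python
def build_endpoint(region: str, include_scheme: bool = True) -> str:
    """Format a regional endpoint using the opensearch-{region} pattern."""
    region = region.strip().lower()
    if not region:
        raise ValueError("region must be a non-empty ID such as 'cn-hangzhou'")
    host = f"opensearch-{region}.aliyuncs.com"
    # Some SDK configurations expect the bare host without a scheme.
    return f"https://{host}" if include_scheme else host


print(build_endpoint("cn-hangzhou"))
print(build_endpoint("ap-southeast-1", include_scheme=False))
```

The `include_scheme=False` form is useful where a client configuration expects the host name without the `http://` or `https://` prefix.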

### Synchronous API Pattern
All operations in this domain follow a synchronous calling pattern:

1. **Initialize client** with endpoint and credentials (from environment variables)
2. **Prepare request payload** with required parameters (app name, table name, document operations)
3. **Send HTTP request** (typically POST for document operations, GET/DELETE for management)
4. **Process immediate response** containing request ID and operation status
5. **Handle errors** based on HTTP status codes and error messages

For bulk document operations:
- Use the `/v3/openapi/apps/{app_name}/{table_name}/actions/bulk` endpoint.
- Include a `cmd` field in each document (`ADD`, `UPDATE`, or `DELETE`).
- Optionally include a `timestamp` field (milliseconds since epoch) to control update ordering for documents with the same primary key.
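
The bulk rules above can be sketched as two small builders. `bulk_path` and `make_operation` are hypothetical helper names, and the app and table names in the demo are placeholders; the sketch only assembles the path and request body and does not send anything.

```python
import json
from typing import Any, Dict, Optional

VALID_CMDS = {"ADD", "UPDATE", "DELETE"}


def bulk_path(app_name: str, table_name: str) -> str:
    """Return the bulk endpoint path for an application table."""
    return f"/v3/openapi/apps/{app_name}/{table_name}/actions/bulk"


def make_operation(cmd: str, fields: Dict[str, Any],
                   timestamp_ms: Optional[int] = None) -> Dict[str, Any]:
    """Build one bulk entry; timestamp_ms is optional, in epoch milliseconds."""
    cmd = cmd.upper()
    if cmd not in VALID_CMDS:
        raise ValueError(f"cmd must be one of {sorted(VALID_CMDS)}, got {cmd!r}")
    if not fields:
        raise ValueError("fields must include at least the primary key")
    operation: Dict[str, Any] = {"cmd": cmd, "fields": dict(fields)}
    if timestamp_ms is not None:
        operation["timestamp"] = int(timestamp_ms)
    return operation


body = [
    make_operation("ADD", {"id": "1", "title": "opensearch"}, timestamp_ms=1401342874778),
    make_operation("DELETE", {"id": "2"}),
]
print(bulk_path("my_app", "main"))
print(json.dumps(body))
```

Validating `cmd` on the client catches typos before a request is spent on them.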

For data collection management:
- Use the `/v4/openapi/app-groups/{appGroupIdentity}/data-collections` endpoints.
- Listing operations support pagination through the `pageNumber` and `pageSize` parameters.
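
A minimal sketch of building a paginated listing URL from these parameters follows. `data_collections_url` is a hypothetical helper; the endpoint and app group name in the demo are the placeholder values used by the curl example in this document.

```python
from urllib.parse import urlencode


def data_collections_url(endpoint: str, app_group: str,
                         page_number: int = 1, page_size: int = 10) -> str:
    """Build the listing URL with pageNumber and pageSize query parameters."""
    if page_number < 1 or page_size < 1:
        raise ValueError("page_number and page_size must be positive integers")
    query = urlencode({"pageNumber": page_number, "pageSize": page_size})
    base = endpoint.rstrip("/")
    return f"{base}/v4/openapi/app-groups/{app_group}/data-collections?{query}"


print(data_collections_url("https://opensearch.cn-hangzhou.aliyuncs.com", "my_app_group_name"))
print(data_collections_url("https://opensearch.cn-hangzhou.aliyuncs.com", "my_app_group_name", page_number=3))
```

The defaults match the `pageNumber` and `pageSize` defaults listed in the parameter reference.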

## Parameter Reference

### Document Operations Parameters

| Parameter | Type | Required | Default | Constraints | Description |
|------|------|------|--------|------|------|
| app_name | string | true | | | The name of the OpenSearch application |
| table_name | string | true | | | The name of the table within the application |
| doc_content | array | true | | | List of document operations with cmd and fields |
| cmd | string | true | | one of: ADD, UPDATE, DELETE | Operation type for the document |
| fields | object | true | | | Document fields including primary key |
| timestamp | integer | false | | milliseconds since epoch | Controls update order for same primary key |

### Data Collection Parameters

| Parameter | Type | Required | Default | Constraints | Description |
|------|------|------|--------|------|------|
| appGroupIdentity | string | true | | | The name of the application |
| dataCollectionIdentity | string | true | | | The ID of the data collection |
| name | string | true | | | The name of the data collection task |
| type | string | true | | one of: server, web, app | Source type (only server supported) |
| dataCollectionType | string | true | | one of: BEHAVIOR, item_info, industry-specific | Type of data to collect |
| industryName | string | true | | one of: GENERAL, ecommerce | Industry type |
| pageNumber | integer | false | 1 | | Page number for pagination |
| pageSize | integer | false | 10 | | Number of entries per page |

### Data Source Configuration Parameters

| Parameter | Type | Required | Default | Constraints | Description |
|------|------|------|--------|------|------|
| instanceId | string | true | | | The instance ID |
| dataSourceName | string | true | | | The name of the data source |
| type | string | true | | one of: rds, odps, opensearch, polardb | Data source type |
| tableName | string | true | | | The name of the table |
| fields | array | true | | | Field mappings from source to destination |
| keyField | string | true | | | The primary key field |
| parameters | object | false | | | Data source specific parameters |

## Code Examples

### Bulk Document Push - Python - All Regions

```python
# -*- coding: utf-8 -*-
import time, os
from typing import Dict, Any
from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models
from BaseRequest import Config, Client

class opensearch:
    def __init__(self, config: Config):
        self.Clients = Client(config=config)
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=10000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
            max_attempts=3
        )
        self.header = {}

    def docBulk(self, app_name: str, table_name: str, doc_content: list) -> Dict[str, Any]:
        try:
            response = self.Clients._request(
                method="POST",
                pathname=f'/v3/openapi/apps/{app_name}/{table_name}/actions/bulk',
                query={},
                headers=self.header,
                body=doc_content,
                runtime=self.runtime
            )
            return response
        except Exception as e:
            # Log the failure, then re-raise so the caller never receives None.
            print(e)
            raise

if __name__ == "__main__":
    # Endpoint of the OpenSearch API — do not include the http:// or https:// prefix
    endpoint = "<endpoint>"

    # Request protocol: HTTP or HTTPS
    endpoint_protocol = "HTTP"

    # Read credentials from environment variables
    access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    # Authentication type
    # access_key: authenticate with an AccessKey pair (default)
    # sts: authenticate with a Security Token Service (STS) token (required for RAM role assumption)
    auth_type = "sts"

    # STS token — required only when auth_type is "sts"
    # Obtain this token by calling the AssumeRole operation of Alibaba Cloud RAM
    security_token = "<security_token>"

    Configs = Config(
        endpoint=endpoint,
        access_key_id=access_key_id,
        access_key_secret=access_key_secret,
        security_token=security_token,  # Required only for STS authentication
        type=auth_type,
        protocol=endpoint_protocol
    )

    ops = opensearch(Configs)
    app_name = "app_name"
    table_name = "table_name"

    # --------------- Push documents ---------------

    # ADD: include timestamp (milliseconds) to control update order for documents
    # with the same primary key. Without timestamp, OpenSearch applies updates
    # in the order they arrive.
    document1 = {"cmd": "ADD", "timestamp": int(time.time() * 1000), "fields": {"id": "1", "title": "opensearch"}}
    document2 = {"cmd": "ADD", "fields": {"id": 2, "describe": "123456"}}

    # DELETE: only the primary key field is required
    deletedoc = {"cmd": "DELETE", "fields": {"id": 2}}

    # UPDATE: only the specified fields are updated; other fields are unchanged
    updatedoc = {"cmd": "UPDATE", "fields": {"id": 2, "describe": "6666", "title": "OpenSearch"}}

    documents = [document1, document2]
    res5 = ops.docBulk(app_name=app_name, table_name=table_name, doc_content=documents)
    print(res5)
```

### Commit Data Changes - Java - All Regions

```java
// Initialize the client
OpenSearchClient openSearchClient = new OpenSearchClient(config);
DocumentClient documentClient = new DocumentClient(openSearchClient);

// Stage a document for insertion
Map<String, Object> fields = new HashMap<>();
fields.put("id", "doc-001");
fields.put("title", "Getting started with Open Search");
fields.put("body", "Open Search enables full-text search at scale.");
documentClient.add(fields);

// Submit the staged changes
OpenSearchResult result = documentClient.commit("<your-app-name>", "<your-table-name>");
```

### List Data Collections - Bash - All Regions

```bash
curl -X GET 'https://opensearch.cn-hangzhou.aliyuncs.com/v4/openapi/app-groups/my_app_group_name/data-collections?pageNumber=1&pageSize=10' \
-H "Authorization: Bearer $DASHSCOPE_API_KEY" \
-H 'Content-Type: application/json'
```

### Configure Data Source - JSON - All Regions

```json
{
  "tableName": "news",
  "type": "rds",
  "fields": [
    {
      "id": "id"
    },
    {
      "caption": "caption"
    },
    {
      "content": "content"
    },
    {
      "categoryId": "category_id"
    }
  ],
  "plugins": {
    "caption": {
      "name": "HTMLTagRemover",
      "parameters": {}
    },
    "content": {
      "name": "HTMLTagRemover",
      "parameters": {}
    }
  },
  "keyField": "id",
  "parameters": {
    "filter": "",
    "instanceId": "rm-abc",
    "dbName": "my_db",
    "dbTableName": "my_table",
    "dbUser": "my_user",
    "dbPassword": "my-password",
    "autoSync": true
  }
}
```

### Push Documents - Go - All Regions

```go
package main

import (
    "fmt"
    "os"

    util "github.com/alibabacloud-go/tea-utils/service"
    "github.com/alibabacloud-go/tea/tea"
    opensearch "main/client"
)

func main() {
    // Initialize the client with your endpoint and credentials from environment variables.
    // endpoint: the OpenSearch API endpoint for your region.
    config := &opensearch.Config{
        Endpoint:        tea.String("<Endpoint>"),
        AccessKeyId:     tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")),
        AccessKeySecret: tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")),
    }

    client, _clientErr := opensearch.NewClient(config)
    if _clientErr != nil {
        fmt.Println(_clientErr)
        return
    }

    // Build the document list.
    // document1st uses ADD without a timestamp — OpenSearch processes it in arrival order.
    document1st := map[string]interface{}{
        "cmd": "ADD",
        "fields": map[string]interface{}{
            "id":       1,       // primary key field
            "describe": "123456",
        },
    }

    // document2nd includes a timestamp to control update ordering for the same primary key.
    document2nd := map[string]interface{}{
        "cmd":       "ADD",
        "timestamp": 1401342874778,
        "fields": map[string]interface{}{
            "id":       2,
            "describe": "123456",
        },
    }

    requestBody := []interface{}{document1st}
    requestBody = append(requestBody, document2nd)

    // Configure request and connection pool settings.
    runTime := &util.RuntimeOptions{
        ConnectTimeout: tea.Int(5000),
        ReadTimeout:    tea.Int(10000),
        Autoretry:      tea.Bool(false),
        IgnoreSSL:      tea.Bool(false),
        MaxIdleConns:   tea.Int(50),
    }

    // appName: the name or version of the application to push documents to.
    // tableName: the target table. View available tables in the OpenSearch console.
    appName := "<appName>"
    tableName := "<tableName>"

    response, _requestErr := client.Request(
        tea.String("POST"),
        tea.String("/v3/openapi/apps/"+appName+"/"+tableName+"/actions/bulk"),
        nil,
        nil,
        requestBody,
        runTime)

    if _requestErr != nil {
        fmt.Println(_requestErr)
        return
    }

    fmt.Println(response)
}
```

### Create Data Collection Task - Python - All Regions

```python
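import os

import requests

url = "https://opensearch.cn-hangzhou.aliyuncs.com/v4/openapi/app-groups/os_function_test_v1/data-collections"
headers = {
    # Read the key from the environment; Python does not expand a literal "$DASHSCOPE_API_KEY".
    "Authorization": f"Bearer {os.environ['DASHSCOPE_API_KEY']}",
    "Content-Type": "application/json"
}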
data = {
    "type": "server",
    "name": "os_function_test_v1"
}

response = requests.post(url, headers=headers, json=data)
print(response.json())
```

### Validate Data Source - JSON Response - All Regions

```json
{
  "requestId": "8FA2B338-AFDC-46B4-A132-B5487820C2BF",
  "result": [
    {
      "code": "SUCCEED",
      "message": "Validation succeed",
      "dataSource": {
        "tableName": "user_activity_decision",
        "type": "rds",
        "parameters": {}
      }
    }
  ]
}
```

### Push Documents - C# - China Region

```csharp
using System;
using System.Collections.Generic;
using AlibabaCloud.TeaUtil.Models;
using Tea;

namespace ConsoleApp2
{
    internal class Program
    {
        public static Dictionary<string, object> docBulk(
            Client opensearchClient,
            string appName,
            string tableName,
            List<object> docContent,
            Dictionary<string, string> header,
            RuntimeOptions runTime)
        {
            string pathName = "/v3/openapi/apps/" + appName + "/" + tableName + "/actions/bulk";
            try
            {
                return opensearchClient._request("POST", pathName, null, header, docContent, runTime);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        }

        private static void Main(string[] args)
        {
            var runtime = new RuntimeOptions
            {
                ConnectTimeout = 5000,
                ReadTimeout = 10000,
                MaxAttempts = 0,
                Autoretry = false,
                IgnoreSSL = false,
                MaxIdleConns = 50
            };

            var config = new Config
            {
                Endpoint = "opensearch-cn-hangzhou.aliyuncs.com",
                Protocol = "HTTPS",
                AccessKeyId = System.Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                AccessKeySecret = System.Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
                Type = "access_key",
                SecurityToken = ""
            };

            var openSearch = new Client(config);

            var appName = "my-search-app";
            var tableName = "products";
            var header = new Dictionary<string, string>();

            var doc1Fields = new Dictionary<string, object>
            {
                { "id", "doc-001" },
                { "title", "Introduction to OpenSearch" },
                { "text", "OpenSearch is a managed search service." },
                { "cate", "documentation" }
            };
            var document1 = new Dictionary<string, object>
            {
                { "cmd", "ADD" },
                { "fields", doc1Fields }
            };

            var doc2Fields = new Dictionary<string, object>
            {
                { "id", "doc-002" },
                { "title", "Getting started with full-text search" },
                { "text", "This guide walks you through your first search query." },
                { "cate", "tutorial" }
            };
            var document2 = new Dictionary<string, object>
            {
                { "cmd", "ADD" },
                // timestamp is expressed in milliseconds since the Unix epoch
                { "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() },
                { "fields", doc2Fields }
            };

            var documents = new List<object> { document1, document2 };

            try
            {
                docBulk(openSearch, appName, tableName, documents, header, runtime);
                Console.WriteLine("Documents pushed successfully.");
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        }
    }
}
```

## Response Format

```json
{
  "requestId": "72FAD77B-83F9-F393-BA8E-5834E2427BF8",
  "result": {
    "created": 1581065837,
    "dataCollectionType": "BEHAVIOR",
    "type": "server",
    "industryName": "GENERAL",
    "status": 2,
    "updated": 1581065904,
    "name": "os_function_test_v1",
    "sundialId": "1755",
    "id": "286"
  }
}
```

**Key Fields**:
- `requestId`: Unique identifier for the API request, useful for debugging and support
- `result.status`: State of the returned resource (2 = enabled, 0 = not enabled)
- `result.id`: Unique identifier of the created or retrieved resource
- `result.name`: Name of the resource (data collection, data source, etc.)
- `result.created`: Time the resource was created, in Unix epoch seconds
- `result.updated`: Time the resource was last updated, in Unix epoch seconds
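
To make the key fields concrete, here is a minimal sketch that summarizes the sample response above. `summarize_response` and `STATUS_LABELS` are hypothetical names, and the status mapping covers only the two values documented here.

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Only the two documented status values are mapped; anything else is "unknown".
STATUS_LABELS = {0: "not enabled", 2: "enabled"}


def to_utc_iso(epoch_seconds: int) -> str:
    """Convert Unix epoch seconds to an ISO 8601 string in UTC."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()


def summarize_response(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the key fields from a response shaped like the sample above."""
    result = payload.get("result", {})
    return {
        "request_id": payload.get("requestId"),
        "id": result.get("id"),
        "name": result.get("name"),
        "status": STATUS_LABELS.get(result.get("status"), "unknown"),
        "created": to_utc_iso(result["created"]),
        "updated": to_utc_iso(result["updated"]),
    }


sample = {
    "requestId": "72FAD77B-83F9-F393-BA8E-5834E2427BF8",
    "result": {
        "created": 1581065837,
        "status": 2,
        "updated": 1581065904,
        "name": "os_function_test_v1",
        "id": "286",
    },
}
summary = summarize_response(sample)
print(summary)
```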

## Error Handling

| Error Code | Description | Recommended Action |
|---------------|--------------------|-----------------------------|
| 400 | Bad request. The input data is invalid or malformed. | Validate the JSON structure and required parameters against the API documentation |
| 401 | Unauthorized. The AccessKey credentials are invalid or missing. | Verify the `ALIBABA_CLOUD_ACCESS_KEY_ID` and `ALIBABA_CLOUD_ACCESS_KEY_SECRET` environment variables |
| 403 | Forbidden. The RAM user does not have sufficient permissions. | Check RAM user policies and ensure the AliyunServiceRoleForOpenSearch role is attached |
| 404 | Not found. The specified application or table does not exist. | Confirm the `appName` and `tableName` values in the OpenSearch console |
| 500 | Internal server error. The server encountered an unexpected condition. | Retry with exponential backoff; contact support with trace information if the error persists |

### Rate Limits & Retry
- **QPS limits**: Up to 100 requests per second per application/account
- **Connection pool**: The examples set `MaxIdleConns=50`, which caps the number of idle connections kept in the pool
- **Retry strategy**: Implement exponential backoff for 429/500 errors
- **Free tier**: Most operations include monthly free quotas (1000-10000 requests)
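
The retry strategy can be sketched as follows. This is an illustration under assumptions, not SDK behavior: `call_with_retry` and `backoff_delays` are hypothetical helpers, `send` stands for any callable that returns a `(status_code, body)` tuple, and the base delay and cap are arbitrary example values.

```python
import time
from typing import Any, Callable, List, Tuple

RETRYABLE_STATUS = {429, 500}


def backoff_delays(max_attempts: int, base_delay: float = 0.5, cap: float = 8.0) -> List[float]:
    """Delays before each retry: base, 2*base, 4*base, ... limited by cap."""
    return [min(cap, base_delay * (2 ** i)) for i in range(max_attempts - 1)]


def call_with_retry(send: Callable[[], Tuple[int, Any]], max_attempts: int = 4,
                    base_delay: float = 0.5, cap: float = 8.0,
                    sleep: Callable[[float], None] = time.sleep) -> Tuple[int, Any]:
    """Call send() until it returns a non-retryable status or attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delays = backoff_delays(max_attempts, base_delay, cap)
    for attempt in range(max_attempts):
        status, body = send()
        last_attempt = attempt == max_attempts - 1
        if status not in RETRYABLE_STATUS or last_attempt:
            return status, body
        sleep(delays[attempt])
    raise AssertionError("unreachable: the loop always returns")


# Demo with a fake sender that fails twice, then succeeds; sleep is stubbed out.
responses = iter([(500, "error"), (429, "slow down"), (200, "ok")])
slept: List[float] = []
final = call_with_retry(lambda: next(responses), sleep=slept.append)
print(final, slept)
```

The sketch is deterministic for clarity; adding random jitter to each delay is a common refinement when many clients retry at once.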

## Environment Requirements

- **Python**: `alibabacloud_tea_util>=1.0.0`, `alibabacloud_opensearch_util>=1.0.0`, `alibabacloud_credentials>=1.0.0`, Python>=3.7
- **Java**: `opensearch-sdk-java>=3.1`
- **PHP**: `opensearch-php-sdk>=1.0.0`, PHP>=7.4
- **Go**: `github.com/alibabacloud-go/tea-utils/service`, `github.com/alibabacloud-go/tea/tea`
- **C#**: `AlibabaCloud.TeaUtil>=0.1.5`, `AlibabaCloud.OpenSearchUtil>=1.0.2`, `Aliyun.Credentials>=1.2.1`, `Tea>=0.4.0`
- **Environment variables**: 
  - `export ALIBABA_CLOUD_ACCESS_KEY_ID=your_access_key_id`
  - `export ALIBABA_CLOUD_ACCESS_KEY_SECRET=your_access_key_secret`

## FAQ

Q: How do I handle document updates with the same primary key?
A: Use the `timestamp` field to control update ordering. Documents with higher timestamps will override those with lower timestamps. If no timestamp is provided, OpenSearch processes updates in arrival order.
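
The ordering rule can be illustrated with a toy model. It only helps reason about which write should win for a given primary key and does not describe the server's implementation. `resolve_latest` is a hypothetical helper, and treating mixed or equal timestamps as arrival order is an assumption.

```python
from typing import Any, Dict, List


def resolve_latest(operations: List[Dict[str, Any]], key: str = "id") -> Dict[Any, Dict[str, Any]]:
    """Return, per primary key, the operation that wins under the rule above."""
    winners: Dict[Any, Dict[str, Any]] = {}
    for op in operations:
        pk = op["fields"][key]
        current = winners.get(pk)
        if current is not None:
            both_stamped = "timestamp" in op and "timestamp" in current
            if both_stamped and op["timestamp"] < current["timestamp"]:
                # An older timestamp arriving late does not override a newer one.
                continue
        # Otherwise the later arrival wins (assumption for mixed or equal cases).
        winners[pk] = op
    return winners


ops = [
    {"cmd": "ADD", "timestamp": 2000, "fields": {"id": "1", "title": "new"}},
    {"cmd": "ADD", "timestamp": 1000, "fields": {"id": "1", "title": "old"}},
    {"cmd": "ADD", "fields": {"id": "2", "title": "first"}},
    {"cmd": "ADD", "fields": {"id": "2", "title": "second"}},
]
latest = resolve_latest(ops)
print(latest["1"]["fields"]["title"], latest["2"]["fields"]["title"])
```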

Q: What's the difference between commit mode and bulk push?
A: Commit mode uses a client-side buffer where you stage multiple operations (add/update/delete) and flush them with a single commit call. Bulk push sends all operations in a single request without client-side buffering.
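
The difference can be sketched with a small client-side buffer. `StagedBuffer` is a hypothetical class written for illustration, not the SDK's commit client; `push` stands for any bulk-push callable taking an app name, a table name, and a list of operations.

```python
from typing import Any, Callable, Dict, List


class StagedBuffer:
    """Stage operations locally and flush them with one commit call."""

    def __init__(self, push: Callable[[str, str, List[Dict[str, Any]]], Any]):
        self._push = push
        self._pending: List[Dict[str, Any]] = []

    @property
    def pending(self) -> int:
        return len(self._pending)

    def _stage(self, cmd: str, fields: Dict[str, Any]) -> None:
        self._pending.append({"cmd": cmd, "fields": dict(fields)})

    def add(self, fields: Dict[str, Any]) -> None:
        self._stage("ADD", fields)

    def update(self, fields: Dict[str, Any]) -> None:
        self._stage("UPDATE", fields)

    def delete(self, fields: Dict[str, Any]) -> None:
        self._stage("DELETE", fields)

    def commit(self, app_name: str, table_name: str) -> Any:
        """Send everything staged so far as a single bulk push."""
        if not self._pending:
            return None
        batch, self._pending = self._pending, []
        return self._push(app_name, table_name, batch)


sent: List[Any] = []


def fake_push(app_name: str, table_name: str, docs: List[Dict[str, Any]]) -> int:
    """Stand-in for a real bulk push; records the call and returns the doc count."""
    sent.append((app_name, table_name, docs))
    return len(docs)


buffer = StagedBuffer(fake_push)
buffer.add({"id": "doc-001", "title": "Getting started"})
buffer.delete({"id": "doc-000"})
pushed = buffer.commit("my_app", "main")
print(pushed, buffer.pending)
```

In this model, a bulk push is simply a call to `push` with a list you assembled yourself, while commit mode accumulates that list for you between calls.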

Q: Why am I getting a 403 Forbidden error when trying to push documents?
A: This typically indicates insufficient RAM user permissions. Ensure your RAM user has the AliyunServiceRoleForOpenSearch service-linked role attached with appropriate policies for OpenSearch operations.

Q: How can I validate my data source configuration before creating it?
A: Use the ValidateDataSources API endpoint to test connectivity and configuration validity before deployment. This returns detailed validation results including success/failure codes and messages.

Q: What authentication method should I use for production applications?
A: For production, use RAM user AccessKey pairs stored securely in environment variables. For enhanced security in distributed systems, consider using STS tokens with temporary credentials via the AssumeRole operation.

## Pricing & Billing

### Billing Model
Billing is per request: each API call counts as one request, regardless of how many documents a bulk operation processes.

### Price Reference

| Tier/Specification | Input Price | Output Price | Other Fees |
|-----------|---------|---------|---------|
| standard | 0.0001 / | 0.0001 / | |
| bulk_operation | 0.0001 / | | |
| bulk_push | 0.0001 / | 0.0001 / | |
| OpenSearch Pro | 0.0001 / | 0.0001 / | |
| default | 0.001 / | 0.001 / | |

### Free Tier
- Monthly free quotas ranging from 100 to 10,000 requests depending on the operation type
- Free tier resets monthly
- Bulk operations count as one request regardless of document count

### Usage Limits
- 100 QPS per application/account for most operations
- Single request size limits (e.g., 10KB for data collection creation)
- Connection pool limits (MaxIdleConns=50)

### Billing Notes
- Failed requests may still be counted toward quotas and billing
- Bulk operations are billed per request, not per document
- Minimum charge is 1 request per API call
- Free tier applies before paid usage begins
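
Because billing is per request rather than per document, batching changes the request count directly. The arithmetic is sketched below; the document count, batch size, and free quota in the demo are made-up inputs, and `requests_needed` and `billable_requests` are hypothetical helpers.

```python
import math


def requests_needed(doc_count: int, docs_per_request: int) -> int:
    """Number of API calls needed to push doc_count documents in batches."""
    if doc_count < 0 or docs_per_request < 1:
        raise ValueError("doc_count must be >= 0 and docs_per_request >= 1")
    return math.ceil(doc_count / docs_per_request)


def billable_requests(total_requests: int, free_quota: int) -> int:
    """Requests left to bill after the free quota is applied first."""
    return max(0, total_requests - free_quota)


batched = requests_needed(25_000, 500)   # 500 documents per bulk call
unbatched = requests_needed(25_000, 1)   # one document per call
print(batched, billable_requests(batched, 10_000))
print(unbatched, billable_requests(unbatched, 10_000))
```

With these example inputs, the batched run needs 50 requests and stays inside the assumed quota, while the unbatched run needs 25,000 requests and leaves 15,000 billable.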