
# Hosted Elasticsearch

> Stream Aptible activity events to hosted Elasticsearch

## Elastic Cloud Setup

### Prerequisites

* A hosted Elasticsearch deployment on [Elastic Cloud](https://www.elastic.co/cloud)
* A [long-term activity access token](/core-concepts/security-compliance/access-permissions#activity-log-viewer) (coming soon!)

### Step 1: Identify Your Data Stream

Imported activities are written to a named data stream. If you don't already have one in mind, this guide assumes one called `aptible_activity`, which is created automatically in step 3.

### Step 2: Create an Ingest Pipeline

In Kibana, navigate to **<Tooltip tip="In some versions of Kibana, Ingest Pipelines may be found under Data Management">Stack Management</Tooltip> > Ingest Pipelines** and create a new pipeline named `aptible_activity_pipeline` with the following three processors. You can import this JSON when creating it:

```json theme={null}
{
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "aptible"
      }
    },
    {
      "date": {
        "formats": ["ISO8601"],
        "field": "aptible.occurred_at",
        "target_field": "@timestamp"
      }
    },
    {
      "set": {
        "field": "_id",
        "value": "aptible_activity_{{aptible.id}}"
      }
    }
  ]
}
```

Each processor serves a specific purpose:

* **`json`** — Parses the raw `message` field into individual fields under `aptible.*` (e.g. `aptible.primary_resource_id`, `aptible.status`, `aptible.summary`), making them searchable and filterable. If you want to discard the original `message` field after parsing, add `"remove_if_successful": true`.
* **`date`** — Sets `@timestamp` to the event's `occurred_at` time so that events are indexed by when they happened, not when they were ingested.
* **`set _id`** — Assigns a predictable document ID based on the Aptible activity ID. This prevents duplicate events if the same activity is fetched more than once.
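To make the effect of the three processors concrete, the following Python sketch approximates what they do to a single document. It is an illustration only, not how Elasticsearch executes the pipeline, and the sample event (its `id`, `occurred_at`, and `status` values) is hypothetical.

```python
import json
from datetime import datetime


def apply_pipeline(doc: dict) -> dict:
    """Approximate the json, date, and set processors on one document."""
    out = dict(doc)
    # json processor: parse the raw `message` string into the `aptible` object.
    out["aptible"] = json.loads(out["message"])
    # date processor: copy the ISO 8601 `occurred_at` value into `@timestamp`.
    occurred_at = out["aptible"]["occurred_at"]
    # Older Python versions reject a trailing "Z", so normalise it first.
    parsed = datetime.fromisoformat(occurred_at.replace("Z", "+00:00"))
    out["@timestamp"] = parsed.isoformat()
    # set processor: derive a deterministic document ID from the activity ID.
    out["_id"] = f"aptible_activity_{out['aptible']['id']}"
    return out


# Hypothetical event; real payloads contain more fields.
raw = {
    "message": json.dumps(
        {"id": 123, "occurred_at": "2025-11-05T12:30:00Z", "status": "succeeded"}
    )
}
indexed = apply_pipeline(raw)
```

Because `_id` depends only on the activity ID, running the same event through the sketch twice yields the same document ID.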

### Step 3: Set Up the Custom API Integration

In Kibana, navigate to **Integrations** and search for **Custom API**. Add a new **Custom API** integration with the following settings:

#### Basic Settings

| Setting                 | Value                                                                        |
| ----------------------- | ---------------------------------------------------------------------------- |
| **Dataset name**        | `aptible_activity` (the data stream identified in step 1)                    |
| **Ingest Pipeline**     | `aptible_activity_pipeline` (the pipeline created in step 2)                 |
| **Request URL**         | `https://activity.aptible.com/activities?only_completed=true&read_only=true` |
| **Request Interval**    | `1m`                                                                         |
| **Request HTTP Method** | `GET`                                                                        |

<Note>
  See [Activity Details](/core-concepts/observability/activity/activity-details) for the full list of event types. Certain types of events can be omitted by setting `read_only=false` in the URL.

  The `only_completed=true` parameter omits activity operations that are still in a `queued` or `running` state, so you only ingest events once they've reached a final state (`succeeded` or `failed`). If you set `only_completed=false`, in-progress operations are ingested immediately and their records are updated later, once the operations complete.
</Note>

#### Request Transforms

Add the following request transforms, and *be sure to replace the placeholder with your actual token*:

```yaml theme={null}
- set:
    target: header.Authorization
    value: "Bearer APTIBLE_TOKEN_PLACEHOLDER"
- set:
    target: url.params.updated_after
    value: "[[.cursor.updated_after]]"
    default: "2025-11-01T00:00:00Z"
```
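As a rough illustration of what these two transforms produce, the Python sketch below assembles the URL and headers for a single poll. The `build_request` helper and the shape of the `cursor` dictionary are hypothetical; the integration performs this step itself, and no request is sent here.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

BASE_URL = "https://activity.aptible.com/activities?only_completed=true&read_only=true"
DEFAULT_UPDATED_AFTER = "2025-11-01T00:00:00Z"


def build_request(token: str, cursor: dict) -> tuple:
    """Return (url, headers) for one poll, mirroring the two `set` transforms."""
    parts = urlsplit(BASE_URL)
    params = parse_qsl(parts.query)
    # Second transform: use the saved cursor, or the default on the first poll.
    updated_after = cursor.get("updated_after") or DEFAULT_UPDATED_AFTER
    params.append(("updated_after", updated_after))
    url = urlunsplit(parts._replace(query=urlencode(params)))
    # First transform: send the token as a Bearer credential.
    headers = {"Authorization": f"Bearer {token}"}
    return url, headers


first_url, headers = build_request("APTIBLE_TOKEN_PLACEHOLDER", {})
later_url, _ = build_request(
    "APTIBLE_TOKEN_PLACEHOLDER", {"updated_after": "2025-11-03T09:30:00Z"}
)
```

On the first poll there is no saved cursor, so `updated_after` falls back to the default; later polls use the saved value.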

#### Response Split

Configure response splitting so each activity event becomes its own document:

```yaml theme={null}
target: body.activities
```
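Conceptually, the split turns one API response into one document per activity. The Python sketch below shows the idea under two assumptions: that the response body carries an `activities` array (as the split target implies), and that each resulting document holds the activity as a JSON string in `message`, which is what the ingest pipeline's `json` processor expects. The sample body is hypothetical.

```python
import json


def split_response(body: dict) -> list:
    """Emit one document per entry in body["activities"]."""
    return [
        {"message": json.dumps(activity)}
        for activity in body.get("activities", [])
    ]


# Hypothetical response body; real activities contain more fields.
body = {
    "activities": [
        {"id": 1, "status": "succeeded"},
        {"id": 2, "status": "failed"},
    ],
    "_links": {},
}
docs = split_response(body)
```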

#### Response Pagination

Configure pagination to follow `next` links in the Activity API response:

```yaml theme={null}
- set:
    target: url.value
    value: '[[if (index .last_response.body._links "next")]][[.last_response.body._links.next.href]][[end]]'
    fail_on_template_error: true
```
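The pagination rule amounts to "keep requesting `_links.next.href` until there is no `next` link". A minimal Python sketch of that loop follows. The `iter_pages` helper, the `fetch` callable, and the `example.invalid` URLs are hypothetical stand-ins, and the in-memory `PAGES` dictionary replaces real HTTP calls.

```python
from typing import Callable, Iterator, Optional


def iter_pages(first_url: str, fetch: Callable[[str], dict]) -> Iterator[dict]:
    """Yield each response body, following `_links.next.href` while present."""
    url: Optional[str] = first_url
    while url:
        body = fetch(url)
        yield body
        next_link = body.get("_links", {}).get("next")
        # Stop when the response carries no `next` link.
        url = next_link["href"] if next_link else None


# Hypothetical two-page response set standing in for the Activity API.
PAGE_1 = "https://example.invalid/activities?page=1"
PAGE_2 = "https://example.invalid/activities?page=2"
PAGES = {
    PAGE_1: {"activities": [{"id": 1}], "_links": {"next": {"href": PAGE_2}}},
    PAGE_2: {"activities": [{"id": 2}], "_links": {}},
}

ids = [
    activity["id"]
    for page in iter_pages(PAGE_1, PAGES.__getitem__)
    for activity in page["activities"]
]
```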

#### Custom request cursor

Configure the cursor to track your position across polling intervals. `updated_after` is the integration's saved checkpoint — it determines where the next poll resumes. This ensures each request only fetches events updated since the last successful poll:

```yaml theme={null}
updated_after:
  value: "[[.last_event.updated_at]]"
```
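The checkpoint behaviour can be sketched in Python as follows. This is a simplified model, not the integration's implementation: `fetch_updated_after` is a hypothetical stand-in for the API call, the sample activities are made up, and the strictly-greater-than comparison is an assumption about how `updated_after` filters results.

```python
DEFAULT_UPDATED_AFTER = "2025-11-01T00:00:00Z"

# Hypothetical activities held by a stand-in API, ordered by updated_at.
ACTIVITIES = [
    {"id": 1, "updated_at": "2025-11-02T08:00:00Z"},
    {"id": 2, "updated_at": "2025-11-03T09:30:00Z"},
]


def fetch_updated_after(since: str) -> list:
    """Stand-in for the API call; assumes a strictly-greater-than filter."""
    # Same-format UTC ISO 8601 strings sort chronologically as plain strings.
    return [a for a in ACTIVITIES if a["updated_at"] > since]


def poll_once(cursor: dict) -> tuple:
    """Run one poll and return (events, updated cursor)."""
    since = cursor.get("updated_after", DEFAULT_UPDATED_AFTER)
    events = fetch_updated_after(since)
    if events:
        # Mirror `[[.last_event.updated_at]]`: remember the last event seen.
        cursor = {"updated_after": events[-1]["updated_at"]}
    return events, cursor


first_events, cursor = poll_once({})
second_events, cursor = poll_once(cursor)
```

In this model the first poll returns both activities and advances the cursor; the second poll finds nothing newer and leaves the cursor unchanged.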

#### Choose an agent policy

In the **Where to add this integration?** section, assign the integration to an agent policy. An agent policy manages a group of integrations deployed to a set of Elastic Agents. You can select an existing policy or click **Create a new agent policy** to create a new one.

This integration only needs to run on a single agent — it polls the Activity API on the configured interval and ingests all events centrally.

<Note>
  The Elastic Agent must have persistent storage so that cursor state survives restarts. If the agent loses its state (e.g. it runs in a container with an ephemeral filesystem), the cursor resets and the next poll falls back to the `default` value of `updated_after` set in the request transforms, re-fetching every event updated since that time. The ingest pipeline's `_id` de-duplication prevents duplicate documents, but the agent still does unnecessary work re-ingesting events it has already processed.
</Note>
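The reason a cursor reset does not produce duplicates can be modelled with a dictionary keyed by document ID. This Python sketch is a simplification: the `ingest` helper and sample activities are hypothetical, and it overwrites on a repeated ID, whereas whether Elasticsearch overwrites or rejects a repeated ID is not modelled here.

```python
def ingest(index: dict, activities: list) -> None:
    """Store each activity under the same ID scheme the pipeline uses."""
    for activity in activities:
        doc_id = f"aptible_activity_{activity['id']}"
        # A repeated ID maps to the same key, so no second document appears.
        index[doc_id] = activity


# Hypothetical history returned by the API.
history = [{"id": 1, "status": "succeeded"}, {"id": 2, "status": "failed"}]

index = {}
ingest(index, history)
count_after_first = len(index)

# Simulate a cursor reset: the same history is fetched and ingested again.
ingest(index, history)
count_after_replay = len(index)
```

The document count is the same after the replay, which is the property the note relies on; the cost is the repeated fetching and processing.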
