Ambient Agents
ADK supports ambient agents, autonomous agents that process data, monitor events, and respond asynchronously without human intervention. Use ambient agents to:
- React to cloud events. Process a file when it's uploaded to Cloud Storage, respond to database changes, or handle audit log entries.
- Process messages from a queue. Analyze incoming support tickets, moderate content, classify documents, or run QA as items arrive.
- Run on a schedule. Generate daily reports, run periodic monitoring checks, or process batch jobs at regular intervals.
- Monitor infrastructure. React to a continuous stream of events across your infrastructure and act on changes autonomously.
Getting results from ambient agents
Because ambient agents run without human interaction, you need to route their outputs to a notification channel. Common patterns include:
- Structured logging. Write JSON logs and configure Cloud Monitoring alerts to notify via email, Slack, or PagerDuty.
- Pub/Sub. Publish results to a topic for downstream services to consume.
- Application Integration. Route agent outputs to email, Jira, or other systems.
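As a minimal sketch of the structured logging pattern, the helper below writes one JSON object per line to standard output. It assumes a Cloud Run deployment, where stdout is forwarded to Cloud Logging and single-line JSON is parsed as a structured entry with `severity` and `message` treated as special fields. The function name `log_result` and the extra field `event_id` are hypothetical.

```python
import json
import sys


def log_result(severity: str, message: str, **fields) -> str:
    """Write one JSON log line to stdout and return it.

    `severity` and `message` are the fields Cloud Logging treats specially;
    any extra keyword arguments become additional structured fields.
    """
    entry = {"severity": severity, "message": message, **fields}
    line = json.dumps(entry)
    print(line, file=sys.stdout, flush=True)
    return line
```

An agent tool or callback could call `log_result("WARNING", "Anomaly detected", event_id="event-123")`, and a log-based alert could then match on the severity or on a custom field.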
How to build ambient agents
ADK provides two approaches:
| | /run | Trigger endpoints |
|---|---|---|
| Event sources | Any (Pub/Sub, webhooks, cron, custom services) | Cloud Pub/Sub, Eventarc (Standard and Advanced) |
| Payload parsing | You handle it | Automatic (Base64 decoding, CloudEvent parsing) |
| Session creation | Enable --auto_create_session | Automatic (one per event) |
| Session storage | Your configured SessionService | Your configured SessionService |
| Concurrency control | You handle it | Built-in semaphore with configurable limit |
| Retry logic | You handle it | Exponential backoff with jitter for transient errors |
| Best for | Custom integrations, non-GCP sources | GCP-native event-driven workloads |
Using /run
Use the /run endpoint when you need full control over
the integration or are working with non-GCP event sources. Enable
--auto_create_session so that sessions are created automatically, then
connect any HTTP client to call /run when events arrive.
This pattern works with any event source that can make an HTTP request.
Example: Processing incoming webhooks
The following Cloud Run function receives a webhook from an external service (for example, GitHub) and forwards it to your agent:
import json
import uuid

import functions_framework
import requests

AGENT_URL = "https://my-agent-service-xxxxx.run.app"


@functions_framework.http
def handle_webhook(request):
    """Cloud Run function that receives webhooks and forwards to the agent."""
    payload = request.get_json(silent=True) or {}
    # Forward the raw webhook payload to the agent as a single user message.
    requests.post(
        f"{AGENT_URL}/apps/my_agent/run",
        json={
            "app_name": "my_agent",
            "user_id": payload.get("account", "webhook-caller"),
            # A fresh session per webhook; relies on --auto_create_session.
            "session_id": str(uuid.uuid4()),
            "new_message": {
                "role": "user",
                "parts": [{"text": json.dumps(payload)}],
            },
        },
    )
    return ("ok", 200)
Example: Send an event with curl
Using trigger endpoints
Use trigger endpoints when your event sources are Pub/Sub or Eventarc and you want ADK to handle payload parsing, session creation, concurrency, and retries.
How events are processed
Pub/Sub and Eventarc deliver events to your agent as HTTP POST requests. When a trigger endpoint receives an event, it:
- Parses the request according to the source format (Pub/Sub push message or CloudEvent).
- Decodes the payload. Base64-encoded message data is decoded and, if possible, parsed as JSON.
- Creates a session automatically with a generated UUID. Unlike the /run endpoint, you do not need to enable --auto_create_session; trigger endpoints always create a new session per event.
- Runs your agent with the decoded event as a user message.
- Returns a status code. A 200 response tells Pub/Sub or Eventarc that the event was processed successfully. A 500 response signals a failure, and the event source retries delivery based on its retry policy.
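The decoding step in the list above can be sketched as follows. This is an illustration of the described behavior, not ADK's actual implementation. The function name `decode_payload` is hypothetical, and raising `ValueError` on malformed input is an assumption.

```python
import base64
import binascii
import json


def decode_payload(encoded: str):
    """Decode Base64 data and parse it as JSON when possible.

    Returns the parsed JSON value, or the decoded text if it is not JSON.
    """
    try:
        raw = base64.b64decode(encoded, validate=True)
    except binascii.Error as exc:
        # Assumption: malformed Base64 is reported to the caller as an error.
        raise ValueError(f"Malformed Base64 payload: {exc}") from exc
    text = raw.decode("utf-8", errors="replace")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text  # Not JSON: pass the plain string through.
```

A JSON payload comes back as a structured object, while a non-JSON payload such as `aGVsbG8=` comes back as the plain string `hello`.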
Supported sources
| Source | Endpoint | Description |
|---|---|---|
| Pub/Sub | /apps/{app_name}/trigger/pubsub | Receives messages from a Pub/Sub push subscription. |
| Eventarc | /apps/{app_name}/trigger/eventarc | Receives CloudEvents delivered by Eventarc (Standard or Advanced), supporting both structured and binary content modes. |
Example agent
The following agent processes events from a trigger endpoint. It uses a
parse_event tool to extract the event data and attributes, then analyzes
the contents.
Agent code (event_processing_agent/agent.py)
import json

from google.adk.agents import LlmAgent


def parse_event(raw_event: str) -> dict:
    """Parse and extract structured data from a trigger event.

    Trigger endpoints deliver events as a JSON string with 'data' and
    'attributes' fields. This tool extracts those fields so the agent
    can reason about the event contents.
    """
    try:
        event = json.loads(raw_event)
    except json.JSONDecodeError as e:
        return {"error": f"Failed to parse event JSON: {e}"}
    return {
        "data": event.get("data"),
        "attributes": event.get("attributes", {}),
    }


root_agent = LlmAgent(
    model="gemini-flash-latest",
    name="event_processor",
    instruction="""You are an event-processing agent that handles incoming
events from Pub/Sub and Eventarc triggers.

When you receive an event:
1. Use the `parse_event` tool to extract the event data and attributes.
2. Analyze the event contents and determine what action to take.
3. Summarize what you found and what action you would recommend.

Be concise and structured in your responses.""",
    tools=[parse_event],
)
Enable triggers
Trigger endpoints are disabled by default. Enable them with the
--trigger_sources flag, which takes a comma-separated list of sources such as "pubsub,eventarc".
For production deployments, you can enable triggers programmatically in a custom FastAPI entry point:
Deployment entry point (main.py)
import os

import uvicorn
from google.adk.cli.fast_api import get_fast_api_app

AGENT_DIR = os.path.dirname(os.path.abspath(__file__))

app = get_fast_api_app(
    agents_dir=AGENT_DIR,
    web=False,
    trigger_sources=["pubsub", "eventarc"],
)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8000)))
Try it locally
1. Start the server with triggers enabled:
2. Send a test event:
curl -X POST http://localhost:8000/apps/event_processing_agent/trigger/pubsub \
  -H "Content-Type: application/json" \
  -d '{
    "message": {
      "data": "eyJvcmRlcl9pZCI6ICIxMjM0IiwgInN0YXR1cyI6ICJuZXcifQ==",
      "attributes": {"source": "orders-service"}
    },
    "subscription": "projects/my-project/subscriptions/orders-sub"
  }'
The Base64 value decodes to {"order_id": "1234", "status": "new"}.
A successful response:
Trigger sources
Parameter mapping
The /run endpoint requires you to provide app_name, user_id, and
session_id. Trigger endpoints derive these automatically:
| Parameter | Source |
|---|---|
| app_name | Extracted from the URL path (/apps/{app_name}/trigger/...) |
| session_id | Auto-generated UUID per event |
| user_id | Pub/Sub: the subscription field. Eventarc: the source or ce-source header. |
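The mapping in the table can be sketched as a small function. This is an illustration only: the name `derive_params` is hypothetical, and the order in which the Eventarc `source` field and `ce-source` header are consulted is an assumption.

```python
import uuid


def derive_params(path: str, body: dict, headers: dict) -> dict:
    """Derive app_name, user_id, and session_id for a trigger request."""
    parts = path.strip("/").split("/")
    # Expected shape: apps/{app_name}/trigger/{source}
    if len(parts) != 4 or parts[0] != "apps" or parts[2] != "trigger":
        raise ValueError(f"Not a trigger path: {path}")
    app_name, source = parts[1], parts[3]

    if source == "pubsub":
        user_id = body.get("subscription")
    else:
        # HTTP header names are case-insensitive, so normalize them first.
        lowered = {key.lower(): value for key, value in headers.items()}
        # Assumption: prefer the body field, then fall back to the header.
        user_id = body.get("source") or lowered.get("ce-source")

    return {
        "app_name": app_name,
        "user_id": user_id,
        "session_id": str(uuid.uuid4()),  # New session for every event.
    }
```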
Message format
All trigger endpoints normalize the incoming event into a consistent JSON structure before passing it to your agent as the user message:
- data: The decoded event payload. If the original data is JSON, it is parsed into a structured object. Otherwise, it is passed as a plain string.
- attributes: Key-value metadata from the event source (for example, Pub/Sub message attributes or CloudEvents headers like ce-type and ce-source).
Your agent receives this JSON string as the input message and can parse it to extract the data and attributes.
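For illustration, the sample order event used on this page would reach the agent roughly as the JSON string built below. The helper name `build_user_message` is hypothetical and the exact serialization ADK uses is an assumption. Only the `data` and `attributes` keys come from the description above.

```python
import json


def build_user_message(data, attributes: dict) -> str:
    """Serialize a normalized event as the JSON string the agent receives."""
    return json.dumps({"data": data, "attributes": attributes})


# Sketch of the agent-side view of the sample order event.
message = build_user_message(
    {"order_id": "1234", "status": "new"},
    {"source": "orders-service"},
)
event = json.loads(message)
status = event["data"]["status"]  # "new"
```

A non-JSON payload would appear as a plain string under `data` instead of a nested object.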
Pub/Sub
The Pub/Sub trigger endpoint processes messages from a Pub/Sub push subscription. Use it when your applications or services publish messages to a topic, for example:
- A support portal publishes incoming tickets for triage and routing.
- A content pipeline sends documents for classification or moderation.
- A monitoring service publishes alerts for automated analysis.
Request format
Pub/Sub push subscriptions send requests in this format:
{
  "message": {
    "data": "eyJvcmRlcl9pZCI6ICIxMjM0IiwgInN0YXR1cyI6ICJuZXcifQ==",
    "attributes": {"source": "orders-service"},
    "messageId": "123456789",
    "publishTime": "2026-04-08T12:00:00Z"
  },
  "subscription": "projects/my-project/subscriptions/my-sub"
}
The data field is Base64-encoded. The trigger endpoint decodes it
automatically.
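When testing locally it can help to build the push envelope in code instead of encoding the payload by hand. The sketch below assumes only the envelope fields shown above. The function name `build_push_envelope` is hypothetical, and `messageId` and `publishTime` are omitted for brevity.

```python
import base64
import json


def build_push_envelope(payload: dict, attributes: dict, subscription: str) -> dict:
    """Wrap a payload in the shape of a Pub/Sub push request body."""
    # Pub/Sub carries message data as Base64-encoded bytes.
    encoded = base64.b64encode(json.dumps(payload).encode("utf-8"))
    return {
        "message": {
            "data": encoded.decode("ascii"),
            "attributes": attributes,
        },
        "subscription": subscription,
    }


envelope = build_push_envelope(
    {"order_id": "1234", "status": "new"},
    {"source": "orders-service"},
    "projects/my-project/subscriptions/my-sub",
)
```

With Python's default JSON separators, this payload encodes to the same Base64 string used in the request example above.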
Response
| HTTP Status | Meaning |
|---|---|
| 200 | Event processed successfully. Pub/Sub acknowledges the message. |
| 400 | Invalid request (malformed Base64 encoding). Message is not retried. |
| 500 | Processing failed (transient or non-transient agent errors). Pub/Sub retries delivery based on its retry policy. Configure a dead-letter queue to catch messages that fail repeatedly. |
Eventarc
The Eventarc trigger endpoint processes CloudEvents delivered by Eventarc, both Standard and Advanced editions. Use it to react to events across Google Cloud, for example:
- A file is uploaded to Cloud Storage (classify, summarize, or extract data from documents).
- A record is written to BigQuery (run anomaly detection or generate alerts).
- An Audit Log entry is created (flag policy violations or suspicious activity).
Both content modes are supported:
- Binary content mode (Eventarc default): CloudEvents attributes are sent as ce-* HTTP headers, and the body contains the event data (typically a Pub/Sub message wrapper).
- Structured content mode: All CloudEvents attributes and data are in the JSON body.
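The difference between the two content modes can be sketched as below. This is not ADK's parser. Detecting binary mode by the presence of `ce-*` headers is an assumption made for illustration, and the function name `extract_cloudevent` and the returned `mode` key are hypothetical.

```python
def extract_cloudevent(headers: dict, body: dict) -> dict:
    """Split a CloudEvent request into attributes and data."""
    # HTTP header names are case-insensitive, so normalize them first.
    lowered = {key.lower(): value for key, value in headers.items()}
    ce_attributes = {
        key[len("ce-"):]: value
        for key, value in lowered.items()
        if key.startswith("ce-")
    }
    if ce_attributes:
        # Binary mode: attributes travel as headers, the body is the data.
        return {"mode": "binary", "attributes": ce_attributes, "data": body}
    # Structured mode: everything is in the JSON body.
    attributes = {key: value for key, value in body.items() if key != "data"}
    return {"mode": "structured", "attributes": attributes, "data": body.get("data")}
```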
Test with curl (structured mode)
curl -X POST http://localhost:8000/apps/my_agent/trigger/eventarc \
  -H "Content-Type: application/json" \
  -d '{
    "specversion": "1.0",
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/my-project",
    "id": "event-123",
    "data": {
      "bucket": "my-bucket",
      "name": "uploads/document.pdf"
    }
  }'
Test with curl (binary mode)
curl -X POST http://localhost:8000/apps/my_agent/trigger/eventarc \
  -H "Content-Type: application/json" \
  -H "ce-type: google.cloud.storage.object.v1.finalized" \
  -H "ce-source: //storage.googleapis.com/projects/my-project" \
  -H "ce-id: event-456" \
  -H "ce-specversion: 1.0" \
  -d '{
    "message": {
      "data": "eyJidWNrZXQiOiAibXktYnVja2V0IiwgIm5hbWUiOiAiZG9jLnBkZiJ9",
      "attributes": {"eventType": "OBJECT_FINALIZE"}
    },
    "subscription": "projects/my-project/subscriptions/eventarc-sub"
  }'
Response
| HTTP Status | Meaning |
|---|---|
| 200 | Event processed successfully. Eventarc acknowledges delivery. |
| 500 | Processing failed. Eventarc retries delivery based on its retry policy. |
Configuration
Concurrency control
Trigger endpoints use a semaphore to limit the number of concurrent agent invocations. This prevents bursts of events from pushing your agent past its model quota.
| Setting | Default | Environment Variable |
|---|---|---|
| Max concurrent invocations | 10 | ADK_TRIGGER_MAX_CONCURRENT |
When the concurrency limit is reached, incoming requests are queued and processed as slots become available. Concurrency control is per process. If you deploy multiple Cloud Run instances, each instance maintains its own independent semaphore.
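The queueing behavior described above is what an `asyncio.Semaphore` provides. The sketch below shows the general technique rather than ADK's internal code. The names `run_limited` and `process_all` are hypothetical, and reading the limit at import time is an assumption.

```python
import asyncio
import os

# Same environment variable and default as in the table above.
MAX_CONCURRENT = int(os.environ.get("ADK_TRIGGER_MAX_CONCURRENT", "10"))


async def run_limited(semaphore: asyncio.Semaphore, handler, event):
    """Run the handler once a slot is free; callers wait in the meantime."""
    async with semaphore:
        return await handler(event)


async def process_all(events, handler, limit: int = MAX_CONCURRENT):
    """Process events concurrently, never exceeding `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)
    tasks = (run_limited(semaphore, handler, event) for event in events)
    return await asyncio.gather(*tasks)
```

Because the semaphore lives in process memory, this also shows why the limit applies per instance and not across a whole Cloud Run service.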
Automatic retry with backoff
Trigger endpoints include built-in retry logic for transient errors such as
429 RESOURCE_EXHAUSTED responses. When a transient error is detected, the
request is retried with exponential backoff and jitter.
| Setting | Default | Environment Variable |
|---|---|---|
| Max retry attempts | 3 | ADK_TRIGGER_MAX_RETRIES |
| Base backoff delay | 1.0s | ADK_TRIGGER_RETRY_BASE_DELAY |
| Max backoff delay | 30.0s | ADK_TRIGGER_RETRY_MAX_DELAY |
If all retries are exhausted, the endpoint returns HTTP 500, signaling Pub/Sub or Eventarc to retry delivery at a higher level. Non-transient errors fail immediately without retries.
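A minimal sketch of exponential backoff with jitter, using the default values from the table, looks like this. The source does not specify ADK's exact formula, so the "full jitter" variant is an assumption, as is treating the maximum as three retries after the first attempt. `TransientError`, `backoff_delay`, and `call_with_retries` are hypothetical names.

```python
import random
import time


class TransientError(Exception):
    """Stand-in for a retryable failure such as 429 RESOURCE_EXHAUSTED."""


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Return a full-jitter delay for a zero-based retry attempt."""
    ceiling = min(cap, base * (2 ** attempt))  # 1s, 2s, 4s, ... capped at 30s
    return random.uniform(0, ceiling)


def call_with_retries(func, max_retries: int = 3, base: float = 1.0,
                      cap: float = 30.0, sleep=time.sleep):
    """Call func, retrying transient errors; other errors propagate at once."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except TransientError:
            if attempt == max_retries:
                raise  # Retries exhausted: the caller maps this to HTTP 500.
            sleep(backoff_delay(attempt, base, cap))
```

Randomizing the delay spreads retries from concurrent requests over time, so they do not all hit the model API again at the same moment.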
Error handling and disaster recovery
Disaster recovery for trigger-based workloads is handled by the triggering service, not by ADK:
- If your agent crashes or returns an error, Pub/Sub or Eventarc does not receive an acknowledgement and automatically redelivers the message.
- After maximum retries are exhausted, unprocessed messages move to a dead-letter queue (DLQ) if configured.
- Each redelivery creates a new session. Trigger workloads are stateless by design.
Timeout considerations
All trigger endpoints process synchronously and wait for your agent to complete before returning a response. This is by design: keeping the HTTP request alive ensures that the hosting infrastructure does not terminate the process while your agent is still working. The synchronous response code (200 or 500) is what allows Pub/Sub and Eventarc to correctly acknowledge success or trigger a retry.
The maximum processing time is governed by the upstream service:
| Service | Max Timeout |
|---|---|
| Pub/Sub push | 10 minutes (ack deadline) |
| Eventarc | 10 minutes (Standard uses Pub/Sub as transport; Advanced delivers via pipeline) |
Trigger endpoints are designed for agents that complete within 10 minutes. This is suitable for processing individual events, running validations, classifying documents, and writing results to downstream services.
Long-running agents
Trigger endpoints are not suitable for agents that take more than 10 minutes to complete. For long-running workloads, use Pub/Sub pull subscriptions, Cloud Run Jobs, or a worker pool architecture instead.
Session lifecycle
Sessions follow the same pattern as all other ADK entry points. They are
created through your configured
SessionService. By default, ADK uses
InMemorySessionService, which makes trigger sessions ephemeral: created per
event and discarded after processing.
If you configure a persistent SessionService (for example, DatabaseSessionService),
trigger sessions are stored automatically. This can be useful for auditing,
debugging, and post-mortem analysis of event-driven workloads.
Deploy
The examples below use Cloud Run as the deployment target. Cloud Run is currently the recommended platform for deploying ambient agents with trigger endpoints.
Authentication and security
Trigger endpoints are standard HTTP routes within the ADK web server. Authentication and security are enforced at the deployment level, the same as any other ADK endpoint. When deployed with authentication enabled (recommended), all endpoints require valid credentials. GCP services authenticate using service account identities. See each service's documentation for details.
Deploy your agent to Cloud Run with triggers enabled using the
--trigger_sources flag:
adk deploy cloud_run \
  --project=$GOOGLE_CLOUD_PROJECT \
  --region=$GOOGLE_CLOUD_LOCATION \
  --trigger_sources="pubsub,eventarc" \
  path/to/your/agent
After deployment, connect the appropriate GCP infrastructure to your agent's trigger endpoint:
- Pub/Sub: Create a push subscription pointing to /apps/{app_name}/trigger/pubsub.
- Eventarc: Create an Eventarc Standard trigger or an Eventarc Advanced pipeline routing to /apps/{app_name}/trigger/eventarc.
- Cloud Scheduler: Create a scheduler job that publishes to your Pub/Sub topic on a cron schedule.
See Deploy to Cloud Run for full deployment instructions.
What's next?
- Learn how to deploy your agent to Cloud Run
- Explore API server endpoints for interactive agent invocations
- Use the Pub/Sub toolset to give your agent the ability to publish and pull messages