When one agent delegates work to another, the authority behind that delegation needs to be traceable, scoped, and revocable. AGP's capability delegation model provides exactly that — across any A2A framework, orchestration pattern, or agent runtime.
AGP is framework-agnostic. The pattern works the same whether you are using Google's A2A protocol, LangGraph multi-agent workflows, AutoGen group chats, CrewAI crews, or a custom orchestration layer you built yourself. The governance contract is between agents and the AGP server — not between AGP and any specific framework.
In a multi-agent system, authority typically flows from a human or a root orchestrator down through layers of worker agents. Without a governance layer, two things go wrong:
AGP solves all three. Sub-tokens carry explicit scope constraints. Revoking a parent cascades immediately to all children. Every action envelope references the root task, so the full delegation tree is visible in the audit ledger.
| A2A concept | AGP equivalent |
|---|---|
| Root task / user instruction | AGP Task — the governance root. All downstream actions reference it. |
| Agent delegating work | Orchestrator holds a capability token and issues sub-tokens to workers. |
| Delegated task assignment | Sub-token issuance — scoped narrower than the parent, with optional spend limits and expiry. |
| Worker agent completing a task | Worker submits an action envelope referencing its sub-token. AGP validates it before the action executes. |
| Cancelling a delegation | Revocation notice — invalidates the token and cascades to all descendants immediately. |
| Agent capability description | AGP Skill Manifest — registered in the registry, describes what an agent can do and under what constraints. |
The orchestrator registers a single AGP task at the start of the workflow. Every worker's action envelope will reference this task — it is the root of the entire accountability chain.
from agp import AGPClient orchestrator = AGPClient( "https://your-agp-server", client_id="orchestrator-agent", client_secret="...", ) # One root task for the entire multi-agent workflow task = orchestrator.registry.create_task( principal_id="orchestrator-agent", requested_outcome="Process Q3 vendor invoices and approve payments under $50k", risk_tier="high", sponsoring_entity="finance-team", jurisdictions=["EU"], ) task_id = task.task_id # shared with all workers
The orchestrator's own token defines the ceiling — no sub-token can exceed the scope or spend limit of its parent.
# Orchestrator's root capability — full scope, $500k ceiling root_token = orchestrator.registry.issue_capability( subject_agent="orchestrator-agent", issuer="finance-team", principal_id="orchestrator-agent", permitted_actions=["data_read", "approve_payment", "send_email"], constraints={"max_amount_usd": 500000}, )
Before handing off a task via A2A, the orchestrator issues a sub-token to the worker. The sub-token is scoped to exactly what that worker needs to do — nothing more. Pass the sub-token's ID to the worker as part of the A2A task message.
# Worker B: can only approve payments up to $50k worker_b_token = orchestrator.registry.issue_capability( subject_agent="worker-b", issuer="orchestrator-agent", # parent is the orchestrator, not the human principal_id="orchestrator-agent", permitted_actions=["approve_payment"], constraints={"max_amount_usd": 50000}, # narrower than parent's $500k expires_in_days=1, # short-lived — just for this workflow run ) # Worker A: read-only, no spend limit worker_a_token = orchestrator.registry.issue_capability( subject_agent="worker-a", issuer="orchestrator-agent", principal_id="orchestrator-agent", permitted_actions=["data_read"], ) # A2A task message to Worker B — include token ID and task ID a2a_message = { "task_id": task_id, # AGP root task "capability_token": worker_b_token.capability_id, "instruction": "Approve invoices from approved vendors", "constraints": {"max_amount_usd": 50000}, }
Each worker runs its own AGP session. When it's ready to act, it submits an action envelope using its sub-token. The AGP server validates the sub-token — including that it hasn't been revoked, that the action is within its permitted scope, and that the amount doesn't exceed the constraint — before the action is allowed to execute.
# Worker B — running in its own process / container worker = AGPClient( "https://your-agp-server", client_id="worker-b", client_secret="...", ) # Resume the root task (created by orchestrator) session = worker.resume_session(task_id) session.decide( agent_id="worker-b", selected_action="approve_payment", rationale="Vendor on approved list, invoice verified, amount within delegated limit", uncertainty_score=0.08, ) session.evaluate(verdict="allow") # Submit using the sub-token issued by the orchestrator receipt = session.execute( agent_id="worker-b", tool_id="approve_payment", operation={"vendor": "Acme Corp", "amount_usd": 32000}, capability_token_ref=worker_b_token_id, # sub-token, not orchestrator's token ) # AGP validates: sub-token is valid, action is "approve_payment" ✓, # amount $32k < $50k constraint ✓ → 201 Created
amount_usd: 200000, the AGP server would reject it — the sub-token's max_amount_usd: 50000 constraint is checked at submission time, not at issuance time.
Revocation cascades down the delegation tree. Revoking the orchestrator's root token immediately invalidates all worker sub-tokens — any in-flight action envelopes referencing them will be rejected.
# Revoke the orchestrator's root token — cascades to all workers immediately orchestrator.registry.create_revocation( token_id=root_token.capability_id, reason="Workflow cancelled — suspected prompt injection detected", ) # Worker B now tries to submit — rejected # AGPTokenRevokedError: parent token has been revoked
You can also revoke individual workers without affecting others:
# Revoke only Worker B — Worker A and C are unaffected orchestrator.registry.create_revocation( token_id=worker_b_token.capability_id, reason="Worker B produced unexpected outputs — isolating", )
Because all workers reference the same root task, the AGP event ledger contains the complete picture of the workflow — every decision, every action, every approval, in chronological order with hash-chaining between events.
# Replay the full audit trail for the entire multi-agent workflow events = orchestrator.execution.replay_task(task_id) for event in events["events"]: print(event["event_type"], event["agent_id"], event["created_at"]) # TASK_CREATED orchestrator-agent 2024-01-15T09:00:00Z # CAPABILITY_ISSUED orchestrator-agent 2024-01-15T09:00:01Z (root) # CAPABILITY_ISSUED worker-a 2024-01-15T09:00:02Z (sub-token) # CAPABILITY_ISSUED worker-b 2024-01-15T09:00:02Z (sub-token) # DECISION_RECORDED worker-a 2024-01-15T09:01:10Z # ACTION_SUBMITTED worker-a 2024-01-15T09:01:11Z data_read # DECISION_RECORDED worker-b 2024-01-15T09:02:30Z # ACTION_SUBMITTED worker-b 2024-01-15T09:02:31Z approve_payment $32k # ...
require_approval pauses and waits. The approval request can be routed to the human who initiated the root task — closing the accountability loop even in fully automated multi-agent systems.