Skip to content

Executor API

Canonical Definition - This document is the authoritative definition of the Executor interface

Executor is responsible for module execution, Context management, middleware scheduling, and observability.

1. Interface Overview

from typing import Any
from apcore import Registry, Context, Middleware


class Executor:
    """Module executor"""

    def __init__(
        self,
        registry: Registry,
        middlewares: list[Middleware] | None = None,
        acl: "ACL | None" = None
    ) -> None:
        """
        Initialize Executor

        Args:
            registry: Module registry
            middlewares: Middleware list (executed in order)
            acl: Access Control List
        """
        ...

    # ============ Execution Methods ============

    def call(
        self,
        module_id: str,
        inputs: dict[str, Any],
        context: Context | None = None
    ) -> dict[str, Any]:
        """
        Synchronously call module

        Args:
            module_id: Module ID
            inputs: Input parameters
            context: Call context (optional, auto-created)

        Returns:
            Module output

        Raises:
            ModuleNotFoundError: Module does not exist
            ValidationError: Input/output validation failed
            ACLDeniedError: Insufficient permissions
            CallDepthExceededError: Call chain depth exceeded
            CircularCallError: Circular call detected
            CallFrequencyExceededError: Same module call frequency exceeded
            ModuleError: Module execution error
        """
        ...

    async def call_async(
        self,
        module_id: str,
        inputs: dict[str, Any],
        context: Context | None = None
    ) -> dict[str, Any]:
        """
        Asynchronously call module

        Note: Modules only need to define one execute() method (def or async def),
        framework auto-detects and selects appropriate calling method. call_async is used
        to call modules in async context.
        """
        ...

    def validate(
        self,
        module_id: str,
        inputs: dict[str, Any]
    ) -> "ValidationResult":
        """Only validate inputs, don't execute"""
        ...

    # ============ Middleware Management ============

    def use(self, middleware: Middleware) -> "Executor":
        """Add middleware (Class-based)"""
        ...

    def use_before(self, callback: "Callable") -> "Executor":
        """Add before function middleware (Function-first API)"""
        ...

    def use_after(self, callback: "Callable") -> "Executor":
        """Add after function middleware (Function-first API)"""
        ...

    def remove(self, middleware: Middleware) -> bool:
        """Remove middleware"""
        ...

    # ============ Properties ============

    @property
    def registry(self) -> Registry:
        """Get Registry"""
        ...

    @property
    def middlewares(self) -> list[Middleware]:
        """Get middleware list"""
        ...

2. Initialization

2.1 Basic Initialization

from apcore import Registry, Executor

# Create Registry
registry = Registry(extensions_dir="./extensions")
registry.discover()

# Create Executor
executor = Executor(registry)

2.2 With Middleware

from apcore import Registry, Executor
from apcore.middleware import LoggingMiddleware, MetricsMiddleware

executor = Executor(
    registry=registry,
    middlewares=[
        LoggingMiddleware(),
        MetricsMiddleware()
    ]
)

2.3 With ACL

from apcore import Registry, Executor, ACL

acl = ACL.load("./acl/global_acl.yaml")
executor = Executor(registry=registry, acl=acl)

3. Calling Modules

3.1 Basic Call

result = executor.call(
    module_id="executor.email.send_email",
    inputs={
        "to": "[email protected]",
        "subject": "Hello",
        "body": "World"
    }
)

print(result)
# {"success": True, "message_id": "msg_123", "error": None}

3.2 With Context

from apcore import Context

# Create custom Context
context = Context(
    trace_id="custom-trace-123",
    identity=Identity(id="user_456", type="user", roles=["admin"]),
    data={"locale": "zh-CN"}
)

result = executor.call(
    module_id="executor.email.send_email",
    inputs={"to": "[email protected]", "subject": "Hi", "body": "Hello"},
    context=context
)

3.3 Async Call

import asyncio

async def main():
    result = await executor.call_async(
        module_id="executor.email.send_email",
        inputs={"to": "[email protected]", "subject": "Hi", "body": "Hello"}
    )
    print(result)

asyncio.run(main())

3.4 Batch Concurrent Calls

import asyncio

async def send_batch_emails(emails: list[dict]):
    tasks = [
        executor.call_async(
            module_id="executor.email.send_email",
            inputs=email
        )
        for email in emails
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Usage
emails = [
    {"to": "[email protected]", "subject": "Hi", "body": "Hello 1"},
    {"to": "[email protected]", "subject": "Hi", "body": "Hello 2"},
    {"to": "[email protected]", "subject": "Hi", "body": "Hello 3"},
]
results = asyncio.run(send_batch_emails(emails))

4. Input Validation

4.1 Auto Validation

# Inputs are automatically validated against input_schema
try:
    result = executor.call(
        module_id="executor.email.send_email",
        inputs={
            "to": "invalid-email",  # Format error
            "subject": "Hi"
            # Missing required field body
        }
    )
except ValidationError as e:
    print(f"Validation failed: {e.errors}")
    # [
    #   {"field": "to", "message": "Invalid email format"},
    #   {"field": "body", "message": "Field required"}
    # ]

4.2 Pre-validation

# Only validate, don't execute
validation = executor.validate(
    module_id="executor.email.send_email",
    inputs={"to": "[email protected]", "subject": "Hi"}
)

if validation.valid:
    print("Inputs are valid")
else:
    print(f"Validation errors: {validation.errors}")

5. Error Handling

5.1 Error Types

from apcore import (
    ModuleNotFoundError,
    ValidationError,
    ACLDeniedError,
    CallDepthExceededError,
    CircularCallError,
    CallFrequencyExceededError,
    ModuleError
)

try:
    result = executor.call(
        module_id="executor.email.send_email",
        inputs={"to": "[email protected]", "subject": "Hi", "body": "Hello"}
    )
except ModuleNotFoundError as e:
    # Module does not exist
    print(f"Module not found: {e.module_id}")

except ValidationError as e:
    # Input/output validation failed
    print(f"Validation failed: {e.errors}")

except ACLDeniedError as e:
    # Insufficient permissions
    print(f"Access denied: {e.message}")
    print(f"Required: {e.required_permission}")
    print(f"Caller: {e.caller_id}")

except CallDepthExceededError as e:
    # Call chain depth exceeded
    print(f"Call depth exceeded: {e.current_depth}/{e.max_depth}")
    print(f"Call chain: {e.call_chain}")

except CircularCallError as e:
    # Circular call (strict cycle: A→B→A)
    print(f"Circular call detected: {e.module_id}")
    print(f"Call chain: {e.call_chain}")

except CallFrequencyExceededError as e:
    # Same module call frequency exceeded (A→B→C→B→C→B→C...)
    print(f"Call frequency exceeded: {e.module_id} called {e.count}/{e.max_repeat} times")
    print(f"Call chain: {e.call_chain}")

except ModuleError as e:
    # Module execution error
    print(f"Module error: {e.message}")
    print(f"Module ID: {e.module_id}")
    print(f"Trace ID: {e.trace_id}")

5.2 Error Context

try:
    result = executor.call(...)
except ModuleError as e:
    # Get complete error context
    print(f"Error: {e.message}")
    print(f"Module: {e.module_id}")
    print(f"Trace ID: {e.trace_id}")
    print(f"Call Chain: {e.call_chain}")
    print(f"Inputs: {e.inputs}")

    # Log to logging system
    logger.error(
        f"Module execution failed",
        extra={
            "trace_id": e.trace_id,
            "module_id": e.module_id,
            "call_chain": e.call_chain
        }
    )

6. Execution Flow

6.1 Complete Flow

executor.call(module_id, inputs, context)
    ├─ 1. Create/validate Context
    │      └─ If context is None, auto-create
    ├─ 2. Call chain safety checks
    │      ├─ 2a. Depth check: len(call_chain) >= 32 → throw CallDepthExceededError
    │      ├─ 2b. Cycle detection: module_id in call_chain → throw CircularCallError
    │      └─ 2c. Frequency detection: call_chain.count(module_id) >= max_repeat → throw CallFrequencyExceededError
    ├─ 3. Lookup module
    │      └─ registry.get(module_id)
    │      └─ If not exists, throw ModuleNotFoundError
    ├─ 4. ACL check
    │      └─ acl.check(caller_id, module_id, context)
    │      └─ If denied, throw ACLDeniedError
    ├─ 5. Input validation
    │      └─ Validate against input_schema
    │      └─ If failed, throw ValidationError
    ├─ 6. Middleware before
    │      └─ middleware.before(module_id, inputs, context)
    ├─ 7. Execute module
    │      └─ module.execute(inputs, context)
    │      └─ If failed, call middleware on_error
    ├─ 8. Output validation
    │      └─ Validate against output_schema
    ├─ 9. Middleware after
    │      └─ middleware.after(module_id, inputs, output, context)
    └─ 10. Return result

6.2 Automatic Context Handling

When modules internally call other modules, Executor automatically handles Context:

# Internal module call
result = context.executor.call(
    module_id="executor.other_module",
    inputs={...},
    context=context  # Pass current context
)

# Executor automatically:
# 1. Keeps trace_id unchanged
# 2. Updates caller_id to current module
# 3. Appends call_chain
# 4. Call chain safety checks (depth + cycle + frequency)

6.3 Execution State Machine

  ┌─────────┐
  │  idle   │
  └────┬────┘
       │ call()
  ┌──────────┐  depth/cycle/freq  ┌──────────────────────────┐
  │call_chain│───────────────────▶│ error: DEPTH_EXCEEDED    │
  │  guard   │                    │      / CIRCULAR_CALL     │
  └────┬─────┘                    │      / FREQUENCY_EXCEEDED│
       │ check passed             └──────────────────────────┘
  ┌─────────┐    module not exist ┌──────────────────┐
  │ resolve │───────────────────▶│ error: NOT_FOUND │
  └────┬────┘                    └──────────────────┘
       │ module found
  ┌─────────┐    permission denied ┌──────────────────┐
  │  acl    │────────────────────▶│ error: ACL_DENIED│
  └────┬────┘                     └──────────────────┘
       │ permission passed
  ┌──────────┐   validation failed ┌──────────────────────┐
  │ validate │────────────────────▶│ error: VALIDATION    │
  │  input   │                     └──────────────────────┘
  └────┬─────┘
       │ validation passed
  ┌──────────┐
  │ before   │──── middleware error ──▶ on_error chain
  │middleware│
  └────┬─────┘
  ┌──────────┐   execution error  ┌──────────────────────┐
  │ execute  │────────────────────▶│ on_error middleware  │
  │  module  │                     └──────────────────────┘
  └────┬─────┘
       │ success
  ┌──────────┐   validation failed ┌──────────────────────┐
  │ validate │────────────────────▶│ error: VALIDATION    │
  │  output  │                     └──────────────────────┘
  └────┬─────┘
  ┌──────────┐
  │  after   │
  │middleware│
  └────┬─────┘
  ┌──────────┐
  │ return   │
  │ result   │
  └──────────┘

6.4 Timeout Specification

Config Default Description
Module execution timeout 30000ms Can be overridden via resources.timeout
Global timeout 60000ms Total time including middleware and validation
ACL check timeout 1000ms Max time for ACL rule evaluation
  • After timeout MUST throw MODULE_TIMEOUT error
  • Timeout counting MUST start from the first before() middleware
  • Middleware execution time SHOULD count toward total timeout

6.5 Concurrent Execution Semantics

  • Same Executor instance MUST support concurrent calls by multiple threads/coroutines
  • Each call() MUST use independent Context copy (call_chain and caller_id)
  • context.data is shared by reference, concurrent calls SHOULD use different Context instances
  • Batch call_async() MAY execute concurrently, order not guaranteed

6.6 Edge Case Handling

Implementations MUST handle Executor edge cases per the following table:

Scenario Behavior Level
timeout = 0 Disable timeout limit, log WARN MUST
timeout is negative Throw GENERAL_INVALID_INPUT MUST
module_id is empty string "" Throw MODULE_NOT_FOUND MUST
inputs = null Treat as empty dict {}, continue validation MUST
context = null Create new Context (empty call_chain) MUST
Concurrent calls sharing same Context instance Behavior undefined (race condition), SHOULD log WARN SHOULD
call() during module unregister() If execution started, continue; if not started, throw MODULE_NOT_FOUND MUST
Call chain depth exceeds max_call_depth Throw CALL_DEPTH_EXCEEDED MUST

Concurrent safety notes: - Executor instance MUST be thread-safe, supporting multi-threaded concurrent calls - Each call() SHOULD use independent Context instance (created via derive()) - See PROTOCOL_SPEC §11.7 Concurrency Model Specification


7. Middleware

7.1 Adding Middleware

from apcore.middleware import LoggingMiddleware, MetricsMiddleware

# Method 1: Add during initialization (Class-based)
executor = Executor(
    registry=registry,
    middlewares=[LoggingMiddleware(), MetricsMiddleware()]
)

# Method 2: Chain adding (Class-based)
executor = Executor(registry=registry)
executor.use(LoggingMiddleware()).use(MetricsMiddleware())

# Method 3: Function-first API (lightweight scenarios)
executor.use_before(lambda module_id, inputs, ctx: print(f"Calling {module_id}"))
executor.use_after(lambda module_id, inputs, output, ctx: print(f"Done {module_id}"))

7.2 Execution Order

Middleware executes in onion model:

Request → MW1.before → MW2.before → Module → MW2.after → MW1.after → Response

On error:
Request → MW1.before → MW2.before → Module(error) → MW2.on_error → MW1.on_error

7.3 Removing Middleware

logging_mw = LoggingMiddleware()
executor.use(logging_mw)

# Remove
executor.remove(logging_mw)

8. ACL Integration

8.1 Configure ACL

# acl/global_acl.yaml
rules:
  # Allow all modules to call common.* modules
  - callers: ["*"]
    targets: ["common.*"]
    effect: allow

  # Only allow orchestrator.* to call executor.*
  - callers: ["orchestrator.*"]
    targets: ["executor.*"]
    effect: allow

  # Deny direct calls to internal.*
  - callers: ["*"]
    targets: ["internal.*"]
    effect: deny

  # Default allow
  - callers: ["*"]
    targets: ["*"]
    effect: allow

8.2 Using ACL

from apcore import Executor, ACL

acl = ACL.load("./acl/global_acl.yaml")
executor = Executor(registry=registry, acl=acl)

# Permissions automatically checked during calls
try:
    result = executor.call(
        module_id="internal.secret_module",
        inputs={...},
        context=context
    )
except ACLDeniedError as e:
    print(f"Access denied: {e.message}")

9. Observability

9.1 Built-in Metrics

Executor automatically collects the following metrics:

Metric Type Description
apcore_module_calls_total Counter Total module calls
apcore_module_duration_seconds Histogram Module execution time
apcore_module_errors_total Counter Total module errors
apcore_validation_errors_total Counter Total validation errors
apcore_acl_denied_total Counter Total ACL denials

9.2 Tracing

# All calls carry trace_id
result = executor.call(
    module_id="executor.email.send_email",
    inputs={...}
)

# Can get trace_id from middleware or logs
# Used to trace complete call chain in distributed systems

9.3 Logging

from apcore.middleware import LoggingMiddleware
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

# Add logging middleware
executor.use(LoggingMiddleware(
    log_inputs=True,   # Whether to log inputs
    log_outputs=True,  # Whether to log outputs
    log_errors=True    # Whether to log errors
))

Log output example:

[INFO] [trace:abc-123] Before: executor.email.send_email
[INFO] [trace:abc-123] Inputs: {"to": "[email protected]", ...}
[INFO] [trace:abc-123] After: executor.email.send_email (45ms)
[INFO] [trace:abc-123] Output: {"success": true, ...}

Security tip: Logging middleware SHOULD use context.redacted_inputs instead of raw inputs to avoid leaking sensitive fields marked with x-sensitive (like passwords, API Keys). See Context Object — redacted_inputs.


10. Complete Example

from apcore import Registry, Executor, Context, ACL
from apcore.middleware import LoggingMiddleware, MetricsMiddleware

# 1. Create Registry and discover modules
registry = Registry(extensions_dir="./extensions")
registry.discover()

# 2. Load ACL
acl = ACL.load("./acl/global_acl.yaml")

# 3. Create Executor
executor = Executor(
    registry=registry,
    middlewares=[
        LoggingMiddleware(),
        MetricsMiddleware()
    ],
    acl=acl
)

# 4. Create Context (with identity info)
context = Context.create(
    executor=executor,
    identity=Identity(id="user_123", type="user", roles=["admin"]),
    data={"locale": "zh-CN"}
)

# 5. Call module
try:
    result = executor.call(
        module_id="executor.email.send_email",
        inputs={
            "to": "[email protected]",
            "subject": "Hello",
            "body": "World"
        },
        context=context
    )
    print(f"Success: {result}")

except Exception as e:
    print(f"Error: {e}")

# 6. Async batch calls
async def send_batch():
    tasks = [
        executor.call_async(
            module_id="executor.email.send_email",
            inputs={"to": f"user{i}@example.com", "subject": "Hi", "body": "Hello"},
            context=context
        )
        for i in range(10)
    ]
    return await asyncio.gather(*tasks, return_exceptions=True)

import asyncio
results = asyncio.run(send_batch())
print(f"Sent {len([r for r in results if not isinstance(r, Exception)])} emails")

Next Steps