apcore — Module Testing Guide¶
Comprehensive coverage of apcore module testing strategies: unit testing, Schema testing, integration testing, ACL testing and Mock techniques.
1. Overview¶
apcore's Schema-driven design is naturally suited for testing. Module inputs and outputs all have clear Schema definitions, behaviors are annotated through Annotations, and permissions are configured through ACL—all of which can be independently verified.
Testing Pyramid:
/ E2E \ ← End-to-end (minimal)
/ Integration \ ← Module interaction (moderate)
/ Schema Tests \ ← Schema validation (moderate)
/ Unit Tests \ ← Module logic (extensive)
──────────────────
Testing Tier Strategy:
| Tier | Target | Frequency | Speed |
|---|---|---|---|
| Unit Tests | Execute logic of individual modules | Every commit | < 1s |
| Schema Tests | Completeness and boundaries of Schema definitions | Every commit | < 1s |
| ACL Tests | Correctness of permission rules | Every commit | < 1s |
| Integration Tests | Module interactions and middleware chain | Every PR | < 10s |
| E2E Tests | Complete call chain | Pre-release | < 60s |
2. Unit Testing¶
2.1 Testing Individual Modules¶
The core of unit testing is to isolate and test the module's execute method.
# tests/test_db_params_validator.py
import pytest
from extensions.executor.validator.db_params import DbParamsValidator
class TestDbParamsValidator:
"""Test database parameters validator"""
def setup_method(self):
"""Initialize module instance before each test"""
self.module = DbParamsValidator()
def test_valid_select_statement(self):
"""Validate legal SELECT statement"""
inputs = {
"table": "user_info",
"sql": "SELECT * FROM user_info WHERE id = 1",
"timeout": 30
}
context = create_mock_context()
result = self.module.execute(inputs, context)
assert result["valid"] is True
assert result["message"] == "Validation passed"
assert result["errors"] == []
def test_dangerous_sql_drop(self):
"""Detect DROP statement"""
inputs = {
"table": "user_info",
"sql": "DROP TABLE user_info"
}
context = create_mock_context()
result = self.module.execute(inputs, context)
assert result["valid"] is False
assert any(e["code"] == "DANGEROUS_SQL" for e in result["errors"])
def test_default_timeout(self):
"""Verify default value of timeout field"""
inputs = {
"table": "user_info",
"sql": "SELECT 1"
}
context = create_mock_context()
result = self.module.execute(inputs, context)
assert result["valid"] is True
2.2 Mock Context Creation¶
Context is the runtime context for module execution; Mock objects need to be constructed for testing.
# tests/conftest.py
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass
class MockIdentity:
"""Mock identity information"""
id: str = "test-user"
type: str = "user"
roles: list[str] = field(default_factory=lambda: ["admin"])
attrs: dict[str, Any] = field(default_factory=dict)
@dataclass
class MockContext:
"""Mock execution context"""
trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
caller_id: Optional[str] = None
call_chain: list[str] = field(default_factory=list)
executor: Any = None
identity: Optional[MockIdentity] = None
data: dict[str, Any] = field(default_factory=dict)
def create_mock_context(
caller_id: Optional[str] = None,
trace_id: Optional[str] = None,
call_chain: Optional[list[str]] = None,
identity: Optional[MockIdentity] = None,
executor: Any = None,
data: Optional[dict] = None
) -> MockContext:
"""Factory function for creating Mock Context"""
return MockContext(
trace_id=trace_id or str(uuid.uuid4()),
caller_id=caller_id,
call_chain=call_chain or [],
executor=executor,
identity=identity or MockIdentity(),
data=data or {}
)
2.3 Mock Executor Pattern¶
When modules internally need to call other modules, Mock Executor is required.
# tests/conftest.py
class MockExecutor:
"""Configurable Mock Executor with response values"""
def __init__(self):
self._responses: dict[str, Any] = {}
self._errors: dict[str, Exception] = {}
self._call_log: list[dict] = []
def register_response(self, module_id: str, response: dict):
"""Register return value for module call"""
self._responses[module_id] = response
def register_error(self, module_id: str, error: Exception):
"""Register exception for module call"""
self._errors[module_id] = error
def call(self, module_id: str, inputs: dict, context: Any) -> dict:
"""Simulate module call"""
self._call_log.append({
"module_id": module_id,
"inputs": inputs,
"trace_id": getattr(context, "trace_id", None)
})
if module_id in self._errors:
raise self._errors[module_id]
if module_id in self._responses:
return self._responses[module_id]
raise ModuleNotFoundError(f"Mock: Unregistered module {module_id}")
async def call_async(self, module_id: str, inputs: dict, context: Any) -> dict:
"""Simulate async module call"""
return self.call(module_id, inputs, context)
@property
def call_log(self) -> list[dict]:
"""Get call log"""
return self._call_log
def assert_called(self, module_id: str, times: int = 1):
"""Assert a module was called a specific number of times"""
actual = sum(1 for c in self._call_log if c["module_id"] == module_id)
assert actual == times, f"Expected {module_id} to be called {times} times, actually {actual} times"
def assert_not_called(self, module_id: str):
"""Assert a module was not called"""
self.assert_called(module_id, times=0)
class ModuleNotFoundError(Exception):
pass
Test example using Mock Executor:
class TestTaskFlowOrchestrator:
"""Test task flow orchestrator (internally calls other modules)"""
def setup_method(self):
self.module = TaskFlowOrchestrator()
self.executor = MockExecutor()
def test_orchestrator_calls_validator_then_executor(self):
"""Verify orchestrator calls validator then executor"""
# Register Mock return values
self.executor.register_response(
"executor.validator.db_params",
{"valid": True, "message": "Validation passed", "errors": []}
)
self.executor.register_response(
"executor.handler.db_query",
{"rows": [{"id": 1, "name": "test"}], "count": 1}
)
context = create_mock_context(
caller_id="api.handler.task_submit",
executor=self.executor
)
inputs = {"table": "user_info", "sql": "SELECT * FROM user_info"}
result = self.module.execute(inputs, context)
# Verify call order and count
self.executor.assert_called("executor.validator.db_params", times=1)
self.executor.assert_called("executor.handler.db_query", times=1)
assert result["count"] == 1
def test_orchestrator_stops_on_validation_failure(self):
"""Verify executor is not called when validation fails"""
self.executor.register_response(
"executor.validator.db_params",
{"valid": False, "message": "Validation failed", "errors": [{"code": "DANGEROUS_SQL"}]}
)
context = create_mock_context(executor=self.executor)
inputs = {"table": "user_info", "sql": "DROP TABLE user_info"}
result = self.module.execute(inputs, context)
self.executor.assert_called("executor.validator.db_params", times=1)
self.executor.assert_not_called("executor.handler.db_query")
assert result["valid"] is False
2.4 Input/Output Validation Testing¶
Verify that the module's Schema definition matches actual behavior.
from pydantic import ValidationError
class TestDbParamsInputSchema:
"""Test input Schema validation"""
def test_valid_input(self):
"""Valid input should pass validation"""
data = {"table": "user_info", "sql": "SELECT 1", "timeout": 30}
model = DBParamsInput(**data)
assert model.table == "user_info"
def test_missing_required_field(self):
"""Missing required field should raise error"""
with pytest.raises(ValidationError) as exc_info:
DBParamsInput(sql="SELECT 1") # Missing table
errors = exc_info.value.errors()
assert any(e["loc"] == ("table",) for e in errors)
def test_invalid_table_pattern(self):
"""Table name not matching pattern should raise error"""
with pytest.raises(ValidationError):
DBParamsInput(table="User-Info", sql="SELECT 1")
def test_timeout_boundary_values(self):
"""Boundary values of timeout field"""
# Minimum value
model = DBParamsInput(table="t", sql="SELECT 1", timeout=1)
assert model.timeout == 1
# Maximum value
model = DBParamsInput(table="t", sql="SELECT 1", timeout=300)
assert model.timeout == 300
# Exceeding maximum value
with pytest.raises(ValidationError):
DBParamsInput(table="t", sql="SELECT 1", timeout=301)
# Below minimum value
with pytest.raises(ValidationError):
DBParamsInput(table="t", sql="SELECT 1", timeout=0)
3. Schema Testing¶
3.1 Validating YAML Schema Definitions¶
Ensure the YAML Schema file structure itself is correct.
# tests/test_schemas.py
import yaml
import jsonschema
from pathlib import Path
SCHEMAS_DIR = Path("schemas")
class TestSchemaDefinitions:
"""Test validity of Schema files"""
def get_all_schema_files(self) -> list[Path]:
"""Get all Schema files"""
return list(SCHEMAS_DIR.glob("*.schema.yaml"))
@pytest.mark.parametrize("schema_file", SCHEMAS_DIR.glob("*.schema.yaml"))
def test_schema_is_valid_yaml(self, schema_file: Path):
"""Schema file must be valid YAML"""
with open(schema_file) as f:
data = yaml.safe_load(f)
assert data is not None, f"{schema_file.name} is empty"
@pytest.mark.parametrize("schema_file", SCHEMAS_DIR.glob("*.schema.yaml"))
def test_schema_has_required_fields(self, schema_file: Path):
"""Schema file must contain required fields"""
with open(schema_file) as f:
data = yaml.safe_load(f)
assert "module_id" in data, f"{schema_file.name} missing module_id"
assert "description" in data, f"{schema_file.name} missing description"
assert "input_schema" in data, f"{schema_file.name} missing input_schema"
assert "output_schema" in data, f"{schema_file.name} missing output_schema"
@pytest.mark.parametrize("schema_file", SCHEMAS_DIR.glob("*.schema.yaml"))
def test_input_schema_is_valid_json_schema(self, schema_file: Path):
"""input_schema must be valid JSON Schema"""
with open(schema_file) as f:
data = yaml.safe_load(f)
input_schema = data.get("input_schema", {})
# Verify it is a valid JSON Schema
jsonschema.Draft202012Validator.check_schema(input_schema)
@pytest.mark.parametrize("schema_file", SCHEMAS_DIR.glob("*.schema.yaml"))
def test_output_schema_is_valid_json_schema(self, schema_file: Path):
"""output_schema must be valid JSON Schema"""
with open(schema_file) as f:
data = yaml.safe_load(f)
output_schema = data.get("output_schema", {})
jsonschema.Draft202012Validator.check_schema(output_schema)
@pytest.mark.parametrize("schema_file", SCHEMAS_DIR.glob("*.schema.yaml"))
def test_all_properties_have_description(self, schema_file: Path):
"""All properties should have description"""
with open(schema_file) as f:
data = yaml.safe_load(f)
def check_descriptions(schema: dict, path: str = ""):
properties = schema.get("properties", {})
for name, prop in properties.items():
current_path = f"{path}.{name}" if path else name
assert "description" in prop, (
f"{schema_file.name}: {current_path} missing description"
)
# Recursively check nested objects
if prop.get("type") == "object":
check_descriptions(prop, current_path)
check_descriptions(data.get("input_schema", {}))
check_descriptions(data.get("output_schema", {}))
3.2 Schema Boundary Value Testing¶
class TestSchemaEdgeCases:
"""Test Schema boundary cases"""
def setup_method(self):
"""Load Schema"""
with open("schemas/executor/validator/db_params.schema.yaml") as f:
self.schema = yaml.safe_load(f)
self.input_schema = self.schema["input_schema"]
def test_null_values(self):
"""Test handling of null values"""
validator = jsonschema.Draft202012Validator(self.input_schema)
# Required field as null should fail
result = validator.is_valid({"table": None, "sql": "SELECT 1"})
assert result is False
def test_empty_string(self):
"""Test empty string"""
validator = jsonschema.Draft202012Validator(self.input_schema)
# Empty table name doesn't match pattern
result = validator.is_valid({"table": "", "sql": "SELECT 1"})
assert result is False
def test_missing_optional_field(self):
"""Test missing optional field"""
validator = jsonschema.Draft202012Validator(self.input_schema)
# Only provide required fields
result = validator.is_valid({"table": "user_info", "sql": "SELECT 1"})
assert result is True
def test_extra_fields_rejected(self):
"""Reject extra fields when additionalProperties: false"""
validator = jsonschema.Draft202012Validator(self.input_schema)
result = validator.is_valid({
"table": "user_info",
"sql": "SELECT 1",
"unknown_field": "value"
})
# If Schema has additionalProperties: false
if self.input_schema.get("additionalProperties") is False:
assert result is False
def test_boundary_integer_values(self):
"""Test integer boundaries"""
validator = jsonschema.Draft202012Validator(self.input_schema)
# Minimum value for timeout
result = validator.is_valid({
"table": "t",
"sql": "SELECT 1",
"timeout": 1
})
assert result is True
# Maximum value for timeout
result = validator.is_valid({
"table": "t",
"sql": "SELECT 1",
"timeout": 300
})
assert result is True
# Exceeding maximum value
result = validator.is_valid({
"table": "t",
"sql": "SELECT 1",
"timeout": 301
})
assert result is False
3.3 Schema Compatibility Testing¶
Ensure Schema changes don't break compatibility.
class TestSchemaCompatibility:
"""Test Schema version compatibility"""
def test_new_optional_field_is_backward_compatible(self):
"""Adding optional field should not break old data"""
# Old version input (without new field)
old_input = {"table": "user_info", "sql": "SELECT 1"}
validator = jsonschema.Draft202012Validator(self.input_schema)
assert validator.is_valid(old_input), "Old data should still be valid after adding optional field"
def test_required_fields_not_removed(self):
"""Required fields should not be removed"""
required = self.input_schema.get("required", [])
# Compare with previous version's required fields list
previous_required = ["table", "sql"]
for field_name in previous_required:
assert field_name in required, (
f"Required field {field_name} was removed, this is a breaking change"
)
def test_enum_values_not_removed(self):
"""Enum values should not be removed"""
properties = self.input_schema.get("properties", {})
for name, prop in properties.items():
if "enum" in prop:
# Ensure current enum values are a superset of previous version
pass # Implement according to actual situation
4. Integration Testing¶
4.1 Testing Module Interactions¶
# tests/integration/test_module_interactions.py
class TestModuleInteractions:
"""Test interactions between modules"""
def setup_method(self):
"""Initialize real Registry and Executor"""
from apcore import Registry, Executor
self.registry = Registry(extensions_dir="./extensions")
self.registry.discover()
self.executor = Executor(registry=self.registry)
def test_orchestrator_calls_executor(self):
"""Complete workflow of orchestrator calling executor"""
context = create_mock_context(
caller_id="api.handler.task_submit",
executor=self.executor
)
result = self.executor.call(
module_id="orchestrator.engine.task_flow",
inputs={"table": "user_info", "sql": "SELECT * FROM user_info"},
context=context
)
assert "rows" in result or "valid" in result
def test_call_chain_propagation(self):
"""Verify call chain propagates correctly"""
context = create_mock_context(
caller_id="api.handler.task_submit"
)
# After call, call_chain should contain call path
result = self.executor.call(
module_id="executor.validator.db_params",
inputs={"table": "user_info", "sql": "SELECT 1"},
context=context
)
# Verify trace_id remains consistent throughout the call chain
assert context.trace_id is not None
4.2 Testing Middleware Chain¶
class TestMiddlewareChain:
"""Test middleware execution order"""
def test_middleware_execution_order(self):
"""Verify middleware executes in onion model"""
execution_log = []
class LoggingMiddleware:
def __init__(self, name):
self.name = name
def before(self, module_id, inputs, context):
execution_log.append(f"{self.name}:before")
return inputs
def after(self, module_id, output, context):
execution_log.append(f"{self.name}:after")
return output
def on_error(self, module_id, error, context):
execution_log.append(f"{self.name}:error")
from apcore import Registry, Executor
registry = Registry(extensions_dir="./extensions")
registry.discover()
# Add middleware
registry.add_middleware("first", LoggingMiddleware("first"), priority=100)
registry.add_middleware("second", LoggingMiddleware("second"), priority=50)
executor = Executor(registry=registry)
context = create_mock_context(executor=executor)
executor.call(
module_id="executor.validator.db_params",
inputs={"table": "user_info", "sql": "SELECT 1"},
context=context
)
# Onion model: before in descending priority, after in ascending priority
assert execution_log == [
"first:before",
"second:before",
"second:after",
"first:after"
]
def test_middleware_abort(self):
"""Middleware can abort execution"""
class AbortMiddleware:
def before(self, module_id, inputs, context):
raise PermissionError("Middleware abort: Insufficient permission")
def after(self, module_id, output, context):
return output
def on_error(self, module_id, error, context):
pass
from apcore import Registry, Executor
registry = Registry(extensions_dir="./extensions")
registry.discover()
registry.add_middleware("abort", AbortMiddleware(), priority=100)
executor = Executor(registry=registry)
context = create_mock_context(executor=executor)
with pytest.raises(PermissionError):
executor.call(
module_id="executor.validator.db_params",
inputs={"table": "user_info", "sql": "SELECT 1"},
context=context
)
4.3 Full Pipeline Testing¶
class TestFullPipeline:
"""Test complete Registry -> Executor -> Module workflow"""
def test_full_pipeline_with_acl(self):
"""Full Pipeline: Register -> Discover -> ACL Check -> Schema Validation -> Execute"""
from apcore import Registry, Executor, ACL
# 1. Registry
registry = Registry(extensions_dir="./extensions")
registry.discover()
# 2. ACL
acl = ACL.load("./acl/global_acl.yaml")
# 3. Executor
executor = Executor(registry=registry, acl=acl)
# 4. Execute
context = create_mock_context(
caller_id="orchestrator.engine.task_flow",
executor=executor
)
result = executor.call(
module_id="executor.validator.db_params",
inputs={"table": "user_info", "sql": "SELECT * FROM user_info"},
context=context
)
assert result["valid"] is True
def test_full_pipeline_acl_denied(self):
"""Full Pipeline: ACL denial should raise ACLDeniedError"""
from apcore import Registry, Executor, ACL, ACLDeniedError
registry = Registry(extensions_dir="./extensions")
registry.discover()
# Use strict ACL rules
acl = ACL.load("./acl/strict_acl.yaml")
executor = Executor(registry=registry, acl=acl)
context = create_mock_context(
caller_id="unauthorized.module",
executor=executor
)
with pytest.raises(ACLDeniedError) as exc_info:
executor.call(
module_id="internal.secret.module",
inputs={},
context=context
)
assert exc_info.value.caller_id == "unauthorized.module"
assert exc_info.value.target_id == "internal.secret.module"
def test_full_pipeline_schema_validation_error(self):
"""Full Pipeline: Schema validation failure"""
from apcore import Registry, Executor, SchemaValidationError
registry = Registry(extensions_dir="./extensions")
registry.discover()
executor = Executor(registry=registry)
context = create_mock_context(executor=executor)
with pytest.raises(SchemaValidationError) as exc_info:
executor.call(
module_id="executor.validator.db_params",
inputs={"table": "INVALID-TABLE!", "sql": "SELECT 1"},
context=context
)
error = exc_info.value
assert error.code == "SCHEMA_VALIDATION_ERROR"
assert len(error.errors) > 0
5. ACL Testing¶
5.1 Testing Permission Rules¶
# tests/test_acl.py
import yaml
from apcore import ACL
class TestACLRules:
"""Test ACL permission rules"""
def setup_method(self):
"""Load test ACL configuration"""
self.acl = ACL.load("./acl/global_acl.yaml")
def test_allow_orchestrator_to_executor(self):
"""Orchestration layer can call execution layer"""
result = self.acl.check(
caller_id="orchestrator.engine.task_flow",
target_id="executor.validator.db_params"
)
assert result is True
def test_deny_executor_to_api(self):
"""Execution layer cannot call API layer"""
result = self.acl.check(
caller_id="executor.handler.db_query",
target_id="api.handler.task_submit"
)
assert result is False
def test_external_caller(self):
"""External call (caller_id is None)"""
result = self.acl.check(
caller_id=None,
target_id="api.handler.task_submit"
)
# Depends on ACL configuration for @external
assert isinstance(result, bool)
5.2 Testing Deny/Allow Scenarios¶
class TestACLDenyAllow:
"""Test various deny and allow scenarios"""
def test_default_deny_without_matching_rule(self):
"""With default_effect: deny, no matching rule should deny"""
acl_config = {
"default_effect": "deny",
"rules": [
{
"callers": ["orchestrator.*"],
"targets": ["executor.*"],
"effect": "allow"
}
]
}
acl = ACL.from_dict(acl_config)
# No matching rule -> deny
result = acl.check("api.handler.test", "internal.secret")
assert result is False
def test_default_allow_without_matching_rule(self):
"""With default_effect: allow, no matching rule should allow"""
acl_config = {
"default_effect": "allow",
"rules": [
{
"callers": ["*"],
"targets": ["internal.*"],
"effect": "deny"
}
]
}
acl = ACL.from_dict(acl_config)
# No matching rule -> allow
result = acl.check("api.handler.test", "common.util.format")
assert result is True
def test_deny_overrides_allow_same_priority(self):
"""Deny takes precedence over allow at same priority"""
acl_config = {
"default_effect": "allow",
"rules": [
{
"callers": ["*"],
"targets": ["internal.*"],
"effect": "allow",
"priority": 10
},
{
"callers": ["*"],
"targets": ["internal.*"],
"effect": "deny",
"priority": 10
}
]
}
acl = ACL.from_dict(acl_config)
result = acl.check("api.handler.test", "internal.secret")
assert result is False
def test_higher_priority_wins(self):
"""Higher priority rule takes precedence"""
acl_config = {
"default_effect": "deny",
"rules": [
{
"callers": ["*"],
"targets": ["internal.*"],
"effect": "deny",
"priority": 10
},
{
"callers": ["admin.*"],
"targets": ["internal.*"],
"effect": "allow",
"priority": 100
}
]
}
acl = ACL.from_dict(acl_config)
# Admin can access due to higher priority
result = acl.check("admin.panel", "internal.secret")
assert result is True
# Regular user denied
result = acl.check("api.handler.test", "internal.secret")
assert result is False
5.3 Testing Pattern Matching¶
class TestACLPatternMatching:
"""Test ACL pattern matching"""
def test_exact_match(self):
"""Exact match"""
acl_config = {
"default_effect": "deny",
"rules": [{
"callers": ["orchestrator.user.register"],
"targets": ["executor.email.send_email"],
"effect": "allow"
}]
}
acl = ACL.from_dict(acl_config)
assert acl.check("orchestrator.user.register", "executor.email.send_email") is True
assert acl.check("orchestrator.user.login", "executor.email.send_email") is False
def test_wildcard_prefix(self):
"""Prefix wildcard"""
acl_config = {
"default_effect": "deny",
"rules": [{
"callers": ["orchestrator.*"],
"targets": ["executor.*"],
"effect": "allow"
}]
}
acl = ACL.from_dict(acl_config)
assert acl.check("orchestrator.engine.task_flow", "executor.validator.db_params") is True
assert acl.check("orchestrator.a.b.c", "executor.x.y.z") is True
assert acl.check("api.handler.test", "executor.validator.db_params") is False
def test_global_wildcard(self):
"""Global wildcard"""
acl_config = {
"default_effect": "deny",
"rules": [{
"callers": ["*"],
"targets": ["common.*"],
"effect": "allow"
}]
}
acl = ACL.from_dict(acl_config)
assert acl.check("api.handler.test", "common.util.format") is True
assert acl.check("executor.handler.db", "common.util.format") is True
def test_special_callers(self):
"""Special caller identifiers"""
acl_config = {
"default_effect": "deny",
"rules": [
{
"callers": ["@external"],
"targets": ["api.*"],
"effect": "allow"
},
{
"callers": ["@system"],
"targets": ["internal.*"],
"effect": "allow"
}
]
}
acl = ACL.from_dict(acl_config)
# None caller_id maps to @external
assert acl.check(None, "api.handler.task_submit") is True
assert acl.check(None, "internal.secret") is False
5.4 ACL Batch Testing¶
Use parameterized testing to efficiently cover multiple scenarios.
@pytest.mark.parametrize("caller,target,expected", [
# Normal layered calls
("api.handler.user", "orchestrator.engine.task", True),
("orchestrator.engine.task", "executor.handler.db", True),
("api.handler.user", "common.util.format", True),
# Cross-layer calls (forbidden)
("api.handler.user", "executor.handler.db", False),
("executor.handler.db", "api.handler.user", False),
# External calls
(None, "api.handler.user", True),
(None, "internal.secret", False),
# Self calls
("executor.handler.db", "executor.handler.db", True),
])
def test_acl_rules(caller, target, expected):
"""Parameterized testing of ACL rules"""
acl = ACL.load("./acl/global_acl.yaml")
result = acl.check(caller, target)
assert result == expected, (
f"ACL check({caller} -> {target}): "
f"Expected {expected}, actual {result}"
)
6. Mock Strategies¶
6.1 When to Mock, When to Use Real Implementation¶
| Scenario | Recommendation | Reason |
|---|---|---|
| Unit test module logic | Mock Executor + Mock Context | Isolate module under test |
| Test Schema validation | Use real Schema | Schema is part of source code |
| Test ACL rules | Use real ACL configuration | ACL configuration is the test subject |
| Test middleware chain | Mock Module + real middleware | Isolate middleware behavior |
| Test module interactions | Real Registry + real Executor | Verify integration |
| Test external API calls | Mock external dependencies | Avoid network dependencies |
6.2 Mock Context Factory¶
# tests/factories.py
class ContextFactory:
"""Context factory supporting different test scenarios"""
@staticmethod
def external_call(**overrides) -> MockContext:
"""Simulate external call (no caller_id)"""
defaults = {
"caller_id": None,
"call_chain": [],
"identity": MockIdentity(type="api_key")
}
defaults.update(overrides)
return create_mock_context(**defaults)
@staticmethod
def internal_call(
caller_id: str,
call_chain: list[str] = None,
**overrides
) -> MockContext:
"""Simulate internal module call"""
defaults = {
"caller_id": caller_id,
"call_chain": call_chain or [caller_id],
"identity": MockIdentity(type="service")
}
defaults.update(overrides)
return create_mock_context(**defaults)
@staticmethod
def admin_call(**overrides) -> MockContext:
"""Simulate admin call"""
defaults = {
"caller_id": "admin.panel",
"identity": MockIdentity(
id="admin-001",
type="user",
roles=["admin", "super_admin"]
)
}
defaults.update(overrides)
return create_mock_context(**defaults)
@staticmethod
def agent_call(agent_id: str = "ai-agent-001", **overrides) -> MockContext:
"""Simulate AI Agent call"""
defaults = {
"caller_id": None,
"identity": MockIdentity(
id=agent_id,
type="agent",
roles=["agent"]
)
}
defaults.update(overrides)
return create_mock_context(**defaults)
6.3 Advanced Mock Executor Usage¶
class AdvancedMockExecutor(MockExecutor):
"""Mock Executor supporting conditional responses and delay simulation"""
def __init__(self):
super().__init__()
self._conditional_responses: list[tuple] = []
self._call_count: dict[str, int] = {}
def register_conditional_response(
self,
module_id: str,
condition: callable,
response: dict
):
"""Return different results based on condition"""
self._conditional_responses.append((module_id, condition, response))
def call(self, module_id: str, inputs: dict, context: Any) -> dict:
"""Enhanced module call simulation"""
self._call_count[module_id] = self._call_count.get(module_id, 0) + 1
# Check conditional responses
for mid, condition, response in self._conditional_responses:
if mid == module_id and condition(inputs, context):
self._call_log.append({
"module_id": module_id,
"inputs": inputs,
"matched": "conditional"
})
return response
return super().call(module_id, inputs, context)
def get_call_count(self, module_id: str) -> int:
"""Get number of times module was called"""
return self._call_count.get(module_id, 0)
# Usage example
def test_conditional_executor_response():
executor = AdvancedMockExecutor()
# Return different results based on input condition
executor.register_conditional_response(
"executor.validator.db_params",
condition=lambda inputs, ctx: "DROP" in inputs.get("sql", "").upper(),
response={"valid": False, "errors": [{"code": "DANGEROUS_SQL"}]}
)
executor.register_response(
"executor.validator.db_params",
{"valid": True, "errors": []}
)
context = create_mock_context(executor=executor)
# Safe SQL -> default response
result = executor.call(
"executor.validator.db_params",
{"table": "t", "sql": "SELECT 1"},
context
)
assert result["valid"] is True
# Dangerous SQL -> conditional response
result = executor.call(
"executor.validator.db_params",
{"table": "t", "sql": "DROP TABLE t"},
context
)
assert result["valid"] is False
6.4 Mock Registry¶
class MockRegistry:
"""Mock Registry for testing module isolation"""
def __init__(self):
self._modules: dict[str, Any] = {}
self._schemas: dict[str, dict] = {}
def register(self, module_id: str, module_instance: Any, schema: dict = None):
"""Register module"""
self._modules[module_id] = module_instance
if schema:
self._schemas[module_id] = schema
def get(self, module_id: str) -> Any:
"""Get module"""
if module_id not in self._modules:
raise ModuleNotFoundError(f"Module not found: {module_id}")
return self._modules[module_id]
def has(self, module_id: str) -> bool:
"""Check if module exists"""
return module_id in self._modules
def list_modules(self) -> list[str]:
"""List all modules"""
return list(self._modules.keys())
def get_schema(self, module_id: str) -> dict:
"""Get module Schema"""
return self._schemas.get(module_id, {})
# Usage example
def test_with_mock_registry():
registry = MockRegistry()
# Only register modules needed for testing
module = DbParamsValidator()
registry.register(
"executor.validator.db_params",
module,
schema={"input_schema": {...}, "output_schema": {...}}
)
assert registry.has("executor.validator.db_params")
assert not registry.has("executor.handler.db_query")
7. Testing Best Practices¶
7.1 Test Naming Convention¶
# Naming format: test_{function_being_tested}_{scenario}_{expected_result}
# Unit tests
def test_validate_sql_with_drop_statement_returns_invalid(): ...
def test_validate_sql_with_select_statement_returns_valid(): ...
def test_execute_with_timeout_zero_raises_validation_error(): ...
# Schema tests
def test_input_schema_rejects_null_table(): ...
def test_input_schema_accepts_optional_timeout(): ...
def test_output_schema_requires_valid_field(): ...
# ACL tests
def test_acl_allows_orchestrator_to_executor(): ...
def test_acl_denies_api_to_internal(): ...
def test_acl_external_caller_mapped_to_at_external(): ...
# Integration tests
def test_pipeline_validator_then_executor_succeeds(): ...
def test_pipeline_with_acl_denied_raises_error(): ...
7.2 Fixtures and Test Data Management¶
# tests/conftest.py
import pytest
from pathlib import Path
@pytest.fixture
def mock_context():
"""Provide default Mock Context"""
return create_mock_context()
@pytest.fixture
def mock_executor():
"""Provide configurable Mock Executor"""
return MockExecutor()
@pytest.fixture
def context_factory():
"""Provide Context factory"""
return ContextFactory()
@pytest.fixture
def db_params_module():
"""Provide database parameters validator instance"""
return DbParamsValidator()
@pytest.fixture
def acl_config():
"""Provide standard test ACL configuration"""
return {
"default_effect": "deny",
"rules": [
{"callers": ["orchestrator.*"], "targets": ["executor.*"], "effect": "allow"},
{"callers": ["api.*"], "targets": ["orchestrator.*"], "effect": "allow"},
{"callers": ["*"], "targets": ["common.*"], "effect": "allow"},
{"callers": ["@external"], "targets": ["api.*"], "effect": "allow"},
]
}
# tests/fixtures/schemas/
# Store test Schema files
# tests/fixtures/acl/
# Store test ACL configuration files
Recommended test directory structure:
tests/
├── conftest.py # Global fixtures
├── factories.py # Mock factories
├── fixtures/ # Test data
│ ├── schemas/ # Test Schemas
│ ├── acl/ # Test ACL configurations
│ └── inputs/ # Test input data
├── unit/ # Unit tests
│ ├── test_db_params.py
│ └── test_email_sender.py
├── schema/ # Schema tests
│ ├── test_schema_validity.py
│ └── test_schema_edge_cases.py
├── acl/ # ACL tests
│ ├── test_acl_rules.py
│ └── test_acl_patterns.py
└── integration/ # Integration tests
├── test_module_interactions.py
└── test_middleware_chain.py
7.3 CI/CD Integration¶
# .github/workflows/test.yml
name: apcore Tests
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
unit-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- run: pip install -r requirements-dev.txt
- run: pytest tests/unit/ tests/schema/ tests/acl/ -v --tb=short
name: "Unit tests + Schema tests + ACL tests"
integration-tests:
runs-on: ubuntu-latest
needs: unit-tests
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- run: pip install -r requirements-dev.txt
- run: pytest tests/integration/ -v --tb=short
name: "Integration tests"
schema-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- run: pip install pyyaml jsonschema
- run: python scripts/validate_schemas.py
name: "Schema file validation"
Schema batch validation script:
# scripts/validate_schemas.py
import sys
import yaml
import jsonschema
from pathlib import Path
def validate_all_schemas():
"""Validate all Schema files"""
schemas_dir = Path("schemas")
errors = []
for schema_file in schemas_dir.glob("*.schema.yaml"):
try:
with open(schema_file) as f:
data = yaml.safe_load(f)
# Check required fields
for field in ["module_id", "description", "input_schema", "output_schema"]:
if field not in data:
errors.append(f"{schema_file.name}: missing {field}")
# Validate JSON Schema legitimacy
for schema_key in ["input_schema", "output_schema"]:
if schema_key in data:
jsonschema.Draft202012Validator.check_schema(data[schema_key])
print(f"✓ {schema_file.name} ... OK")
except Exception as e:
errors.append(f"{schema_file.name}: {e}")
print(f"✗ {schema_file.name} ... FAIL")
if errors:
print(f"\n{len(errors)} errors:")
for err in errors:
print(f" - {err}")
sys.exit(1)
else:
print(f"\nAll Schema files validated successfully")
if __name__ == "__main__":
validate_all_schemas()
Next Steps¶
- Schema Definition Details - Deep dive into Schema
- ACL Configuration Guide - ACL configuration details
- Executor API - Executor interface reference
- Multi-Language Development Guide - Cross-language testing strategies