Implementation Plan
Phase 1: Foundation & Core Pipeline
1. Project scaffold and configuration
1.1 Initialize Python project with pyproject.toml, FastAPI app entry point, and directory structure.
- Create pyproject.toml with all dependencies (fastapi, sqlalchemy, asyncpg, arq, pydantic, httpx, pynacl, weasyprint, boto3, slowapi, structlog, stripe, pyjwt)
- Create app/main.py with FastAPI lifespan, app/core/config.py with pydantic-settings Settings class
- Create directory tree: app/{api/v1, core, models, schemas, services, connectors, jobs, templates}, tests/{test_api, test_services, test_connectors, test_jobs}, alembic/versions/
- Create alembic.ini and alembic/env.py configured for async SQLAlchemy
- Write a basic test that imports the FastAPI app and hits a placeholder /health endpoint
Traces to: FR 8.1, 10.4, 10.5, 10.8
1.2 Docker and local development environment.
- Create Dockerfile with Python 3.12-slim base and WeasyPrint system deps (pango, cairo, gdk-pixbuf)
- Create docker-compose.yml with three services: api (uvicorn), postgres (15+), redis (7+)
- Add .env.example with all required environment variables
- Verify docker compose up starts all services and the health endpoint responds
Traces to: FR 10.4, 10.5
1.3 Structured logging with correlation IDs.
- Create app/core/logging.py with structlog configuration: JSON output, correlation ID processor, sensitive field redactor
- Create FastAPI middleware that generates a correlation ID per request and binds it to structlog context
- Write tests verifying: correlation ID present in log output, API key patterns are redacted
Traces to: FR 7.9, 8.8
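The redaction and correlation ID steps in 1.3 could be sketched as follows. structlog processors are plain callables that take (logger, method_name, event_dict), so this sketch needs only the standard library. The `sk-` key pattern, the sensitive field names, and the `new_correlation_id` helper are assumptions for illustration; the plan does not specify them.

```python
import re
import uuid

# Assumed pattern: provider keys that start with "sk-". Adjust to the real key formats.
API_KEY_PATTERN = re.compile(r"sk-[A-Za-z0-9_\-]{8,}")
# Assumed names of fields that must never be logged in clear text.
SENSITIVE_FIELDS = {"api_key", "authorization", "password"}


def redact_sensitive(logger, method_name, event_dict):
    """structlog-style processor: mask sensitive fields and key-like substrings."""
    for key, value in list(event_dict.items()):
        if key.lower() in SENSITIVE_FIELDS:
            event_dict[key] = "[REDACTED]"
        elif isinstance(value, str):
            event_dict[key] = API_KEY_PATTERN.sub("[REDACTED]", value)
    return event_dict


def new_correlation_id() -> str:
    """Generate a per-request correlation ID (hypothetical helper)."""
    return uuid.uuid4().hex
```

In the real configuration, a function like `redact_sensitive` would sit in the structlog processor chain ahead of the JSON renderer, so that redaction happens before serialization.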
2. Data model and migrations
2.1 ORM models and enums.
- Create app/models/enums.py: Provider, ConnectionStatus, WorkloadType, PlanTier, BillingStatus
- Create app/models/base.py: DeclarativeBase, TimestampMixin with timezone-aware UTC timestamps
- Create app/models/db.py: all 11 tables with relationships, indexes, and unique constraints
- Use PGUUID(as_uuid=True) for all UUID primary keys
Traces to: FR 9.1, 9.4, 9.5, 9.6, 9.7, 9.8, 9.9
2.2 Initial migration.
- Generate migration from ORM models: all tables, indexes, constraints, sequences (receipt_serial_seq)
- Run migration against local Postgres and verify schema
Traces to: FR 9.2
2.3 Immutability triggers.
- Write migration that creates a PostgreSQL trigger function preventing UPDATE of the idempotency_hash, event_timestamp, and model columns on telemetry_events (but allowing token count updates for reconciliation)
- Write migration for BEFORE DELETE trigger on telemetry_events
- Write test: an attempted UPDATE of idempotency_hash raises an error, while an UPDATE of uncached_input_tokens succeeds
Traces to: FR 9.3, 2.3
2.4 Database session management.
- Create app/core/database.py: async engine factory, async_sessionmaker, get_session() generator dependency, init_db() and close_db() lifecycle hooks
- Configure pool_size=20, max_overflow=10
Traces to: FR 9.1
3. Authentication and authorization
3.1 JWT verification.
- Create app/core/auth.py: fetch JWKS from Clerk endpoint, verify JWT signature and expiry via PyJWT, extract org_id claim
- Cache JWKS keys in memory with 1h TTL
- Write tests: valid token → claims extracted, expired token → 401, bad signature → 401
Traces to: FR 8.2
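The in-memory JWKS cache with a 1h TTL might look like the sketch below. The `TTLCache` class and its injectable `clock` are hypothetical names chosen for illustration. The sketch is synchronous for brevity, whereas the real fetch would presumably be an async httpx call.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Hold one fetched value (for example a JWKS document) for a fixed time-to-live."""

    def __init__(
        self,
        fetch: Callable[[], Any],
        ttl_seconds: float = 3600.0,  # 1h TTL, as stated in the plan
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[Any] = None
        self._expires_at = float("-inf")  # forces a fetch on first use

    def get(self) -> Any:
        """Return the cached value, refetching once the TTL has elapsed."""
        now = self._clock()
        if now >= self._expires_at:
            self._value = self._fetch()
            self._expires_at = now + self._ttl
        return self._value
```

Injecting the clock keeps the expiry test deterministic: a test can advance a fake clock past 3600 seconds instead of sleeping.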
3.2 Organization dependency.
- Create app/core/deps.py: get_current_org() FastAPI dependency that verifies JWT and resolves Organization from DB
- Write tests: valid org → Organization returned, unknown org_id → 404, missing token → 401
Traces to: FR 8.3
3.3 Rate limiting, CORS, HSTS.
- Create app/core/security.py: slowapi rate limiter (100/s free, 500/s paid, keyed by org_id), CORS middleware (dashboard domain only), HSTS header middleware
- Write tests: exceed rate limit → 429, CORS preflight with wrong origin → blocked
Traces to: FR 8.5, 8.7
3.4 Clerk webhook.
- Create app/api/v1/webhooks.py: POST /v1/auth/webhook endpoint, verify Clerk webhook signature, handle organization.created event (create Organization record with Free tier default)
- Write test: valid webhook → org created in DB
Traces to: FR 5.2
4. Error handling framework
4.1 Domain exceptions and error responses.
- Create app/schemas/errors.py: ErrorDetail and ErrorResponse Pydantic models
- Create app/core/exceptions.py: base DomainError, plus ConnectionValidationError, ConnectionNotFoundError, RateLimitExceededError, BillingPeriodNotOpenError, ReceiptNotFoundError, InvalidSignatureError
- Register FastAPI exception handlers that map domain exceptions to HTTP responses per Error Mapping
Traces to: FR 8.6
4.2 Error classifier.
- Create app/core/error_classifier.py: ErrorClassifier.classify(status_code) returning "transient", "permanent", or "unknown"
- Write tests for each status code category (408/429/5xx → transient, 400/401/403/404 → permanent)
Traces to: FR 7.5
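A minimal sketch of the classifier described in 4.2, using the status code categories listed in the test bullet. Treating every other code as "unknown" is an assumption; the plan names the three return values but does not spell out the fallback.

```python
class ErrorClassifier:
    """Classify provider HTTP status codes for retry decisions."""

    TRANSIENT = frozenset({408, 429})
    PERMANENT = frozenset({400, 401, 403, 404})

    @staticmethod
    def classify(status_code: int) -> str:
        # 408, 429, and any 5xx are worth retrying.
        if status_code in ErrorClassifier.TRANSIENT or 500 <= status_code <= 599:
            return "transient"
        # Client errors that a retry will not fix.
        if status_code in ErrorClassifier.PERMANENT:
            return "permanent"
        # Assumed fallback for anything not listed in the plan.
        return "unknown"
```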
5. Provider connectors
5.1 Connector protocol and registry.
- Create app/connectors/base.py: ProviderConnectorProtocol, PollResult, TelemetryEventCandidate, ValidationResult dataclasses
- Create app/connectors/registry.py: CONNECTORS dict mapping Provider enum to connector classes, get_connector() factory
Traces to: FR 1.1, 1.9
5.2 OpenAI connector.
- Create app/connectors/openai.py: validate (GET /v1/organization/usage/completions with 1h lookback), poll (iterate completions + embeddings endpoints, bucket_width=1h, map all input as uncached), idempotency_hash (openai:model:bucket_start)
- Write tests with mocked httpx responses: validation success/failure, poll with multiple buckets, cursor advancement
Traces to: FR 1.1, 2.2, 2.5, 2.3
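One way the openai:model:bucket_start idempotency key from 5.2 could be turned into a fixed-length hash is sketched below. The use of SHA-256, the function name, and the integer representation of bucket_start are assumptions; the plan gives only the key components.

```python
import hashlib


def openai_idempotency_hash(model: str, bucket_start: int) -> str:
    """Hash the 'openai:model:bucket_start' key into a hex digest.

    bucket_start is assumed to be the bucket's start time in Unix seconds.
    """
    key = f"openai:{model}:{bucket_start}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()
```

Because the key depends only on the model and the bucket start, re-polling the same hourly bucket produces the same hash, which is what lets reconciliation update an existing row rather than insert a duplicate.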
5.3 Anthropic connector.
- Create app/connectors/anthropic.py: three-way token split mapping (input_tokens → uncached, cache_creation_input_tokens → cache_creation, cache_read_input_tokens → cached_input)
- Write tests: verify three-way split mapping, hash includes cache token counts
Traces to: FR 1.1, 2.4, 2.3
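The three-way token split in 5.3 can be illustrated with a small mapping function. The source field names come from the plan; the `TokenSplit` dataclass, its attribute names other than uncached_input_tokens, and the default of 0 for missing fields are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class TokenSplit:
    """Hypothetical container for the three input token categories."""

    uncached_input_tokens: int
    cache_creation_tokens: int
    cached_input_tokens: int


def map_anthropic_usage(usage: Mapping[str, Any]) -> TokenSplit:
    """Map Anthropic usage fields onto the three-way split; missing fields count as 0."""
    return TokenSplit(
        uncached_input_tokens=int(usage.get("input_tokens", 0)),
        cache_creation_tokens=int(usage.get("cache_creation_input_tokens", 0)),
        cached_input_tokens=int(usage.get("cache_read_input_tokens", 0)),
    )
```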
5.4 OpenRouter connector.
- Create app/connectors/openrouter.py: map prompt_tokens → uncached, completion_tokens → output, cache fields = 0
- Write tests: verify conservative uncached default
Traces to: FR 1.1, 2.5, 2.3
6. Connection management API
6.1 Secrets Manager integration.
- Create app/services/secrets.py: store_api_key(org_id, provider, key) → arn, get_api_key(arn) → str, delete_api_key(arn) using aioboto3
Traces to: FR 1.2
6.2 Connection service.
- Create app/services/connections.py: ConnectionService with create (validate key via connector, store in Secrets Manager, find-or-create Default project, create Workload), list, get, delete (revoke in Secrets Manager), manual_sync (rate-limit check)
Traces to: FR 1.1-1.10, 1B.1-1B.7
6.3 Connection API endpoints.
- Create app/api/v1/connections.py: POST, GET (list), GET (by id), DELETE, POST sync, PUT project
- Create app/schemas/connections.py: ConnectionCreate (with SecretStr for api_key), ConnectionResponse
Traces to: FR 1.1-1.6, 8.9
7. Telemetry ingestion and emissions
7.1 Telemetry service.
- Create app/services/telemetry.py: TelemetryService.ingest_batch() with idempotency hash generation and PostgreSQL INSERT ... ON CONFLICT DO UPDATE SET
Traces to: FR 2.1-2.3, 2.6, 2.8
7.2 Emissions engine and carbon factors.
- Create app/services/model_tier.py: resolve_model_tier() using fnmatch with first-match-wins
- Create app/services/emissions.py: calculate_emissions() pure function implementing the full pipeline
- Seed CarbonFactors v1.0 in a migration or fixture
Traces to: FR 3.1-3.9
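First-match-wins tier resolution from 7.2 could be sketched as below. The patterns and tier names in EXAMPLE_TIERS are invented for illustration and are not the v1.0 Carbon Factors. The sketch uses fnmatchcase rather than fnmatch so that matching is case-sensitive on every platform, which is a design assumption.

```python
from fnmatch import fnmatchcase
from typing import Optional, Sequence, Tuple

# Hypothetical (pattern, tier) pairs, ordered from most to least specific.
EXAMPLE_TIERS: Sequence[Tuple[str, str]] = (
    ("gpt-4o-mini*", "small"),
    ("gpt-4o*", "large"),
    ("*", "default"),
)


def resolve_model_tier(
    model: str, tiers: Sequence[Tuple[str, str]] = EXAMPLE_TIERS
) -> Optional[str]:
    """Return the tier of the first glob pattern that matches, or None if none match."""
    for pattern, tier in tiers:
        if fnmatchcase(model, pattern):
            return tier
    return None
```

Ordering matters with first-match-wins: if "gpt-4o*" were listed before "gpt-4o-mini*", the mini models would resolve to the larger tier.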
7.3 Telemetry API endpoints.
- Create app/api/v1/telemetry.py: GET /telemetry/summary, GET /telemetry/events (paginated), GET /telemetry/models
- Create app/api/v1/carbon_factors.py: GET /carbon-factors/current, GET /carbon-factors/{version}, GET /carbon-factors
- Create app/api/v1/methodology.py: GET /methodology (public)
Traces to: FR 6.3, 6.4, 8.4, 3.10, 8.10
8. Background job system
8.1 ARQ worker configuration.
- Create app/workers.py: WorkerSettings with function registry, cron schedules, redis_settings, max_jobs=10, job_timeout=300
Traces to: FR 7.1, 7.2, 7.6
8.2 Polling job.
- Create app/jobs/polling.py: poll_provider() with error classification, consecutive_failures tracking (5 → skip), poll_all_connections() cron entry point
Traces to: FR 1.7, 1.8, 2.1, 2.2, 2.9, 2.10, 7.3-7.5
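The consecutive_failures tracking in 8.2 might work along these lines. This sketch reads "5 → skip" as "a connection with five consecutive failed polls is skipped by the cron run", and it resets the counter on any success; both are interpretations, and the `ConnectionState` class is a hypothetical stand-in for the ORM model.

```python
from dataclasses import dataclass

MAX_CONSECUTIVE_FAILURES = 5  # threshold taken from the plan


@dataclass
class ConnectionState:
    """Hypothetical stand-in for the connection record's failure counter."""

    consecutive_failures: int = 0


def should_poll(state: ConnectionState) -> bool:
    """Skip connections that have reached the failure threshold."""
    return state.consecutive_failures < MAX_CONSECUTIVE_FAILURES


def record_poll_result(state: ConnectionState, success: bool) -> None:
    """Reset the counter on success, increment it on failure."""
    state.consecutive_failures = 0 if success else state.consecutive_failures + 1
```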
8.3 Reconciliation job.
- Create app/jobs/reconciliation.py: trailing_reconciliation() that re-polls T-24h through T-0, upserts revised data, enqueues emissions recalculation
Traces to: FR 2.7, 7.2
8.4 Emissions batch job.
- Create app/jobs/emissions.py: compute_emissions_batch() that loads event, loads current carbon factors, calls calculate_emissions(), upserts CarbonCalculation
Traces to: FR 2.10, 3.1-3.6
9. Receipt and signing engine
9.1 Cryptographic signing.
- Create app/services/signing.py: ReceiptSigner with sign_receipt (canonical JSON → SHA-256 → Ed25519 sign) and static verify method
Traces to: FR 4.3, 4.7, 4.8
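The first two stages of the signing pipeline in 9.1 (canonical JSON, then SHA-256) can be sketched with the standard library alone. The canonicalization rules chosen here (sorted keys, compact separators, UTF-8) are an assumption, since the plan does not define its canonical form. The Ed25519 step is deliberately left out of this sketch.

```python
import hashlib
import json
from typing import Any, Mapping


def canonical_json(payload: Mapping[str, Any]) -> bytes:
    """Serialize with sorted keys and no insignificant whitespace (assumed canonical form)."""
    return json.dumps(
        payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")


def receipt_digest(payload: Mapping[str, Any]) -> bytes:
    """SHA-256 digest of the canonical JSON; this is the value that would be signed."""
    return hashlib.sha256(canonical_json(payload)).digest()
```

Sorting keys makes the digest independent of dictionary insertion order, so a verifier that rebuilds the payload in a different order still arrives at the same bytes. The resulting digest would then be handed to an Ed25519 signer such as PyNaCl's SigningKey.sign.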
9.2 Receipt service.
- Create app/services/receipts.py: generate_for_period (allocate serial CL-YYYYMM-XXXXX, build payload, sign, create DB record), get_by_id, verify_signature
Traces to: FR 4.1-4.4, 4.6, 4.10
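Formatting the CL-YYYYMM-XXXXX serial from 9.2 might look like this. The sketch assumes YYYYMM is the billing period's month and XXXXX is a zero-padded value drawn from receipt_serial_seq; the function name and the range check are illustrative.

```python
from datetime import date


def format_receipt_serial(period_start: date, sequence_value: int) -> str:
    """Build a serial such as CL-202503-00042 from a period date and a sequence value."""
    if not 0 <= sequence_value <= 99999:
        raise ValueError("sequence_value must fit in five digits")
    return f"CL-{period_start:%Y%m}-{sequence_value:05d}"
```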
9.3 PDF generation.
- Create app/templates/receipt.html: Jinja2 + CSS template for branded receipt PDF
- Create app/services/pdf.py: generate_receipt_pdf() using WeasyPrint
Traces to: FR 4.5
9.4 Receipt API endpoints.
- Create app/api/v1/receipts.py: GET /receipts (list), GET /receipts/{id}/pdf, GET /public/receipts/verify/{serial_number} (no auth)
Traces to: FR 4.6, 4.8, 8.2
10. Billing system
10.1 Stripe webhook handling.
- Extend app/api/v1/webhooks.py: POST /api/v1/billing/webhook with Stripe signature verification, handle invoice.payment_succeeded, invoice.payment_failed, subscription lifecycle events
Traces to: FR 5.5, 5.6, 5.10
10.2 Billing service.
- Create app/services/billing.py: handle_subscription_created, handle_payment_succeeded (→ closing), handle_payment_failed (→ failed), period aggregation, prior period adjustment logic
Traces to: FR 5.4, 5.5, 5.6, 5.7
10.3 Billing close job.
- Create app/jobs/billing.py: close_billing_period() with _defer_by=timedelta(hours=48), aggregates CO2, allocates credit serials, calls ReceiptService, transitions period to "closed"
Traces to: FR 4.1, 4.2, 4.10, 5.5
10.4 Billing API endpoints.
- Create app/api/v1/billing.py: GET /billing/status, POST /billing/upgrade, POST /billing/portal
Traces to: FR 5.1, 5.3, 5.8, 5.9
11. Project and workload management
11.1 Project API.
- Create app/api/v1/projects.py: POST, GET (list), GET (detail with telemetry summary), PATCH, DELETE
Traces to: FR 1B.4, 1B.6
11.2 Workload management.
- Create app/api/v1/workloads.py: GET workloads under project, connection re-mapping (PATCH connection with new project_id → creates new workload, deactivates old)
Traces to: FR 1B.3, 1B.4, 1B.5
12. Organization and export endpoints
12.1 Organization endpoints.
- Create app/api/v1/organizations.py: GET /organizations/me, PATCH /organizations/me
Traces to: FR 8.3
12.2 Data export.
- Add CSV and JSON export: GET /api/v1/export/telemetry with format, date range, project filters
- Stream large exports using StreamingResponse
Traces to: FR 6.5
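Streaming a large CSV export, as described in 12.2, usually means producing the file row by row instead of building it in memory. A minimal generator sketch follows; the `iter_csv` name and the column names in the usage note are hypothetical.

```python
import csv
import io
from typing import Iterable, Iterator, Mapping, Sequence


def iter_csv(
    rows: Iterable[Mapping[str, object]], fieldnames: Sequence[str]
) -> Iterator[str]:
    """Yield the CSV header, then one chunk per row, reusing a single small buffer."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    yield buffer.getvalue()
    for row in rows:
        # Clear the buffer so each yielded chunk contains only the current row.
        buffer.seek(0)
        buffer.truncate(0)
        writer.writerow(row)
        yield buffer.getvalue()
```

A generator like this can be passed directly to FastAPI's StreamingResponse, for example `StreamingResponse(iter_csv(rows, ["model", "tokens"]), media_type="text/csv")`, where `rows` would come from a server-side cursor over the filtered telemetry events.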
13. Audit pack generation
- Create app/services/audit_packs.py: generate_for_period (gather receipts + calculations + methodology, build manifest, zip, upload to S3)
- Create app/jobs/audit_packs.py: monthly cron
- Create app/api/v1/audit_packs.py: GET /api/v1/export/audit-pack/{year}/{month} (download)
Traces to: FR 4.9
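The "build manifest, zip" part of audit pack generation could be sketched as follows. The manifest layout (name, SHA-256, size per file) and the manifest.json filename are assumptions, and the S3 upload is omitted.

```python
import hashlib
import io
import json
import zipfile
from typing import Mapping


def build_audit_pack(files: Mapping[str, bytes]) -> bytes:
    """Zip the given files together with a manifest.json listing their checksums."""
    manifest = {
        "files": [
            {"name": name, "sha256": hashlib.sha256(data).hexdigest(), "size": len(data)}
            for name, data in sorted(files.items())
        ]
    }
    out = io.BytesIO()
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as archive:
        for name, data in sorted(files.items()):
            archive.writestr(name, data)
        archive.writestr("manifest.json", json.dumps(manifest, indent=2, sort_keys=True))
    return out.getvalue()
```

Including per-file checksums lets an auditor confirm that the receipts and calculations in the archive were not altered after the pack was generated.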
14. Health endpoint
- Create app/api/v1/health.py: GET /health (no auth) checking DB, Redis, last_poll_at, queue depth
- Return structured JSON with per-check status
Traces to: FR 7.8
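The per-check status structure for the health endpoint might be assembled like this. The response shape, the "ok", "fail", and "degraded" labels, and the `run_health_checks` name are assumptions; the real checks would be async calls against the database, Redis, and the job queue.

```python
from typing import Callable, Dict


def run_health_checks(checks: Dict[str, Callable[[], bool]]) -> dict:
    """Run each named check and report per-check plus overall status."""
    results = {}
    for name, check in checks.items():
        try:
            results[name] = {"status": "ok" if check() else "fail"}
        except Exception as exc:
            # A check that raises is reported as failed, with the exception type only.
            results[name] = {"status": "fail", "error": type(exc).__name__}
    all_ok = all(result["status"] == "ok" for result in results.values())
    return {"status": "ok" if all_ok else "degraded", "checks": results}
```

Reporting only the exception type, not its message, avoids leaking connection strings or credentials through an unauthenticated endpoint.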
Phase 2: Polish & Hardening
15. Integration test suite
15.1 End-to-end pipeline tests.
- Test: create org → create connection → poll → telemetry ingested → emissions calculated → billing period closed → receipt generated → receipt verifies → audit pack generated
- Test: reconciliation updates existing events correctly
- Test: prior period adjustment created for late events after period close
- Create Factory Boy factories and shared conftest.py
15.2 Security tests.
- Test: org A cannot access org B's resources
- Test: API keys never appear in API responses, error messages, or logs
- Test: rate limiting enforced per tier
- Test: Stripe webhook with invalid signature returns 400
- Test: receipt verification endpoint works without auth
Traces to: FR 8.2, 8.3, 8.5, 8.7, 8.8, 8.9
16. Carbon Factors seed data
- Write Alembic migration inserting the full v1.0 Carbon Factors (model tiers with glob patterns, PUE mapping, grid intensity, uncertainty bounds)
- Validate JSON against the Carbon Factors schema
- Write test: v1.0 loads correctly, resolve_model_tier produces expected results
Traces to: FR 3.4, 3.8
17. V1 router aggregation and OpenAPI
- Create app/api/v1/__init__.py: aggregate all routers under /v1 prefix
- Verify auto-generated OpenAPI 3.1 spec includes all endpoints, schemas, and auth requirements
- Write test: GET /openapi.json returns valid spec with all endpoints present
Traces to: FR 8.1