
Implementation Plan

Phase 1: Foundation & Core Pipeline

1. Project scaffold and configuration

1.1 Initialize Python project with pyproject.toml, FastAPI app entry point, and directory structure.

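  • Create pyproject.toml with all dependencies (fastapi, uvicorn, sqlalchemy, asyncpg, alembic, arq, pydantic, pydantic-settings, httpx, pynacl, weasyprint, jinja2, boto3, aioboto3, slowapi, structlog, stripe, pyjwt[crypto] for RS256 verification), plus factory-boy as a dev dependency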
  • Create app/main.py with FastAPI lifespan, app/core/config.py with pydantic-settings Settings class
  • Create directory tree: app/{api/v1, core, models, schemas, services, connectors, jobs, templates}, tests/{test_api, test_services, test_connectors, test_jobs}, alembic/versions/
  • Create alembic.ini and alembic/env.py configured for async SQLAlchemy
  • Write a basic test that imports the FastAPI app and hits a placeholder /health endpoint

Traces to: FR 8.1, 10.4, 10.5, 10.8

1.2 Docker and local development environment.

  • Create Dockerfile with Python 3.12-slim base and WeasyPrint system deps (pango, cairo, gdk-pixbuf)
  • Create docker-compose.yml with three services: api (uvicorn), postgres (15+), redis (7+)
  • Add .env.example with all required environment variables
  • Verify docker compose up starts all services and the health endpoint responds

Traces to: FR 10.4, 10.5

1.3 Structured logging with correlation IDs.

  • Create app/core/logging.py with structlog configuration: JSON output, correlation ID processor, sensitive field redactor
  • Create FastAPI middleware that generates a correlation ID per request and binds it to structlog context
  • Write tests verifying: correlation ID present in log output, API key patterns are redacted

Traces to: FR 7.9, 8.8
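A minimal sketch of the two custom processors from 1.3, written against structlog's processor signature (logger, method_name, event_dict) so they can be listed in structlog.configure(processors=[...]). The correlation ID lives in a contextvars.ContextVar so each request (or ARQ job) sees its own value; structlog also ships structlog.contextvars.merge_contextvars for the same purpose. The SENSITIVE_KEYS set and the key regex are assumptions, not the project's final redaction rules.

```python
from __future__ import annotations

import re
import uuid
from contextvars import ContextVar
from typing import Any, MutableMapping

EventDict = MutableMapping[str, Any]

# Per-request correlation ID: the middleware sets it, the processor reads it.
correlation_id_var: ContextVar[str | None] = ContextVar("correlation_id", default=None)

# Assumed redaction rules: field names that are always masked, plus a pattern
# for provider keys embedded in free text ("sk-...", "sk-ant-...", "sk_live_...").
SENSITIVE_KEYS = frozenset({"api_key", "authorization", "password", "secret"})
KEY_PATTERN = re.compile(r"\bsk[-_][A-Za-z0-9_\-]{8,}")
REDACTED = "[REDACTED]"


def new_correlation_id() -> str:
    """Generate a correlation ID and bind it to the current context."""
    cid = uuid.uuid4().hex
    correlation_id_var.set(cid)
    return cid


def add_correlation_id(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """structlog processor: attach the current correlation ID, if one is set."""
    cid = correlation_id_var.get()
    if cid is not None:
        event_dict.setdefault("correlation_id", cid)
    return event_dict


def redact_sensitive(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """structlog processor: mask sensitive fields and key-shaped substrings."""
    for key, value in list(event_dict.items()):
        if key.lower() in SENSITIVE_KEYS:
            event_dict[key] = REDACTED
        elif isinstance(value, str):
            event_dict[key] = KEY_PATTERN.sub(REDACTED, value)
    return event_dict
```

The redactor needs to run before structlog.processors.JSONRenderer() in the processor chain, otherwise the secrets are already serialized into the output string.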


2. Data model and migrations

2.1 ORM models and enums.

  • Create app/models/enums.py: Provider, ConnectionStatus, WorkloadType, PlanTier, BillingStatus
  • Create app/models/base.py: DeclarativeBase, TimestampMixin with timezone-aware UTC timestamps
  • Create app/models/db.py: all 11 tables with relationships, indexes, and unique constraints
  • Use PGUUID(as_uuid=True) for all UUID primary keys

Traces to: FR 9.1, 9.4, 9.5, 9.6, 9.7, 9.8, 9.9

2.2 Initial migration.

  • Generate migration from ORM models: all tables, indexes, constraints, sequences (receipt_serial_seq)
  • Run migration against local Postgres and verify schema

Traces to: FR 9.2

2.3 Immutability triggers.

  • Write a migration that creates a PostgreSQL trigger function rejecting any UPDATE that changes the idempotency_hash, event_timestamp, or model columns of telemetry_events, while still allowing token-count updates for reconciliation
  • Write a migration adding a BEFORE DELETE trigger on telemetry_events that rejects all deletes
  • Write tests: UPDATE of idempotency_hash raises, UPDATE of uncached_input_tokens succeeds, DELETE raises

Traces to: FR 9.3, 2.3
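PostgreSQL implements these rules with a PL/pgSQL trigger function. The sketch below swaps in SQLite's trigger syntax (stdlib sqlite3) purely so the 2.3 test can run without a database server; it is an analog of the intended behavior, not the migration itself. One difference: SQLite's BEFORE UPDATE OF fires whenever a listed column appears in the SET clause, whereas a PostgreSQL trigger function can compare OLD and NEW (for example with IS DISTINCT FROM) and reject only actual changes. The column set is trimmed to what the test needs.

```python
import sqlite3

# SQLite analog of the 2.3 triggers; the real migration targets PostgreSQL.
DDL = """
CREATE TABLE telemetry_events (
    id INTEGER PRIMARY KEY,
    idempotency_hash TEXT NOT NULL UNIQUE,
    event_timestamp TEXT NOT NULL,
    model TEXT NOT NULL,
    uncached_input_tokens INTEGER NOT NULL DEFAULT 0
);

-- Identity columns are frozen; token columns stay writable for reconciliation.
CREATE TRIGGER telemetry_events_immutable_cols
BEFORE UPDATE OF idempotency_hash, event_timestamp, model ON telemetry_events
BEGIN
    SELECT RAISE(ABORT, 'telemetry_events identity columns are immutable');
END;

-- Rows are append-only.
CREATE TRIGGER telemetry_events_no_delete
BEFORE DELETE ON telemetry_events
BEGIN
    SELECT RAISE(ABORT, 'telemetry_events rows cannot be deleted');
END;
"""


def make_db() -> sqlite3.Connection:
    """Create an in-memory database holding one seeded event."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(DDL)
    conn.execute(
        "INSERT INTO telemetry_events (idempotency_hash, event_timestamp, model, uncached_input_tokens) "
        "VALUES ('h1', '2024-01-01T00:00:00Z', 'gpt-4o', 100)"
    )
    return conn
```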

2.4 Database session management.

  • Create app/core/database.py: async engine factory, async_sessionmaker, get_session() generator dependency, init_db() and close_db() lifecycle hooks
  • Configure pool_size=20, max_overflow=10

Traces to: FR 9.1


3. Authentication and authorization

3.1 JWT verification.

  • Create app/core/auth.py: fetch JWKS from Clerk endpoint, verify JWT signature and expiry via PyJWT, extract org_id claim
  • Cache JWKS keys in memory with 1h TTL
  • Write tests: valid token → claims extracted, expired token → 401, bad signature → 401

Traces to: FR 8.2
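The 1h JWKS cache from 3.1 can be sketched as a small class with an injectable fetch function and clock, which keeps it testable without network access. The fetch callable (in production, an httpx GET of Clerk's JWKS URL) and the refetch-on-unknown-kid rule are assumptions. PyJWT's own PyJWKClient also caches the JWK set and may be sufficient on its own; this version only makes the TTL explicit.

```python
from __future__ import annotations

import time
from typing import Any, Callable

JWK = dict[str, Any]


class JWKSCache:
    """In-memory cache of a JWKS document, keyed by "kid", with a TTL."""

    def __init__(
        self,
        fetch: Callable[[], dict[str, Any]],
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch  # e.g. an HTTP GET of Clerk's JWKS URL (assumed)
        self._ttl = ttl_seconds
        self._clock = clock
        self._keys: dict[str, JWK] = {}
        self._fetched_at: float | None = None

    def _refresh(self) -> None:
        document = self._fetch()
        self._keys = {jwk["kid"]: jwk for jwk in document.get("keys", [])}
        self._fetched_at = self._clock()

    def get_key(self, kid: str) -> JWK:
        expired = self._fetched_at is None or self._clock() - self._fetched_at >= self._ttl
        if expired or kid not in self._keys:
            # Refetch when stale, or once for an unseen kid (key rotation).
            self._refresh()
        try:
            return self._keys[kid]
        except KeyError:
            raise LookupError(f"no JWKS key with kid={kid!r}") from None
```

A resolved JWK dict can be turned into a verification key with jwt.PyJWK(jwk).key and passed to jwt.decode(token, key, algorithms=["RS256"]). In production the unknown-kid refetch should be throttled so that forged kid values cannot trigger one fetch per request.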

3.2 Organization dependency.

  • Create app/core/deps.py: get_current_org() FastAPI dependency that verifies JWT and resolves Organization from DB
  • Write tests: valid org → Organization returned, unknown org_id → 404, missing token → 401

Traces to: FR 8.3

3.3 Rate limiting, CORS, HSTS.

  • Create app/core/security.py: slowapi rate limiter (100/s free, 500/s paid, keyed by org_id), CORS middleware (dashboard domain only), HSTS header middleware
  • Write tests: exceed rate limit → 429, CORS preflight with wrong origin → blocked

Traces to: FR 8.5, 8.7

3.4 Clerk webhook.

  • Create app/api/v1/webhooks.py: POST /v1/auth/webhook endpoint that verifies the Clerk webhook signature and handles the organization.created event (creating an Organization record on the Free tier by default)
  • Write test: valid webhook → org created in DB

Traces to: FR 5.2


4. Error handling framework

4.1 Domain exceptions and error responses.

  • Create app/schemas/errors.py: ErrorDetail and ErrorResponse Pydantic models
  • Create app/core/exceptions.py: base DomainError, plus ConnectionValidationError, ConnectionNotFoundError, RateLimitExceededError, BillingPeriodNotOpenError, ReceiptNotFoundError, InvalidSignatureError
  • Register FastAPI exception handlers that map domain exceptions to HTTP responses as specified in the Error Mapping

Traces to: FR 8.6

4.2 Error classifier.

  • Create app/core/error_classifier.py: ErrorClassifier.classify(status_code) returning "transient", "permanent", or "unknown"
  • Write tests for each status code category (408/429/5xx → transient, 400/401/403/404 → permanent)

Traces to: FR 7.5
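The classification rules in 4.2 fit in a few lines. Anything not explicitly listed (for example 409 or 422) falls through to "unknown", which leaves the polling job free to decide separately how to treat it.

```python
from typing import Literal

ErrorClass = Literal["transient", "permanent", "unknown"]

TRANSIENT_STATUS_CODES = frozenset({408, 429})
PERMANENT_STATUS_CODES = frozenset({400, 401, 403, 404})


class ErrorClassifier:
    @staticmethod
    def classify(status_code: int) -> ErrorClass:
        """Map a provider HTTP status code to a retry category."""
        if status_code in TRANSIENT_STATUS_CODES or 500 <= status_code <= 599:
            return "transient"  # retry with backoff
        if status_code in PERMANENT_STATUS_CODES:
            return "permanent"  # retrying will not help (bad request, auth, missing)
        return "unknown"
```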


5. Provider connectors

5.1 Connector protocol and registry.

  • Create app/connectors/base.py: ProviderConnector Protocol, PollResult, TelemetryEventCandidate, ValidationResult dataclasses
  • Create app/connectors/registry.py: CONNECTORS dict mapping Provider enum to connector classes, get_connector() factory

Traces to: FR 1.1, 1.9
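A sketch of the 5.1 protocol and registry. The dataclass field names, the cursor type, and the async method signatures are assumptions chosen to match the connector descriptions in 5.2 through 5.4; the Provider enum is a stand-in for the one defined in app/models/enums.py.

```python
from __future__ import annotations

import enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Protocol


class Provider(str, enum.Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    OPENROUTER = "openrouter"


@dataclass(frozen=True)
class TelemetryEventCandidate:
    model: str
    bucket_start: datetime
    uncached_input_tokens: int = 0
    cache_creation_input_tokens: int = 0
    cached_input_tokens: int = 0
    output_tokens: int = 0


@dataclass(frozen=True)
class PollResult:
    events: list[TelemetryEventCandidate] = field(default_factory=list)
    next_cursor: str | None = None  # persisted so the next poll resumes here


@dataclass(frozen=True)
class ValidationResult:
    ok: bool
    error: str | None = None


class ProviderConnector(Protocol):
    provider: Provider

    async def validate(self, api_key: str) -> ValidationResult: ...

    async def poll(self, api_key: str, cursor: str | None) -> PollResult: ...

    def idempotency_hash(self, event: TelemetryEventCandidate) -> str: ...


# Populated with the concrete classes, e.g. {Provider.OPENAI: OpenAIConnector, ...}.
CONNECTORS: dict[Provider, type[ProviderConnector]] = {}


def get_connector(provider: Provider) -> ProviderConnector:
    """Instantiate the connector registered for a provider."""
    cls = CONNECTORS.get(provider)
    if cls is None:
        raise ValueError(f"no connector registered for provider {provider.value!r}")
    return cls()
```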

5.2 OpenAI connector.

  • Create app/connectors/openai.py: validate (GET /v1/organization/usage/completions with a 1h lookback), poll (iterate the completions and embeddings endpoints with bucket_width=1h, mapping all input tokens as uncached), idempotency_hash (openai:model:bucket_start)
  • Write tests with mocked httpx responses: validation success/failure, poll with multiple buckets, cursor advancement

Traces to: FR 1.1, 2.2, 2.5, 2.3
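A sketch of the 5.2 bucket mapping and idempotency hash. It assumes the usage bucket shape {"start_time": <unix seconds>, "results": [{"model", "input_tokens", "output_tokens", ...}]} returned when results are grouped by model, and that the "openai:model:bucket_start" key is hashed with SHA-256; both should be checked against OpenAI's usage API reference.

```python
import hashlib
from datetime import datetime, timezone
from typing import Any


def openai_idempotency_hash(model: str, bucket_start: datetime) -> str:
    """SHA-256 of the "openai:model:bucket_start" key (UTC, ISO 8601)."""
    if bucket_start.tzinfo is None:
        raise ValueError("bucket_start must be timezone-aware")
    key = f"openai:{model}:{bucket_start.astimezone(timezone.utc).isoformat()}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


def map_openai_bucket(bucket: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten one usage bucket into event candidates (assumed response shape)."""
    start = datetime.fromtimestamp(bucket["start_time"], tz=timezone.utc)
    events = []
    for result in bucket.get("results", []):
        model = result.get("model") or "unknown"
        events.append({
            "model": model,
            "bucket_start": start,
            # Conservative default from 5.2: every input token counts as uncached.
            "uncached_input_tokens": result.get("input_tokens") or 0,
            "cached_input_tokens": 0,
            # Embeddings results carry no output count, so default to zero.
            "output_tokens": result.get("output_tokens") or 0,
            "idempotency_hash": openai_idempotency_hash(model, start),
        })
    return events
```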

5.3 Anthropic connector.

  • Create app/connectors/anthropic.py: three-way token split mapping (input_tokens → uncached, cache_creation_input_tokens → cache_creation, cache_read_input_tokens → cached_input)
  • Write tests: verify three-way split mapping, hash includes cache token counts

Traces to: FR 1.1, 2.4, 2.3
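The 5.3 mapping, sketched as a pure function over a usage record carrying the field names listed in the plan. The exact hash key layout is hypothetical; the plan only requires that it include the cache token counts.

```python
import hashlib
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class TokenSplit:
    uncached_input_tokens: int
    cache_creation_input_tokens: int
    cached_input_tokens: int
    output_tokens: int


def map_anthropic_usage(usage: dict[str, Any]) -> TokenSplit:
    """Three-way input split from 5.3; missing or null fields count as zero."""
    return TokenSplit(
        uncached_input_tokens=usage.get("input_tokens") or 0,
        cache_creation_input_tokens=usage.get("cache_creation_input_tokens") or 0,
        cached_input_tokens=usage.get("cache_read_input_tokens") or 0,
        output_tokens=usage.get("output_tokens") or 0,
    )


def anthropic_idempotency_hash(model: str, bucket_start_iso: str, split: TokenSplit) -> str:
    """Hypothetical key layout that includes both cache counts."""
    key = ":".join([
        "anthropic", model, bucket_start_iso,
        str(split.cache_creation_input_tokens), str(split.cached_input_tokens),
    ])
    return hashlib.sha256(key.encode("utf-8")).hexdigest()
```

Because the cache counts are part of the key, a later revision of those counts yields a different hash, and therefore a new row rather than an upsert of the existing one; that interaction with trailing reconciliation (8.3) is worth covering in a test.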

5.4 OpenRouter connector.

  • Create app/connectors/openrouter.py: map prompt_tokens → uncached, completion_tokens → output, cache fields = 0
  • Write tests: verify conservative uncached default

Traces to: FR 1.1, 2.5, 2.3


6. Connection management API

6.1 Secrets Manager integration.

  • Create app/services/secrets.py: store_api_key(org_id, provider, key) → arn, get_api_key(arn) → str, delete_api_key(arn) using aioboto3

Traces to: FR 1.2

6.2 Connection service.

  • Create app/services/connections.py: ConnectionService with create (validate key via connector, store in Secrets Manager, find-or-create Default project, create Workload), list, get, delete (revoke in Secrets Manager), manual_sync (rate-limit check)

Traces to: FR 1.1-1.10, 1B.1-1B.7

6.3 Connection API endpoints.

  • Create app/api/v1/connections.py: POST, GET (list), GET (by id), DELETE, POST sync, PUT project
  • Create app/schemas/connections.py: ConnectionCreate (with SecretStr for api_key), ConnectionResponse

Traces to: FR 1.1-1.6, 8.9


7. Telemetry ingestion and emissions

7.1 Telemetry service.

  • Create app/services/telemetry.py: TelemetryService.ingest_batch() that generates idempotency hashes and upserts events with PostgreSQL INSERT ... ON CONFLICT (idempotency_hash) DO UPDATE SET

Traces to: FR 2.1-2.3, 2.6, 2.8
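A minimal, runnable sketch of the ingest_batch() upsert. To stay self-contained it runs against stdlib sqlite3, whose upsert syntax (INSERT ... ON CONFLICT (col) DO UPDATE SET col = excluded.col) matches PostgreSQL's for this case; in the service itself the statement would be built with SQLAlchemy's postgresql.insert(...).on_conflict_do_update(...). The column list and hash key format are assumptions.

```python
import hashlib
import sqlite3
from typing import Any, Iterable, Mapping

SCHEMA = """
CREATE TABLE IF NOT EXISTS telemetry_events (
    idempotency_hash TEXT PRIMARY KEY,
    provider TEXT NOT NULL,
    model TEXT NOT NULL,
    bucket_start TEXT NOT NULL,
    uncached_input_tokens INTEGER NOT NULL,
    output_tokens INTEGER NOT NULL
)
"""

# Insert, or on a duplicate hash overwrite only the token counts.
UPSERT_SQL = """
INSERT INTO telemetry_events
    (idempotency_hash, provider, model, bucket_start, uncached_input_tokens, output_tokens)
VALUES
    (:idempotency_hash, :provider, :model, :bucket_start, :uncached_input_tokens, :output_tokens)
ON CONFLICT (idempotency_hash) DO UPDATE SET
    uncached_input_tokens = excluded.uncached_input_tokens,
    output_tokens = excluded.output_tokens
"""


def idempotency_hash(provider: str, model: str, bucket_start: str) -> str:
    return hashlib.sha256(f"{provider}:{model}:{bucket_start}".encode("utf-8")).hexdigest()


def ingest_batch(conn: sqlite3.Connection, events: Iterable[Mapping[str, Any]]) -> int:
    """Upsert a batch and return the number of distinct events written."""
    rows: dict[str, dict[str, Any]] = {}
    for e in events:
        h = idempotency_hash(e["provider"], e["model"], e["bucket_start"])
        # Deduplicate inside the batch (last one wins): PostgreSQL rejects a
        # multi-row ON CONFLICT DO UPDATE that would touch the same row twice.
        rows[h] = {**e, "idempotency_hash": h}
    with conn:  # one transaction per batch
        conn.executemany(UPSERT_SQL, list(rows.values()))
    return len(rows)
```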

7.2 Emissions engine and carbon factors.

  • Create app/services/model_tier.py: resolve_model_tier() using fnmatch with first-match-wins
  • Create app/services/emissions.py: calculate_emissions() pure function implementing the full pipeline
  • Seed CarbonFactors v1.0 in a migration or fixture

Traces to: FR 3.1-3.9
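A sketch of resolve_model_tier() for 7.2, using the stdlib glob matcher with first-match-wins semantics. The patterns and tier names are placeholders for the v1.0 Carbon Factors data, and lower-casing the model name first assumes the stored patterns are lower-case.

```python
from __future__ import annotations

from fnmatch import fnmatchcase
from typing import Sequence

# Placeholder patterns; the real list comes from Carbon Factors v1.0.
EXAMPLE_PATTERNS: list[tuple[str, str]] = [
    ("gpt-4o-mini*", "small"),
    ("gpt-4o*", "large"),
    ("claude-*-haiku*", "small"),
    ("*", "medium"),  # catch-all must come last
]


def resolve_model_tier(
    model: str, patterns: Sequence[tuple[str, str]], default: str | None = None
) -> str | None:
    """Return the tier of the first glob pattern that matches (first match wins)."""
    name = model.strip().lower()
    for pattern, tier in patterns:
        if fnmatchcase(name, pattern):
            return tier
    return default
```

fnmatchcase is used rather than fnmatch because fnmatch.fnmatch normalizes case through os.path.normcase, which makes matching case-insensitive on Windows only. First-match-wins also means specific patterns such as gpt-4o-mini* must precede broader ones such as gpt-4o*.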

7.3 Telemetry API endpoints.

  • Create app/api/v1/telemetry.py: GET /telemetry/summary, GET /telemetry/events (paginated), GET /telemetry/models
  • Create app/api/v1/carbon_factors.py: GET /carbon-factors/current, GET /carbon-factors/{version}, GET /carbon-factors
  • Create app/api/v1/methodology.py: GET /methodology (public)

Traces to: FR 6.3, 6.4, 8.4, 3.10, 8.10


8. Background job system

8.1 ARQ worker configuration.

  • Create app/workers.py: WorkerSettings with function registry, cron schedules, redis_settings, max_jobs=10, job_timeout=300

Traces to: FR 7.1, 7.2, 7.6

8.2 Polling job.

  • Create app/jobs/polling.py: poll_provider() with error classification and consecutive_failures tracking (a connection is skipped after 5 consecutive failures), plus the poll_all_connections() cron entry point

Traces to: FR 1.7, 1.8, 2.1, 2.2, 2.9, 2.10, 7.3-7.5
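The failure-tracking part of poll_provider() reduces to a small state update. This sketch assumes that a successful poll resets the counter, that a connection at 5 consecutive failures is skipped by the cron sweep until something (for example a manual sync) resets it, and that a permanent error such as a revoked key moves the connection to a hypothetical "error" status immediately; the plan leaves these details open.

```python
from __future__ import annotations

from dataclasses import dataclass

MAX_CONSECUTIVE_FAILURES = 5


@dataclass
class ConnectionPollState:
    # Hypothetical subset of the Connection row used by the polling job.
    consecutive_failures: int = 0
    status: str = "active"


def should_poll(state: ConnectionPollState) -> bool:
    """The cron sweep skips errored connections and those at the failure cap."""
    return state.status == "active" and state.consecutive_failures < MAX_CONSECUTIVE_FAILURES


def record_outcome(state: ConnectionPollState, error_class: str | None) -> ConnectionPollState:
    """Update counters after one poll. error_class is None on success, otherwise
    "transient", "permanent", or "unknown" as returned by ErrorClassifier."""
    if error_class is None:
        state.consecutive_failures = 0
    else:
        state.consecutive_failures += 1
        if error_class == "permanent":
            # e.g. a revoked key (401/403): retrying will not help.
            state.status = "error"
    return state
```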

8.3 Reconciliation job.

  • Create app/jobs/reconciliation.py: trailing_reconciliation() that re-polls T-24h through T-0, upserts revised data, enqueues emissions recalculation

Traces to: FR 2.7, 7.2

8.4 Emissions batch job.

  • Create app/jobs/emissions.py: compute_emissions_batch() that loads the event and the current carbon factors, calls calculate_emissions(), and upserts the CarbonCalculation

Traces to: FR 2.10, 3.1-3.6


9. Receipt and signing engine

9.1 Cryptographic signing.

  • Create app/services/signing.py: ReceiptSigner with sign_receipt (canonical JSON → SHA-256 → Ed25519 sign) and static verify method

Traces to: FR 4.3, 4.7, 4.8

9.2 Receipt service.

  • Create app/services/receipts.py: generate_for_period (allocate serial CL-YYYYMM-XXXXX, build payload, sign, create DB record), get_by_id, verify_signature

Traces to: FR 4.1-4.4, 4.6, 4.10
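A sketch of serial formatting for 9.2, assuming XXXXX is the zero-padded value from SELECT nextval('receipt_serial_seq') and YYYYMM is the billing period month. The parser also lets the public verification endpoint (9.4) reject malformed input before querying the database. The plan does not say whether the sequence is global or reset monthly; with a single global sequence the five-digit field caps the total at 99,999 receipts.

```python
import re
from datetime import date

# [0-9] rather than \d, which would also accept non-ASCII digits.
SERIAL_RE = re.compile(r"CL-([0-9]{4})([0-9]{2})-([0-9]{5})")
MAX_SEQ = 99_999  # five digits


def format_receipt_serial(period: date, seq_value: int) -> str:
    """Build CL-YYYYMM-XXXXX from the billing period and a sequence value."""
    if not 1 <= seq_value <= MAX_SEQ:
        raise ValueError(f"sequence value {seq_value} does not fit in five digits")
    return f"CL-{period.year:04d}{period.month:02d}-{seq_value:05d}"


def parse_receipt_serial(serial: str) -> tuple[int, int, int]:
    """Return (year, month, sequence), or raise ValueError for malformed input."""
    match = SERIAL_RE.fullmatch(serial)
    if match is None or not 1 <= int(match.group(2)) <= 12:
        raise ValueError(f"malformed receipt serial: {serial!r}")
    year, month, seq = (int(g) for g in match.groups())
    return year, month, seq
```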

9.3 PDF generation.

  • Create app/templates/receipt.html: Jinja2 + CSS template for branded receipt PDF
  • Create app/services/pdf.py: generate_receipt_pdf() using WeasyPrint

Traces to: FR 4.5

9.4 Receipt API endpoints.

  • Create app/api/v1/receipts.py: GET /receipts (list), GET /receipts/{id}/pdf, GET /public/receipts/verify/{serial_number} (no auth)

Traces to: FR 4.6, 4.8, 8.2


10. Billing system

10.1 Stripe webhook handling.

  • Extend app/api/v1/webhooks.py: POST /api/v1/billing/webhook with Stripe signature verification, handle invoice.payment_succeeded, invoice.payment_failed, subscription lifecycle events

Traces to: FR 5.5, 5.6, 5.10
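The Stripe SDK's stripe.Webhook.construct_event(payload, sig_header, secret) performs this check and is the natural choice in the handler. The stdlib sketch below only spells out what it verifies, which helps when writing the invalid-signature test in 15.2: HMAC-SHA256 over "{t}.{raw_body}" with the endpoint secret, compared against every v1 entry in the Stripe-Signature header, plus a timestamp tolerance against replays (300 s, the SDK's default). InvalidSignatureError is a local stand-in for the 4.1 domain exception.

```python
from __future__ import annotations

import hashlib
import hmac
import time


class InvalidSignatureError(Exception):
    """Stand-in for the domain exception from 4.1 (mapped to HTTP 400)."""


def verify_stripe_signature(
    payload: bytes,
    sig_header: str,
    secret: str,
    tolerance: int = 300,
    now: float | None = None,
) -> None:
    """Check a Stripe-Signature header ("t=...,v1=...") against the raw body."""
    timestamp: str | None = None
    signatures: list[str] = []
    for item in sig_header.split(","):
        key, sep, value = item.strip().partition("=")
        if not sep:
            continue
        if key == "t":
            timestamp = value
        elif key == "v1":  # several v1 entries may appear during secret rotation
            signatures.append(value)
    if timestamp is None or not signatures:
        raise InvalidSignatureError("malformed Stripe-Signature header")
    try:
        ts = int(timestamp)
    except ValueError:
        raise InvalidSignatureError("non-numeric timestamp") from None
    current = time.time() if now is None else now
    if abs(current - ts) > tolerance:
        raise InvalidSignatureError("timestamp outside tolerance")  # replay protection
    signed_payload = timestamp.encode() + b"." + payload
    expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()
    if not any(hmac.compare_digest(expected, sig) for sig in signatures):
        raise InvalidSignatureError("no matching v1 signature")
```

The payload must be the raw bytes from await request.body(); re-serializing parsed JSON would change the signed bytes and fail verification.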

10.2 Billing service.

  • Create app/services/billing.py: handle_subscription_created, handle_payment_succeeded (→ closing), handle_payment_failed (→ failed), period aggregation, prior period adjustment logic

Traces to: FR 5.4, 5.5, 5.6, 5.7
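The period transitions implied by 10.2 and 10.3 can be guarded by a small transition table. The PeriodStatus enum, its values, and the FAILED → CLOSING edge (a retried payment that later succeeds) are assumptions; the plan names only the closing, failed, and closed states explicitly.

```python
import enum


class PeriodStatus(str, enum.Enum):
    # Hypothetical billing-period states.
    OPEN = "open"
    CLOSING = "closing"
    CLOSED = "closed"
    FAILED = "failed"


# Assumed transition table derived from 10.2 and 10.3.
ALLOWED_TRANSITIONS: dict[PeriodStatus, frozenset[PeriodStatus]] = {
    PeriodStatus.OPEN: frozenset({PeriodStatus.CLOSING, PeriodStatus.FAILED}),
    PeriodStatus.FAILED: frozenset({PeriodStatus.CLOSING}),  # retried payment succeeded
    PeriodStatus.CLOSING: frozenset({PeriodStatus.CLOSED}),  # close job finished
    PeriodStatus.CLOSED: frozenset(),
}


class InvalidTransitionError(Exception):
    pass


def transition(current: PeriodStatus, target: PeriodStatus) -> PeriodStatus:
    """Validate a status change. Repeating the current status is a no-op,
    because Stripe can deliver the same webhook event more than once."""
    if target == current:
        return current
    if target not in ALLOWED_TRANSITIONS[current]:
        raise InvalidTransitionError(f"{current.value} -> {target.value} is not allowed")
    return target
```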

10.3 Billing close job.

  • Create app/jobs/billing.py: close_billing_period(), enqueued with _defer_by=timedelta(hours=48), which aggregates CO2, allocates credit serials, calls ReceiptService, and transitions the period to "closed"

Traces to: FR 4.1, 4.2, 4.10, 5.5

10.4 Billing API endpoints.

  • Create app/api/v1/billing.py: GET /billing/status, POST /billing/upgrade, POST /billing/portal

Traces to: FR 5.1, 5.3, 5.8, 5.9


11. Project and workload management

11.1 Project API.

  • Create app/api/v1/projects.py: POST, GET (list), GET (detail with telemetry summary), PATCH, DELETE

Traces to: FR 1B.4, 1B.6

11.2 Workload management.

  • Create app/api/v1/workloads.py: GET workloads under project, connection re-mapping (PATCH connection with new project_id → creates new workload, deactivates old)

Traces to: FR 1B.3, 1B.4, 1B.5


12. Organization and export endpoints

12.1 Organization endpoints.

  • Create app/api/v1/organizations.py: GET /organizations/me, PATCH /organizations/me

Traces to: FR 8.3

12.2 Data export.

  • Add CSV and JSON export: GET /api/v1/export/telemetry with format, date-range, and project filters
  • Stream large exports using StreamingResponse

Traces to: FR 6.5
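For the CSV branch of 12.2, rows can be rendered incrementally with the csv module, reusing one small buffer so memory stays flat regardless of export size. The generator would be wrapped as StreamingResponse(iter_csv(rows, fieldnames), media_type="text/csv"); the column names used in the check below are hypothetical. A sync generator is shown for simplicity; with async SQLAlchemy, an async generator over AsyncSession.stream(...) results fits the same slot, since StreamingResponse accepts both.

```python
import csv
import io
from typing import Iterable, Iterator, Mapping, Sequence


def iter_csv(rows: Iterable[Mapping[str, object]], fieldnames: Sequence[str]) -> Iterator[str]:
    """Yield a CSV export one line at a time, header first."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    yield buffer.getvalue()
    for row in rows:
        # Reuse the buffer so memory does not grow with the export size.
        buffer.seek(0)
        buffer.truncate(0)
        writer.writerow(row)
        yield buffer.getvalue()
```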


13. Audit pack generation

  • Create app/services/audit_packs.py: generate_for_period (gather receipts + calculations + methodology, build manifest, zip, upload to S3)
  • Create app/jobs/audit_packs.py: monthly cron
  • Create app/api/v1/audit_packs.py: GET /api/v1/export/audit-pack/{year}/{month} (download)

Traces to: FR 4.9


14. Health endpoint

  • Create app/api/v1/health.py: GET /health (no auth) checking DB, Redis, last_poll_at, queue depth
  • Return structured JSON with per-check status

Traces to: FR 7.8
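One way to structure the section 14 checks: each check is an async callable that raises on failure, all run concurrently under a per-check timeout, and the response carries per-check status plus an overall flag. The check names and the "degraded" label are assumptions; a last_poll_at or queue-depth check would simply raise when its value crosses a chosen threshold. Only the exception type is reported, because /health is unauthenticated and exception messages can contain connection strings.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable

Check = Callable[[], Awaitable[Any]]


async def _run_check(name: str, check: Check, timeout: float) -> tuple[str, dict[str, Any]]:
    started = time.perf_counter()
    try:
        await asyncio.wait_for(check(), timeout=timeout)
        result: dict[str, Any] = {"status": "ok"}
    except Exception as exc:  # report the failure type only, never the message
        result = {"status": "fail", "error": type(exc).__name__}
    result["latency_ms"] = round((time.perf_counter() - started) * 1000, 1)
    return name, result


async def health_report(checks: dict[str, Check], timeout: float = 2.0) -> dict[str, Any]:
    """Run all checks concurrently and return per-check and overall status."""
    results = await asyncio.gather(*(_run_check(n, c, timeout) for n, c in checks.items()))
    per_check = dict(results)
    overall = "ok" if all(r["status"] == "ok" for r in per_check.values()) else "degraded"
    return {"status": overall, "checks": per_check}
```

The endpoint could return HTTP 503 whenever the overall status is not "ok", so load balancers and uptime monitors can act on the status code alone.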


Phase 2: Polish & Hardening

15. Integration test suite

15.1 End-to-end pipeline tests.

  • Test: create org → create connection → poll → telemetry ingested → emissions calculated → billing period closed → receipt generated → receipt verifies → audit pack generated
  • Test: reconciliation updates existing events correctly
  • Test: prior period adjustment created for late events after period close
  • Create Factory Boy factories and shared conftest.py

15.2 Security tests.

  • Test: org A cannot access org B's resources
  • Test: API keys never appear in API responses, error messages, or logs
  • Test: rate limiting enforced per tier
  • Test: Stripe webhook with invalid signature returns 400
  • Test: receipt verification endpoint works without auth

Traces to: FR 8.2, 8.3, 8.5, 8.7, 8.8, 8.9


16. Carbon Factors seed data

  • Write Alembic migration inserting the full v1.0 Carbon Factors (model tiers with glob patterns, PUE mapping, grid intensity, uncertainty bounds)
  • Validate JSON against the Carbon Factors schema
  • Write test: v1.0 loads correctly, resolve_model_tier produces expected results

Traces to: FR 3.4, 3.8


17. V1 router aggregation and OpenAPI

  • Create app/api/v1/__init__.py: aggregate all routers under /v1 prefix
  • Verify auto-generated OpenAPI 3.1 spec includes all endpoints, schemas, and auth requirements
  • Write test: GET /openapi.json returns valid spec with all endpoints present

Traces to: FR 8.1