openapi: 3.0.3
info:
  title: BatchPipe Management API (v1)
  description: |
    Workspace-automation JSON API under `/v1/*`, authenticated with the same **active workspace API keys**
    as ingestion (`Authorization: Bearer …`). Shapes match `lib/utils/pipe_validators.js` (Zod) and
    `routes/management_api_routes.js`.

    **SSL to customer Postgres/MySQL:** delivery uses TLS when `DESTINATION_DB_SSL=true` **or** when
    `destination_config.host` is not `localhost` / `127.0.0.1` (`lib/delivery/database_destination.js` —
    there is no per-destination `ssl: true` flag in config today).

    **Validation errors (400):** `{ "error": "validation_error", "message": "…", "errors": { } }` where
    each key is a dot-joined Zod path (e.g. `columns.0.column_name`, `destination_config.url`) and the
    value is a human-readable string (see `/docs/api/management-contract`).

    This bundle is **beta**: field-level docs track the running app; report drift via your BatchPipe channel.
  version: "1.0.0"

servers:
  - url: "{origin}"
    description: Your BatchPipe deployment (same value as `BASE_URL` / dashboard origin).
    variables:
      origin:
        default: "https://your-batchpipe-host.example"

tags:
  - name: Pipes
  - name: Destinations
  - name: Limits
  - name: AllowedOrigins
  - name: ApiKeys

security:
  - bearerAuth: []

components:
  securitySchemes:
    bearerAuth:
      type: http
      scheme: bearer
      bearerFormat: BatchPipe workspace API key (`bp_v1_…`)

  parameters:
    PipeId:
      name: pipe_id
      in: path
      required: true
      schema:
        type: string
        format: uuid
    DestinationId:
      name: destination_id
      in: path
      required: true
      schema:
        type: string
        format: uuid

  schemas:
    ValidationError:
      type: object
      required: [error, message, errors]
      properties:
        error:
          type: string
          enum: [validation_error]
        message:
          type: string
          example: Request body failed validation
        errors:
          type: object
          additionalProperties:
            type: string
          example:
            pipe_name: Pipe name is required
            columns.0.source_field: Invalid input

    CreatePipeRequest:
      type: object
      required: [pipe_name]
      properties:
        pipe_name:
          type: string
          minLength: 1
          maxLength: 100
          pattern: '^[a-zA-Z0-9_-]+$'
          description: Unique per workspace (409 `duplicate_pipe_name` on conflict).
        pipe_description:
          type: string
          maxLength: 500
        pipe_add_ingest_ts:
          type: boolean
          description: Default `true` if omitted (server inserts enrichment timestamp when enabled).
        pipe_add_ingest_ip:
          type: boolean
          description: Default `false` if omitted.
        pipe_ingest_ts_field:
          type: string
          maxLength: 100
          description: Target JSON field name for timestamp; default `ingest_ts`.
        pipe_ingest_ip_field:
          type: string
          maxLength: 100
          description: Target JSON field name for client IP; default `ingest_ip`.
        pipe_api_key_required:
          type: boolean
          description: |
            Default `true`. If `false` (CORS-only), browser `Origin` must match an allowed origin for ingest,
            unless `pipe_allow_any_browser_origin` is `true` (then any standard `http(s)` browser origin is accepted).
        pipe_allow_any_browser_origin:
          type: boolean
          description: |
            Default `false`. When `true`, ingest and preflight echo the request `Origin` in
            `Access-Control-Allow-Origin` for any standard `http(s)` browser origin, without allowlist checks:
            with `pipe_api_key_required`, a valid Bearer API key is still required; with CORS-only pipes, no API key
            (high abuse risk if the ingest URL leaks).

    UpdatePipeRequest:
      type: object
      properties:
        pipe_description: { type: string, maxLength: 500 }
        pipe_status:
          type: string
          enum: [active, paused]
        pipe_add_ingest_ts: { type: boolean }
        pipe_add_ingest_ip: { type: boolean }
        pipe_ingest_ts_field: { type: string, maxLength: 100 }
        pipe_ingest_ip_field: { type: string, maxLength: 100 }
        pipe_api_key_required: { type: boolean }
        pipe_allow_any_browser_origin: { type: boolean }

    Pipe:
      type: object
      description: Row from `pipe` table (snake_case). See GET response for joined data.
      additionalProperties: true

    DestinationColumnInput:
      type: object
      required: [column_name, column_type]
      properties:
        column_name:
          type: string
        column_type:
          type: string
          enum: [string, number, boolean, object, array, "null"]
          description: JSON value type before SQL cast (RFC 8259 types).
        source_field:
          type: string
          description: Top-level key on each ingested JSON object (not nested paths).
        semantic_role:
          type: string
          enum: [dedupe_id, record_ts, received_at]
          description: |
            `dedupe_id` enables upsert on delivery (must match PK/UNIQUE on destination table).
        nullable:
          type: boolean
          default: true

    DestinationDatabaseConfig:
      type: object
      required: [host, database, username, password, table]
      properties:
        host: { type: string }
        port:
          oneOf:
            - type: integer
            - type: string
          description: Default 5432 (Postgres) or 3306 (MySQL) if omitted when coerced server-side.
        database: { type: string }
        username: { type: string }
        password:
          type: string
          description: Stored encrypted at rest. GET responses redact to `[redacted]`.
        table:
          type: string
          description: |
            `table` or `schema.table` (Postgres: quoted identifiers; MySQL two-part = database.table).
        dialect:
          type: string
          description: |
            `postgresql` (default), or aliases `postgres`, `pg`, `mysql`, `mariadb` — see `normalize_dialect`
            in `lib/delivery/database_destination.js`.

    CreateDestinationRequest:
      type: object
      required: [destination_type, destination_config]
      properties:
        destination_type:
          type: string
          enum: [destination_database, destination_object_store, destination_http_endpoint]
        destination_max_retry_seconds:
          type: integer
          minimum: 1
          description: Default `86400` if omitted.
        destination_config:
          oneOf:
            - $ref: "#/components/schemas/DestinationDatabaseConfig"
            - type: object
              description: Object-store or HTTP shapes (not fully expanded in this bundle).
        columns:
          type: array
          items: { $ref: "#/components/schemas/DestinationColumnInput" }
          description: Required for `destination_database` when mappings are supplied; at least one non-empty column.

    UpdateDestinationRequest:
      type: object
      properties:
        destination_max_retry_seconds:
          type: integer
          minimum: 1
        destination_config:
          type: object
          additionalProperties: true
          description: |
            Shallow-merged over existing JSON. For `destination_database`, **empty string** `password`
            keeps the previous password. Same for object-store `secret_key` and HTTP `auth_header`.
        columns:
          type: array
          items: { $ref: "#/components/schemas/DestinationColumnInput" }
          description: If present, **replaces** all column mappings for DB destinations (delete + insert server-side).

    SetDestinationStatusRequest:
      type: object
      required: [destination_status]
      properties:
        destination_status:
          type: string
          enum: [active, blocked]

    SetPipeLimitsRequest:
      type: object
      required:
        - pl_ingest_max_records_per_request
        - pl_ingest_max_records_per_second
        - pl_ingest_max_records_per_day
        - pl_delivery_max_records_per_flush
        - pl_delivery_max_bytes_per_flush
        - pl_delivery_max_seconds_per_flush
      properties:
        pl_ingest_max_records_per_request: { type: integer, minimum: 1 }
        pl_ingest_max_records_per_second: { type: integer, minimum: 1 }
        pl_ingest_max_records_per_day: { type: integer, minimum: 1 }
        pl_ingest_max_bytes_per_day:
          type: integer
          minimum: 1
          nullable: true
          description: Omit or null for no per-day byte cap at ingest.
        pl_delivery_max_records_per_flush: { type: integer, minimum: 1 }
        pl_delivery_max_bytes_per_flush: { type: integer, minimum: 1 }
        pl_delivery_max_seconds_per_flush: { type: integer, minimum: 1 }

    AddAllowedOriginRequest:
      type: object
      required: [origin_domain]
      properties:
        origin_domain:
          type: string
          pattern: '^https?://[a-zA-Z0-9.-]+(:[0-9]+)?$'
          example: "https://app.example.com"

    CreateApiKeyRequest:
      type: object
      required: [api_key_name]
      properties:
        api_key_name:
          type: string
          minLength: 1
          maxLength: 100

paths:
  /v1/pipes:
    get:
      tags: [Pipes]
      summary: List pipes for workspace
      operationId: listPipes
      responses:
        "200":
          description: OK
          content:
            application/json:
              schema:
                type: object
                properties:
                  pipes:
                    type: array
                    items: { $ref: "#/components/schemas/Pipe" }
                  usage_by_pipe: { type: object, additionalProperties: true }
                  usage_stats_window_hours: { type: integer }

    post:
      tags: [Pipes]
      summary: Create pipe
      operationId: createPipe
      requestBody:
        required: true
        content:
          application/json:
            schema: { $ref: "#/components/schemas/CreatePipeRequest" }
            examples:
              websiteAnalytics:
                summary: Website analytics friendly defaults
                value:
                  pipe_name: "site_events"
                  pipe_description: "UserRadar → customer warehouse"
                  pipe_add_ingest_ts: true
                  pipe_add_ingest_ip: true
                  pipe_ingest_ts_field: "received_at"
                  pipe_api_key_required: true
      responses:
        "201":
          description: Created
          content:
            application/json:
              schema:
                type: object
                required: [pipe_id]
                properties:
                  pipe_id: { type: string, format: uuid }
        "400":
          description: validation_error
          content:
            application/json:
              schema: { $ref: "#/components/schemas/ValidationError" }
        "409":
          description: duplicate_pipe_name
          content:
            application/json:
              schema:
                type: object
                properties:
                  error: { type: string, example: duplicate_pipe_name }
                  message: { type: string }

  /v1/pipes/{pipe_id}:
    parameters:
      - name: pipe_id
        in: path
        required: true
        schema: { type: string, format: uuid }
    get:
      tags: [Pipes]
      summary: Get pipe with destinations, limits, workspace API key metadata
      operationId: getPipe
      responses:
        "200":
          description: OK — `destinations[].destination_config.password` (DB) / `secret_key` (S3) / `auth_header` (HTTP) are `[redacted]` in JSON.
          content:
            application/json:
              schema:
                type: object
                additionalProperties: true
        "404":
          description: pipe_not_found
    patch:
      tags: [Pipes]
      summary: Update pipe
      operationId: updatePipe
      requestBody:
        required: true
        content:
          application/json:
            schema: { $ref: "#/components/schemas/UpdatePipeRequest" }
      responses:
        "200":
          description: OK
          content:
            application/json:
              schema:
                type: object
                properties:
                  pipe: { $ref: "#/components/schemas/Pipe" }
        "400":
          content:
            application/json:
              schema: { $ref: "#/components/schemas/ValidationError" }
    delete:
      tags: [Pipes]
      summary: Delete pipe
      operationId: deletePipe
      responses:
        "204":
          description: No content

  /v1/pipes/{pipe_id}/destinations:
    parameters:
      - $ref: "#/components/parameters/PipeId"
    post:
      tags: [Destinations]
      summary: Create destination
      operationId: createDestination
      requestBody:
        required: true
        content:
          application/json:
            schema: { $ref: "#/components/schemas/CreateDestinationRequest" }
            examples:
              postgresWarehouse:
                summary: PostgreSQL warehouse
                value:
                  destination_type: destination_database
                  destination_config:
                    host: "db.customer.example"
                    port: 5432
                    database: "analytics"
                    username: "batchpipe_writer"
                    password: "use-strong-secret"
                    table: "public.page_event"
                    dialect: "postgresql"
                  columns:
                    - column_name: "event_id"
                      column_type: "string"
                      source_field: "event_id"
                      semantic_role: "dedupe_id"
                      nullable: false
                    - column_name: "payload"
                      column_type: "object"
                      source_field: "payload"
      responses:
        "201":
          content:
            application/json:
              schema:
                type: object
                properties:
                  destination_id: { type: string, format: uuid }
        "400":
          content:
            application/json:
              schema: { $ref: "#/components/schemas/ValidationError" }

  /v1/pipes/{pipe_id}/destinations/{destination_id}:
    parameters:
      - $ref: "#/components/parameters/PipeId"
      - $ref: "#/components/parameters/DestinationId"
    patch:
      tags: [Destinations]
      summary: Update destination (merge config; empty secrets keep prior value)
      operationId: updateDestination
      requestBody:
        content:
          application/json:
            schema: { $ref: "#/components/schemas/UpdateDestinationRequest" }
      responses:
        "200":
          description: Returns redacted destination
          content:
            application/json:
              schema:
                type: object
                properties:
                  destination:
                    type: object
                    additionalProperties: true
        "404":
          description: destination_not_found
    delete:
      tags: [Destinations]
      summary: Delete destination
      operationId: deleteDestination
      responses:
        "204": { description: No content }
        "404": { description: destination_not_found }

  /v1/pipes/{pipe_id}/destinations/{destination_id}/status:
    parameters:
      - $ref: "#/components/parameters/PipeId"
      - $ref: "#/components/parameters/DestinationId"
    patch:
      tags: [Destinations]
      summary: Pause or resume destination delivery
      operationId: setDestinationStatus
      requestBody:
        required: true
        content:
          application/json:
            schema: { $ref: "#/components/schemas/SetDestinationStatusRequest" }
      responses:
        "200":
          content:
            application/json:
              schema:
                type: object
                properties:
                  destination_status: { type: string, enum: [active, blocked] }

  /v1/pipes/{pipe_id}/limits:
    parameters:
      - $ref: "#/components/parameters/PipeId"
    put:
      tags: [Limits]
      summary: Upsert pipe limits
      operationId: setPipeLimits
      requestBody:
        required: true
        content:
          application/json:
            schema: { $ref: "#/components/schemas/SetPipeLimitsRequest" }
      responses:
        "200":
          content:
            application/json:
              schema:
                type: object
                properties:
                  limits: { type: object, additionalProperties: true }

  /v1/pipes/{pipe_id}/allowed-origins:
    parameters:
      - $ref: "#/components/parameters/PipeId"
    post:
      tags: [AllowedOrigins]
      summary: Add CORS allowed origin
      operationId: addAllowedOrigin
      requestBody:
        required: true
        content:
          application/json:
            schema: { $ref: "#/components/schemas/AddAllowedOriginRequest" }
      responses:
        "201":
          content:
            application/json:
              schema:
                type: object
                properties:
                  pipe_allowed_origin_id: { type: string, format: uuid }
        "409":
          description: duplicate_origin

  /v1/pipes/{pipe_id}/allowed-origins/{origin_id}:
    parameters:
      - $ref: "#/components/parameters/PipeId"
      - name: origin_id
        in: path
        required: true
        schema: { type: string, format: uuid }
    delete:
      tags: [AllowedOrigins]
      summary: Remove allowed origin
      operationId: deleteAllowedOrigin
      responses:
        "204": { description: No content }
        "404": { description: origin_not_found }

  /v1/api-keys:
    get:
      tags: [ApiKeys]
      summary: List API keys (metadata; never returns full secret)
      operationId: listApiKeys
      responses:
        "200":
          content:
            application/json:
              schema:
                type: object
                properties:
                  api_keys:
                    type: array
                    items:
                      type: object
                      properties:
                        api_key_id: { type: string, format: uuid }
                        api_key_name: { type: string }
                        api_key_prefix: { type: string }
                        api_key_status: { type: string }
                        api_key_created_ts: { type: string, format: date-time }
    post:
      tags: [ApiKeys]
      summary: Create API key (full key returned once)
      operationId: createApiKey
      requestBody:
        required: true
        content:
          application/json:
            schema: { $ref: "#/components/schemas/CreateApiKeyRequest" }
      responses:
        "201":
          content:
            application/json:
              schema:
                type: object
                properties:
                  api_key_id: { type: string, format: uuid }
                  api_key: { type: string }
                  api_key_name: { type: string }
        "409":
          description: duplicate_api_key_name

  /v1/api-keys/{api_key_id}:
    parameters:
      - name: api_key_id
        in: path
        required: true
        schema: { type: string, format: uuid }
    delete:
      tags: [ApiKeys]
      summary: Revoke API key
      operationId: revokeApiKey
      responses:
        "204": { description: No content }
        "400":
          description: cannot_revoke_current_key
        "404":
          description: api_key_not_found
