A Postgres-to-Postgres (P2P) pipeline reads from one PostgreSQL source and writes the result into a PostgreSQL destination. Both databases can run on any supported provider, managed or self-hosted. P2P pipelines support a 3-stage transformation pipeline inside the Destination panel:
Source (public.tablename)

Normalisation   ← rename/exclude columns (Normalisation tab)

Raw layer  (raw.public__tablename)

dbt Layer       ← SQL model using {{ source('raw', 'public__tablename') }} (dbt Layer tab)

Destination  (analytics.your_model)

For simple pipelines you can use just the SQL Transform node on the canvas with {{ source }}. For production pipelines with multiple tables, JSONB extraction, or derived metrics, use the Normalisation tab + dbt Layer tab instead.

When to use this

  • Mirror a production Neon or RDS database into a reporting schema on Supabase or Aurora
  • Consolidate multiple application databases into a single analytics PostgreSQL instance
  • Promote data from a staging schema to a clean destination schema with type widening
  • Build a separate analytics replica without touching the production database

Prerequisites

  • A saved Source connection for the PostgreSQL database you read from
  • A saved Destination connection for the PostgreSQL database you write to
  • Source user: CONNECT, USAGE on the schema, SELECT on the tables
  • Destination user: CONNECT, USAGE, INSERT, UPDATE, CREATE on the destination schema
  • Both databases reachable from the MantrixFlow ELT server (public endpoint or IP allowlist)
See PostgreSQL source and PostgreSQL destination for provider-by-provider setup.
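
A minimal grant script that satisfies these privileges could look like the sketch below; the role, database, and schema names (mantrixflow_reader, mantrixflow_writer, appdb, analyticsdb) are placeholders, not names MantrixFlow requires:

-- Source side: read-only role (placeholder names)
GRANT CONNECT ON DATABASE appdb TO mantrixflow_reader;
GRANT USAGE ON SCHEMA public TO mantrixflow_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO mantrixflow_reader;

-- Destination side: write role for the delivery schema (placeholder names)
GRANT CONNECT ON DATABASE analyticsdb TO mantrixflow_writer;
GRANT USAGE, CREATE ON SCHEMA analytics TO mantrixflow_writer;
GRANT INSERT, UPDATE ON ALL TABLES IN SCHEMA analytics TO mantrixflow_writer;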

Setting up the source connection

1. Open Connections and create a source

Go to Connections → + New Connection. Set the role to Source and choose PostgreSQL.

2. Fill in the connection fields

Enter host, port (5432), database, username, password, schema (public), and SSL mode. Use require for Neon, Supabase, and AWS RDS.
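
For orientation, the same fields assemble into a standard PostgreSQL connection URI as shown below; the host, user, and password are purely illustrative, and MantrixFlow's form takes each field separately rather than a single URI:

postgresql://app_user:secret@db.example-provider.com:5432/appdb?sslmode=require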

3. Test and save

Click Test Connection. Save only after the test passes.

Supported providers

| Provider | Setup note |
| --- | --- |
| Neon | Use the direct connection string. Add MantrixFlow’s IP under IP Allow on paid plans. |
| AWS RDS / Aurora | Add MantrixFlow’s /32 to the security group inbound rule for port 5432. |
| Supabase | Use the host from Connection Settings. Add MantrixFlow’s IP under Network Restrictions. |
| GCP Cloud SQL | Enable public IP and add MantrixFlow’s /32 as an authorized network. |
| Azure Database for PostgreSQL | Add a firewall rule with start and end IP both set to MantrixFlow’s static IP. |
| Aiven | Add MantrixFlow’s /32 to the service IP filter. |
| DigitalOcean Managed PostgreSQL | Add MantrixFlow’s IP as a trusted source on the cluster. |
| Self-hosted | Requires a public IP or port-forward reachable from MantrixFlow. |

Setting up the destination connection

1. Create a destination connection

Go to Connections → + New Connection. Set the role to Destination and choose PostgreSQL.

2. Fill in the fields

Same fields as the source form. The schema here is the default — you can override it per-pipeline in the builder.

3. Test and save

Click Test Connection and save after the test passes.

Creating the pipeline

1. Create the pipeline shell

Go to Data Pipelines → + New Pipeline. Enter a name, select the PostgreSQL source connection, and click Create & open canvas.

2. Configure the Source node

On the canvas, click ⚙️ on the Source node. In the panel that opens:
  • Click Discover schema to load available tables.
  • Tick Include next to each table to sync.
  • Click Preview on a table to verify raw rows.

3. Add a SQL transform (optional)

For lightweight pre-processing, add a Transform node on the canvas and write a SELECT using {{ source }}:
SELECT
  id,
  first_name || ' ' || last_name AS full_name,
  email,
  created_at::date               AS signup_date,
  CAST(revenue_cents AS NUMERIC) / 100.0 AS revenue_usd
FROM {{ source }}
WHERE active = true
Skip this step if you are using the Normalisation + dbt Layer approach.

4. Configure Normalisation (optional)

Click ⚙️ on the Destination node and open the Normalisation tab. For each source column:
  • Rename — enter the column name you want in the raw layer (e.g. customer_id → customer_uuid)
  • Exclude — tick to drop the column entirely (e.g. auto-generated serial_col, or PII you do not want in the warehouse)
Columns with no rule pass through under their original name; a sketch of the resulting raw table follows below. See Normalisation for the full rule reference.
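
As a sketch only: if customer_id is renamed to customer_uuid and serial_col is excluded, the raw-layer table can be inspected directly after the first run. The column list below is assumed from the dbt Layer example in the next step, not generated by the tool:

-- Illustrative column list; your raw table reflects your own Normalisation rules
SELECT customer_uuid, email_address, fname, lname, birth_date, metadata, is_active, created_at
FROM raw.public__customers
LIMIT 10;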

5. Write the dbt Layer model (optional)

Click the dbt Layer tab in the Destination panel. Write a dbt SQL model referencing the normalised raw table:
SELECT
    customer_uuid,
    email_address,
    fname,
    lname,
    TRIM(CONCAT(COALESCE(fname,''), ' ', COALESCE(lname,''))) AS full_name,
    birth_date,
    EXTRACT(YEAR FROM AGE(CURRENT_DATE, birth_date)) AS age,
    metadata->>'city'    AS city,
    metadata->>'country' AS country,
    is_active,
    created_at::DATE     AS registered_date
FROM {{ source('raw', 'public__customers') }}
The source reference is {{ source('raw', 'schema__tablename') }}, not {{ source }}. Column names here are the renamed names from the Normalisation tab. See dbt Layer for more examples.

6. Configure the Destination node

Click ⚙️ on the Destination node. In the Config tab:
  • Connection — select the PostgreSQL destination connection
  • Final delivery schema — e.g. analytics
  • Sync mode — FULL_TABLE for all first runs
  • Write mode — select Upsert
  • Click Validate config.
Use the Preview tab to inspect the target table before running.

7. Run and verify

Click ▷ on the Destination node or the Run button in the top bar, then check run results via the history icon.
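
To double-check the load from the destination side, a quick sanity query works; the table and column names below assume the dbt model from step 5 landed as analytics.customers:

-- Assumed model name and column from the step 5 example
SELECT COUNT(*)             AS row_count,
       MAX(registered_date) AS latest_signup
FROM analytics.customers;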

8. Schedule (after first run succeeds)

Open the Scheduling tab in the Destination panel and set your interval.

Write modes

| Mode | Status | Behavior |
| --- | --- | --- |
| Upsert | Available | Inserts new rows; updates existing rows matched by primary key |
| Append | Coming soon | Insert only — no deduplication |
| Replace | Coming soon | Drops and recreates the destination table on each run |
Upsert requires a primary key on the destination table. If one is missing, add it before the first run:
ALTER TABLE analytics.your_table ADD PRIMARY KEY (id);
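
If you are unsure whether the table already has one, you can check the system catalog first (same placeholder table name as above):

-- Lists the primary key constraint, if any, on the destination table
SELECT conname
FROM pg_constraint
WHERE conrelid = 'analytics.your_table'::regclass
  AND contype = 'p';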

SQL transform examples

These examples cover the most common SQL patterns for P2P pipelines. All use {{ source }} as the table reference; MantrixFlow replaces it with the qualified stream name at run time. None of them are tied to a specific provider.

1. Copy a table as-is

SELECT *
FROM {{ source }}

2. Select specific columns

SELECT
  id,
  email,
  first_name,
  last_name,
  created_at,
  updated_at
FROM {{ source }}

3. Rename columns

SELECT
  id,
  usr_email         AS email,
  usr_created_ts    AS created_at,
  usr_updated_ts    AS updated_at,
  acct_status       AS status
FROM {{ source }}

4. Cast types explicitly

SELECT
  id,
  smallint_col::INTEGER       AS int_val,
  integer_col::BIGINT         AS big_val,
  real_col::DOUBLE PRECISION  AS float_val,
  numeric_col::NUMERIC(18,4)  AS precise_val,
  char_col::TEXT              AS char_text,
  varchar_col::TEXT           AS varchar_text,
  timestamp_col::TIMESTAMPTZ  AS ts_utc,
  date_col::TIMESTAMP         AS date_as_ts
FROM {{ source }}

5. Filter rows before loading

SELECT *
FROM {{ source }}
WHERE is_deleted = false
  AND status IN ('active', 'pending')
  AND created_at >= '2024-01-01'

6. Compute derived columns

SELECT
  id,
  first_name || ' ' || last_name  AS full_name,
  UPPER(email)                    AS email_upper,
  EXTRACT(YEAR FROM created_at)   AS signup_year,
  CURRENT_TIMESTAMP               AS loaded_at,
  CASE
    WHEN score >= 90 THEN 'high'
    WHEN score >= 60 THEN 'medium'
    ELSE 'low'
  END                             AS score_tier
FROM {{ source }}

7. Convert cents to currency

SELECT
  id,
  order_number,
  CAST(amount_cents AS NUMERIC) / 100.0  AS amount_usd,
  CAST(tax_cents    AS NUMERIC) / 100.0  AS tax_usd,
  CAST(total_cents  AS NUMERIC) / 100.0  AS total_usd,
  currency,
  created_at
FROM {{ source }}

8. Flatten a JSONB column

SELECT
  id,
  payload->>'event_type'              AS event_type,
  (payload->>'amount')::NUMERIC       AS amount,
  (payload->>'user_id')::UUID         AS user_id,
  (payload->>'timestamp')::TIMESTAMPTZ AS event_at,
  payload->'metadata'                 AS metadata_jsonb
FROM {{ source }}

9. Aggregate before writing

SELECT
  customer_id,
  COUNT(*)          AS order_count,
  SUM(total_amount) AS lifetime_value,
  AVG(total_amount) AS avg_order_value,
  MIN(created_at)   AS first_order_at,
  MAX(created_at)   AS last_order_at
FROM {{ source }}
GROUP BY customer_id

10. Pivot status into boolean flags

SELECT
  id,
  user_id,
  status,
  status = 'active'  AS is_active,
  status = 'churned' AS is_churned,
  status = 'trial'   AS is_trial,
  created_at
FROM {{ source }}

11. Cast range types to TEXT

PostgreSQL range types must be cast explicitly — they are not mapped natively by the extraction layer:
SELECT
  id,
  int4range_col::TEXT   AS int_range,
  int8range_col::TEXT   AS bigint_range,
  numrange_col::TEXT    AS num_range,
  tsrange_col::TEXT     AS ts_range,
  tstzrange_col::TEXT   AS tstz_range,
  daterange_col::TEXT   AS date_range
FROM {{ source }}

12. Cast geometric types to TEXT

SELECT
  id,
  point_col::TEXT   AS point,
  polygon_col::TEXT AS polygon,
  circle_col::TEXT  AS circle,
  box_col::TEXT     AS box
FROM {{ source }}

13. Extract from arrays

-- Land as JSONB
SELECT
  id,
  to_jsonb(int_array_col)  AS int_array_json,
  to_jsonb(text_array_col) AS text_array_json
FROM {{ source }}
-- Unnest into one row per element
SELECT
  id,
  unnest(text_array_col) AS tag
FROM {{ source }}

14. NULL-safe coalesce

SELECT
  id,
  COALESCE(preferred_name, first_name, 'Unknown')  AS display_name,
  COALESCE(mobile_phone, home_phone, work_phone)   AS contact_phone,
  COALESCE(updated_at, created_at)                 AS last_seen_at
FROM {{ source }}

15. Full cross-type widening in one transform

SELECT
  id,
  smallint_col::BIGINT         AS widened_int,
  real_col::DOUBLE PRECISION   AS widened_float,
  char_col::TEXT               AS widened_char,
  varchar_col::TEXT            AS widened_varchar,
  date_col::TIMESTAMPTZ        AS widened_date,
  timestamp_col::TIMESTAMPTZ   AS widened_ts,
  inet_col::TEXT               AS inet_text,
  uuid_col::TEXT               AS uuid_text,
  bool_col::TEXT               AS bool_text,
  jsonb_col::TEXT              AS json_text
FROM {{ source }}
See Data type compatibility for the full widening rules table.

Scheduling

Open the Scheduling tab in the Destination panel (⚙️ on the Destination node → Scheduling tab). Choose a cron or fixed interval such as every hour, every 6 hours, or daily at 02:00 UTC. Always validate data correctness with a manual run before enabling a schedule.
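
If you schedule with a cron expression, the standard equivalents of those intervals are listed below; the exact cron dialect accepted by the Scheduling tab is an assumption here, so adjust if yours differs:

0 * * * *     # every hour, on the hour
0 */6 * * *   # every 6 hours
0 2 * * *     # daily at 02:00 UTC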

Running multiple destinations from the same source

Create two pipelines pointing to the same source connection but different destinations. MantrixFlow serialises runs for the same source to avoid contention — the second run starts automatically after the first completes.

Supported data types

All standard PostgreSQL types are supported. Complex types without a native destination representation (geometric, range, text search, composite, enum) land as TEXT for maximum cross-provider compatibility. See Data type compatibility for the full mapping table.

Troubleshooting

| Symptom | Likely cause | Fix |
| --- | --- | --- |
| Connection test times out | Provider firewall blocking MantrixFlow | Add MantrixFlow’s /32 to the provider allowlist |
| No tables discovered | Source user missing USAGE on the schema | GRANT USAGE ON SCHEMA public TO <user> |
| Permission denied at run time | Destination user missing INSERT or UPDATE | Grant write privileges on the destination table |
| Type error at load time | Source type incompatible with destination column | Check data type compatibility and update the destination DDL |
| Duplicate rows on re-run | Destination table has no primary key | ALTER TABLE analytics.your_table ADD PRIMARY KEY (id) |
| Range type columns empty | Range types pass through as NULL by default | Cast to TEXT in the transform: int4range_col::TEXT AS int4range_col |