Data Warehouse Sync

You can sync all of your historical and future data from Customer.io to your Data Warehouse using our Data Warehouse Sync feature. For a limited, future-only set of data, you can use Reporting Webhooks and/or a product like Stitch.

The Data Warehouse Sync feature exports individual parquet files to an Amazon S3 or Google Cloud Storage bucket that you host. We export parquet files for Deliveries, Metrics, Subjects, Outputs, People, and Attributes. Each file contains rows representing data that changed since the last export.

Once the parquet files are in your S3 or GCS bucket, you can import them into data platforms like Fivetran or data warehouses like Redshift, BigQuery, and Snowflake.

We attempt to sync every 15 minutes, though actual sync intervals and processing times may vary. When you sync large data sets, or when Customer.io experiences a high volume of concurrent sync operations, it can take up to several hours to process and export data. This feature is not intended to sync data in real time.

During the first sync, you’ll receive a history of your Deliveries, Metrics, Subjects, and Outputs data. However, People who were deleted or suppressed before the first sync are not included in the People file export, and the historical data for those people in the other export files is anonymized.

Set up a Database Integration

Data warehouse sync publishes data to a bucket that you can then ingest into your data warehouse. In the Integrations > Databases catalog, you can select Amazon Redshift, Snowflake, Google BigQuery, and so on, but the setup in Customer.io is the same: you point your workspace to the Amazon S3 or Google Cloud Storage (GCS) bucket you want to export parquet files to.

You must set up your data warehouse to ingest this data. There are many approaches to ingesting data, but it typically requires a COPY command to load the parquet files from your bucket. After you load the parquet files, you should set them to expire so they are automatically deleted.
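
In Snowflake, for example, you might create an external stage over your bucket and run COPY INTO on a schedule. This is a minimal sketch, not a definitive recipe: the stage name, table name, bucket path, and credentials below are all assumptions you’d replace with your own.

-- All names and paths here are hypothetical; substitute your own.
create stage if not exists cio_stage
  url = 's3://your-bucket/customerio/'
  credentials = (aws_key_id = '...' aws_secret_key = '...');

-- Load any Deliveries files from the bucket into a deliveries table,
-- matching parquet columns to table columns by name.
copy into deliveries
  from @cio_stage
  pattern = '.*deliveries_v2_.*[.]parquet'
  file_format = (type = parquet)
  match_by_column_name = case_insensitive;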

Snowflake integration example

Set up Data Warehouse Sync with Google Cloud Storage

Before you begin, make sure that you’re prepared to ingest relevant parquet files from Customer.io.

You must set up a service account key in JSON that grants read/write permissions to your GCS bucket. You’ll provide the contents of this key to Customer.io when setting up your integration.

  1. Log in to Customer.io.

  2. Go to Integrations and select your database integration with Google Cloud Storage as your Sync Bucket, or select Google Cloud Storage as a direct integration.

  3. Enter information about your GCS bucket and click Validate & select data.

    • Enter the Name of your GCS bucket.
    • Enter the Path to your GCS bucket.
    • Paste the JSON of your Service Account Key.
  4. Select the data that you want to export from Customer.io to your bucket. By default, we export all data, but you can disable the types that you aren’t interested in.

  5. Click Create and sync data.

Set up Data Warehouse Sync with Amazon S3 or Yandex

Before you begin, make sure that you’re prepared to ingest relevant parquet files from Customer.io.

  1. Create an Access Key and a Secret Key with read/write permissions to your S3 or Yandex bucket.

  2. Log in to Customer.io.

  3. Go to Integrations, and select your database integration with Amazon S3 as your Sync Bucket. You can also select Yandex or Amazon S3 as a direct integration.

  4. Enter information about your bucket and click Select data.

    • Enter the Name of your bucket.
    • Enter the Path to your bucket.
    • Paste your Access and Secret keys in the appropriate fields.
    • Select the Region your bucket is in.
  5. Select the data types that you want to export from Customer.io to your bucket. By default, we export all data types, but you can disable the types that you aren’t interested in.

  6. Click Create and sync data.

Initial Sync

Your initial sync is a set of files containing historical data to represent your workspace’s current state. Subsequent sync files contain changesets.

  • Attributes: The initial Attributes exports include a list of profiles and their current attributes. Subsequent files will only contain attribute changes, with one change per row.

Turn data types on or off

You can turn off files you no longer want to receive, or pause them temporarily while you update your integration and then turn them back on. When you turn a file schema back on, we send files to catch you up from the last export. If you haven’t exported a particular file before—the file was never “on”—the initial sync contains your historical data.

 Turning the People file off

If you turn the People file off, and leave it off for more than 7 days, you will not be able to re-enable it. Contact win@customer.io to handle a pause in People syncs longer than 7 days.
  1. Go to Settings > Integrations and select your database integration.
  2. Select the files you want to turn on or off.

When you enable a file, the next sync will contain baseline historical data; subsequent syncs will contain changesets.

Turn files on or off

Manage your Data Warehouse configuration

You can change settings for a bucket if your path changes or you need to swap keys for security purposes.

  1. Go to Settings > Integrations and select your database integration.
  2. Click Manage Configuration for your bucket.
  3. Make your changes. No matter what you change, you must re-enter your Service Account Key (GCS) or Secret Key (S3, Yandex).
  4. Click Update Configuration.

Subsequent syncs will use your new configuration.

Edit your data warehouse bucket configuration

Update data warehouse sync version

Contact win@customer.io to update your data warehouse sync version and take advantage of schema and feature updates.

Before you update your data warehouse sync version, see the changelog below. You may need to update your ingestion logic to support updated schemas, filenames, and new data types.

 When updating from v1 to a later version, you must:

  • Update your ingestion logic to accept the new file name format: <name>_v<x>_<workspace_id>_<sequence>.parquet
  • Delete existing rows in your Subjects and Outputs tables. When you update, we send all of your Subjects and Outputs data from the beginning of your history using the new file schema.
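
In Snowflake, for example, clearing those tables ahead of the re-sync might look like this (both table names are assumptions based on a typical ingestion setup):

-- Assumes you ingested Subjects and Outputs into tables with these names.
truncate table subjects;
truncate table outputs;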

Deliveries schema

Deliveries are individual email, push, SMS, Slack, and webhook records sent from your workspace. The first deliveries export file includes baseline historical data. Subsequent files contain rows for data that changed since the last export.

| Field Name | Type | Foreign Key | Description |
| --- | --- | --- | --- |
| workspace_id | INTEGER (Required) | | The ID of the Customer.io workspace associated with the delivery record. |
| delivery_id | STRING (Required) | | The ID of the delivery record. |
| internal_customer_id | STRING (Nullable) | People | The ID of the person in question. Use the people parquet file to resolve this ID to an external customer_id or email address. |
| subject_id | STRING (Nullable) | Subjects | If the delivery was created as part of a Campaign or API Triggered Broadcast workflow, this is the ID for the path the person went through in the workflow. |
| event_id | STRING (Nullable) | Metrics | If the delivery was created as part of an event-triggered Campaign, this is the ID for the unique event that triggered the workflow. |
| delivery_type | STRING (Required) | | The type of delivery: email, push, sms, slack, or webhook. |
| campaign_id | INTEGER (Nullable) | | If the delivery was created as part of a Campaign or API Triggered Broadcast workflow, this is the ID for the Campaign or API Triggered Broadcast. |
| action_id | INTEGER (Nullable) | | If the delivery was created as part of a Campaign or API Triggered Broadcast workflow, this is the ID for the unique workflow item that caused the delivery to be created. |
| newsletter_id | INTEGER (Nullable) | | If the delivery was created as part of a Newsletter, this is the unique ID of that Newsletter. |
| content_id | INTEGER (Nullable) | | If the delivery was created as part of a Newsletter split test, this is the unique ID of the Newsletter variant. |
| trigger_id | INTEGER (Nullable) | | If the delivery was created as part of an API Triggered Broadcast, this is the unique trigger ID associated with the API call that triggered the broadcast. |
| created_at | TIMESTAMP (Required) | | The timestamp the delivery was created at. |
| transactional_message_id | INTEGER (Nullable) | | If the delivery occurred as part of a transactional message, this is the unique identifier for the API call that triggered the message. |
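
If you load this file into a warehouse table, a minimal Snowflake-style DDL sketch might look like the following; the table name is an assumption, and the column types mirror the schema above:

-- Hypothetical target table for the Deliveries parquet files.
create table if not exists deliveries (
    workspace_id             integer   not null,
    delivery_id              string    not null,
    internal_customer_id     string,
    subject_id               string,
    event_id                 string,
    delivery_type            string    not null,
    campaign_id              integer,
    action_id                integer,
    newsletter_id            integer,
    content_id               integer,
    trigger_id               integer,
    created_at               timestamp not null,
    transactional_message_id integer
);

The other schemas below can be translated into DDL the same way.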

Metrics schema

Metrics exports detail events relating to deliveries (e.g. messages sent, opened, or clicked). The first metrics export includes baseline historical data; subsequent files contain rows for data that changed since the last export.

| Field Name | Type | Foreign Key | Description |
| --- | --- | --- | --- |
| event_id | STRING (Required) | | The unique ID of the metric event. This can be useful for deduplicating purposes. |
| workspace_id | INTEGER (Required) | | The ID of the Customer.io workspace associated with the metric record. |
| delivery_id | STRING (Required) | Deliveries | The ID of the delivery record. |
| metric | STRING (Required) | | The type of metric (e.g. sent, delivered, opened, clicked). |
| reason | STRING (Nullable) | | For certain metrics (e.g. attempted), the reason behind the action. |
| link_id | INTEGER (Nullable) | | For "clicked" metrics, the unique ID of the link being clicked. |
| link_url | STRING (Nullable) | | For "clicked" metrics, the URL of the clicked link. (Truncated to 1000 bytes.) |
| created_at | TIMESTAMP (Required) | | The timestamp the metric was created at. |
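
The delivery_id foreign key lets you join metrics back to deliveries. A sketch of such a query, assuming you’ve loaded both files into tables named metrics and deliveries:

-- Count opens by delivery type; table names are assumptions.
select d.delivery_type, count(*) as opens
from metrics m
join deliveries d on d.delivery_id = m.delivery_id
where m.metric = 'opened'
group by d.delivery_type;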

Subjects schema

Subjects are the unique workflow journeys that people take through Campaigns and API Triggered Broadcasts. The first subjects export file includes baseline historical data. Subsequent files contain rows for data that changed since the last export.

 Subjects are not available in v1

Upgrade to v2 to get subjects data.

| Field Name | Type | Foreign Key | Description |
| --- | --- | --- | --- |
| workspace_id | INTEGER (Required) | | The ID of the Customer.io workspace associated with the subject record. |
| subject_id | STRING (Required) | | The ID for the path a person went through in a Campaign or API Triggered Broadcast workflow. |
| subject_name | STRING (Required) | | A secondary unique ID for the path a person took through a campaign or broadcast workflow. |
| internal_customer_id | STRING (Nullable) | People | The ID of the person in question. Use the people parquet file to resolve this ID to an external customer_id or email address. |
| campaign_type | STRING (Required) | | The type of Campaign (segment, event, or triggered_broadcast). |
| campaign_id | INTEGER (Required) | | The ID of the Campaign or API Triggered Broadcast. |
| event_id | STRING (Required) | Metrics | The ID for the unique event that triggered the workflow. |
| trigger_id | INTEGER (Required) | | If the delivery was created as part of an API Triggered Broadcast, this is the unique trigger ID associated with the API call that triggered the broadcast. |
| started_campaign_at | TIMESTAMP (Required) | | The timestamp when the person first matched the campaign trigger. For event-triggered campaigns, this is the timestamp of the trigger event. For segment-triggered campaigns, this is the time the user entered the segment. |
| created_at | TIMESTAMP (Required) | | The timestamp the subject was created at. |

Outputs schema

Outputs are the unique steps within each workflow journey. The first outputs file includes historical data. Subsequent files contain rows for data that changed since the last export.

 Outputs are not available in v1

Upgrade to v2 to get outputs data.

| Field Name | Type | Foreign Key | Description |
| --- | --- | --- | --- |
| workspace_id | INTEGER (Required) | | The ID of the Customer.io workspace associated with the output record. |
| output_id | STRING (Required) | | The ID for the step of the unique path a person went through in a Campaign or API Triggered Broadcast workflow. |
| subject_id | STRING (Required) | Subjects | The ID for the path a person went through in a Campaign or API Triggered Broadcast workflow. |
| output_type | STRING (Required) | | The type of step a person went through in a Campaign or API Triggered Broadcast workflow. Note that the "delay" output_type covers many use cases: a Time Delay or Time Window workflow item, a "grace period", or a Date-based campaign trigger. |
| action_id | INTEGER (Required) | | The ID for the unique workflow item associated with the output. |
| explanation | STRING (Required) | | The explanation for the output. |
| delivery_id | STRING (Nullable) | Deliveries | If a delivery resulted from this step of the workflow, this is the ID of that delivery. |
| draft | BOOLEAN (Nullable) | | If a delivery resulted from this step of the workflow, this indicates whether the delivery was created as a draft. |
| link_tracked | BOOLEAN (Nullable) | | If a delivery resulted from this step of the workflow, this indicates whether links within the delivery are configured for tracking. |
| split_test_index | INTEGER (Nullable) | | If the step of the workflow was a Split Test, this indicates the variant of the Split Test. |
| delay_ends_at | TIMESTAMP (Nullable) | | If the step of the workflow involves a delay, this is the timestamp for when the delay will end. |
| branch_index | INTEGER (Nullable) | | If the step of the workflow was a T/F Branch, a Multi-Split Branch, or a Random Cohort Branch, this indicates the branch that was followed. |
| manual_segment_id | INTEGER (Nullable) | | If the step of the workflow was a Manual Segment Update, this is the ID of the Manual Segment involved. |
| add_to_manual_segment | BOOLEAN (Nullable) | | If the step of the workflow was a Manual Segment Update, this indicates whether a person was added or removed from the Manual Segment involved. |
| created_at | TIMESTAMP (Required) | | The timestamp the output was created at. |
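
Because each output references its subject, you can reconstruct a journey and tie any resulting deliveries back to it. A sketch of that join, assuming tables named outputs and subjects:

-- List the workflow steps that produced a delivery, per campaign.
select s.campaign_id, o.output_type, o.delivery_id, o.created_at
from outputs o
join subjects s on s.subject_id = o.subject_id
where o.delivery_id is not null
order by s.campaign_id, o.created_at;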

People schema

The first People export file includes a list of current People at the time of first sync (deleted or suppressed people will not be included in the first file). Subsequent exports will include any People deleted or suppressed since the last export.

| Field Name | Type | Foreign Key | Description |
| --- | --- | --- | --- |
| workspace_id | INTEGER (Required) | | The ID of the Customer.io workspace associated with the person. |
| customer_id | STRING (Required) | | The ID of the person in question. This will match the ID you see in the Customer.io UI. |
| internal_customer_id | STRING (Required) | | The internal ID of the person in question. |
| deleted | BOOLEAN (Nullable) | | This indicates whether the person has been deleted. |
| suppressed | BOOLEAN (Nullable) | | This indicates whether the person has been suppressed. |
| created_at | TIMESTAMP (Required) | | The timestamp the profile was created at. Note that this is the timestamp the profile was created in Customer.io's database, which may not match the created_at profile attribute. |

Attributes schema

Attribute exports represent changes to people (by way of their attribute values) over time. The initial Attributes export includes a list of profiles and their current attributes. Subsequent files contain attribute changes, with one change per row.

| Field Name | Type | Foreign Key | Description |
| --- | --- | --- | --- |
| workspace_id | INTEGER (Required) | | The ID of the Customer.io workspace associated with the person. |
| internal_customer_id | STRING (Required) | | The internal ID of the person in question. Use the people parquet file to resolve this ID to an external customer_id or email address. |
| attribute_name | STRING (Required) | | The attribute that was updated. |
| attribute_value | STRING (Required) | | The new value of the attribute. |
| timestamp | TIMESTAMP (Required) | | The timestamp of the attribute update. |
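
As the descriptions above note, internal_customer_id resolves to an external ID through the People file. A sketch of that lookup, assuming you’ve loaded the files into tables named attributes and people:

-- Pair each attribute change with the customer_id shown in the Customer.io UI.
select p.customer_id, a.attribute_name, a.attribute_value, a.timestamp
from attributes a
join people p on p.internal_customer_id = a.internal_customer_id;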

Data warehouse sync changelog

v2

Data warehouse sync v2 includes the following changes from v1:

  • Support for transactional messages (as transactional_message_id) in the Deliveries schema.
  • A fix for an issue that caused missing rows in Subjects and Outputs data. As a result of this bug, data warehouse v1 no longer supports Subjects or Outputs data.

If you used our initial data warehouse release, we recommend updating to the v2 implementation. However, you can continue using our original data warehouse sync feature if you don’t use:

  • Subjects and/or Outputs data
  • Transactional messages

FAQ

1. How are exported parquet files organized?

Each parquet file is named <name>_v<x>_<workspace_id>_<sequence>.parquet.

  • <name> is deliveries, metrics, subjects, outputs, people, or attributes.
  • v<x> indicates the schema version. The current version is v2; v1 schemas do not have a version indicator and are deprecated.
  • <workspace_id> refers to the Customer.io workspace whose data is included.
  • <sequence> is an ever-increasing value over time.
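
For instance, a Deliveries file for a hypothetical workspace 12345 might be named deliveries_v2_12345_1024.parquet (the workspace ID and sequence here are made up for illustration).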

 Initial metrics sync file names

Your initial metrics sync is broken up into multiple files whose names include a starting and ending sequence to help you order them appropriately, for example: <name>_v<x>_<workspace_id>_<sequence_pt1>_<sequence_pt2>

2. How do you handle export failures?

If we experience an internal failure, we monitor and repair it promptly. If there’s an external failure such as a networking problem, we retry the export. If there’s an issue with bucket access, we’ll reach out to you with details.

In each retry case, the next export will contain all data since the prior export. No data will be lost.

3. How should I import data from my S3 or GCS bucket to my data warehouse?

There are many approaches to this, but here’s one example we’ve seen work for moving data from a GCS bucket to BigQuery.

  • Implement a Cloud Function to automatically import the parquet files from your GCS bucket to a BigQuery table.
  • Set an expiration on the parquet files so they get automatically deleted.

Here’s a screenshot of an example Cloud Function. Click here to download sample code for this Cloud Function.

BigQuery Cloud Function
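
If you’d rather avoid a Cloud Function, BigQuery can also ingest parquet directly from GCS with a LOAD DATA statement. A minimal sketch; the dataset, table, and bucket names are assumptions:

-- Appends rows from matching parquet files into the target table.
load data into analytics.cio_deliveries
from files (
  format = 'PARQUET',
  uris = ['gs://your-bucket/customerio/deliveries_v2_*.parquet']
);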

4. How can I get information about a campaign, workflow action, or message that’s not included in the export?

With the data in our export, it’s possible to query our API to pull any extra information you need.

For example, calling https://beta-api.customer.io/v1/api/campaigns/:id with the id of a campaign from a subject record will give you details about that campaign.

Check out our full API docs here.

5. How do I get all of the current attributes for a profile?

On setup, the initial Attributes export includes a list of profiles and their current attributes. Subsequent files only contain attribute changes, with one change per row. To get the most recent attributes for a particular profile, use the timestamp value to query for the latest value of each attribute.

An example query for Snowflake would be:

select internal_customer_id, attribute_name, attribute_value
from (
    -- Rank each attribute's values per person, newest first
    select internal_customer_id,
           attribute_name,
           attribute_value,
           row_number() over (
               partition by internal_customer_id, attribute_name
               order by timestamp desc
           ) as rno
    from attributes
) latest
where rno = 1
  and internal_customer_id = 'xxxxxxxxxxx';
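
If your warehouse supports it, Snowflake’s QUALIFY clause expresses the same “latest row per attribute” filter without the subquery:

-- Equivalent query using QUALIFY to keep only the newest row per attribute.
select internal_customer_id, attribute_name, attribute_value
from attributes
where internal_customer_id = 'xxxxxxxxxxx'
qualify row_number() over (
    partition by internal_customer_id, attribute_name
    order by timestamp desc
) = 1;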