Getting Started

Our data warehouse integrations let you send Customer.io data about messages, people, metrics, and more to your data warehouse by way of an Amazon S3 or Google Cloud Storage (GCS) bucket. From there, you can ingest the data into your warehouse.

You can find our data warehouse integrations by going to Data & Integrations > Integrations and selecting the Databases option. From here, you can select your data warehouse or storage bucket. If your data warehouse appears twice in this list, pick the Data out integration. Check out the specific documentation for your data warehouse for help setting up your integration.

Click Databases on the integrations page to find your integration

How it works

This integration exports individual parquet files for Deliveries, Metrics, Subjects, Outputs, People, and Attributes to your storage bucket. Each parquet file contains data that changed since the last export.

Once the parquet files are in your storage bucket, you can import them into data platforms like Fivetran or data warehouses like Redshift, BigQuery, and Snowflake.

Note that this integration only publishes parquet files to your storage bucket. You must configure your data warehouse to ingest this data. There are many approaches to ingesting data, but most involve a COPY command (or an equivalent load statement) to load the parquet files from your bucket. After you load the parquet files, you should set them to expire so that they're deleted automatically.
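
For example, if your files land in an Amazon S3 bucket and your warehouse is Amazon Redshift, the load might look like the sketch below. This is a minimal example under assumed names: the deliveries table, bucket path, and IAM role are placeholders to replace with your own values.

-- Hypothetical example: load Customer.io delivery exports from S3
-- into an existing Redshift table. In practice you'd load only the
-- files that arrived since your last load, then let them expire.
COPY deliveries
FROM 's3://your-export-bucket/deliveries_v3_'
IAM_ROLE 'arn:aws:iam::123456789012:role/YourRedshiftLoadRole'
FORMAT AS PARQUET;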

We attempt to export parquet files every 15 minutes, though actual sync intervals and processing times may vary. When you sync large data sets, or when Customer.io experiences a high volume of concurrent sync operations, it can take up to several hours to process and export data. This feature is not intended to sync data in real time.

sequenceDiagram
  participant a as Customer.io
  participant b as Storage Bucket
  participant c as Your Data Warehouse
  loop up to every 15 minutes
    a->>b: export parquet files
    b->>c: ingest
    c->>b: expire/delete files before next sync
  end

 Your initial sync includes historical data

During the first sync, you’ll receive a history of your Deliveries, Metrics, Subjects, and Outputs data. However, people who were deleted or suppressed before the first sync are not included in the People file export, and the historical data for those people in the other export files is anonymized.

The initial export vs. incremental exports

Your initial sync is a set of files containing historical data to represent your workspace’s current state. Subsequent sync files contain changesets.

  • Metrics: The initial metrics sync is broken up into files with two sequence numbers, as follows: <name>_v3_<workspace_id>_<sequence1>_<sequence2>.
  • Attributes: The initial Attributes exports include a list of profiles and their current attributes. Subsequent files will only contain attribute changes, with one change per row.

When you set up your sync, you can choose the parquet files that you want to export to your storage bucket. If you temporarily disable a file and then turn it back on, the next parquet file will contain the changeset for the period between when you disabled the file and when you re-enabled it.

flowchart LR
  a{is it the initial sync?}-->|yes|b[send all history]
  a-->|no|c{was the file already enabled?}
  c-->|yes|d[send changes since last sync]
  c-->|no|e{was the file ever enabled?}
  e-->|yes|f[send changeset since file was disabled]
  e-->|no|g[send all history]

How do I get data into my data warehouse?

There are many approaches to ingesting data from your storage bucket, but here’s an example that moves data from a Google Cloud Storage (GCS) bucket to Google BigQuery.

  • Implement a Cloud Function to automatically import the parquet files from your GCS bucket to a BigQuery table (see the sketch after this list).
  • Set an expiration on the parquet files so they’re automatically deleted.
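
The import itself can be a simple load job. Here’s a minimal sketch of a LOAD DATA statement the Cloud Function might run; the cio.deliveries dataset and table and the bucket path are hypothetical names for illustration.

-- Hypothetical example: append new Customer.io parquet exports
-- to a BigQuery table. Replace the table and GCS URI with your own.
LOAD DATA INTO cio.deliveries
FROM FILES (
  format = 'PARQUET',
  uris = ['gs://your-export-bucket/deliveries_v3_*.parquet']
);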

Below is a screenshot of an example Cloud Function. You can download sample code for this Cloud Function here:

main.go
BigQuery Cloud Function

Exported parquet files

This section describes the different kinds of files you can export from our Database-out integrations. Many schemas include an internal_customer_id: this is the cio_id, an identifier for a person that Customer.io generates automatically and that cannot be changed. This identifier provides a complete, unbroken record of a person across changes to their other identifiers (id, email, etc.). You can use it to resolve the person associated with a subject, delivery, and so on.
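
For example, once you’ve loaded the deliveries and people exports into your warehouse, you can resolve each delivery to an external customer ID with a join like the one below. This is a sketch under assumed names: deliveries and people are whatever tables you loaded those files into.

-- Resolve each delivery's internal_customer_id (cio_id) to the
-- external customer_id and email address from the people export.
select d.delivery_id, d.delivery_type, p.customer_id, p.email_addr
from deliveries d
join people p
  on d.internal_customer_id = p.internal_customer_id;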

Deliveries schema

Deliveries are individual email, push, SMS, Slack, and webhook records sent from your workspace. The first deliveries export file includes baseline historical data. Subsequent files contain rows for data that changed since the last export.

Fields (foreign key relationships noted in parentheses):

  • workspace_id: INTEGER (Required). The ID of the Customer.io workspace associated with the delivery record.
  • delivery_id: STRING (Required). The ID of the delivery record.
  • internal_customer_id (foreign key: People): STRING (Nullable). The cio_id of the person in question. Use the people parquet file to resolve this ID to an external customer_id or email address.
  • subject_id (foreign key: Subjects): STRING (Nullable). If the delivery was created as part of a Campaign or API Triggered Broadcast workflow, this is the ID for the path the person went through in the workflow.
  • event_id (foreign key: Metrics): STRING (Nullable). If the delivery was created as part of an event-triggered Campaign, this is the ID for the unique event that triggered the workflow.
  • delivery_type: STRING (Required). The type of delivery: email, push, sms, slack, or webhook.
  • campaign_id: INTEGER (Nullable). If the delivery was created as part of a Campaign or API Triggered Broadcast workflow, this is the ID for the Campaign or API Triggered Broadcast.
  • action_id: INTEGER (Nullable). If the delivery was created as part of a Campaign or API Triggered Broadcast workflow, this is the ID for the unique workflow item that caused the delivery to be created.
  • newsletter_id: INTEGER (Nullable). If the delivery was created as part of a Newsletter, this is the unique ID of that Newsletter.
  • content_id: INTEGER (Nullable). If the delivery was created as part of a Newsletter split test, this is the unique ID of the Newsletter variant.
  • trigger_id: INTEGER (Nullable). If the delivery was created as part of an API Triggered Broadcast, this is the unique trigger ID associated with the API call that triggered the broadcast.
  • created_at: TIMESTAMP (Required). The timestamp the delivery was created at.
  • transactional_message_id: INTEGER (Nullable). If the delivery occurred as part of a transactional message, this is the unique identifier for the API call that triggered the message.
  • seq_num: INTEGER (Required). A monotonically increasing number indicating relative recency for each record: the larger the number, the more recent the record.
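
Because exports are incremental and a retried export can re-send rows, a given delivery can appear more than once after you load several files. A common pattern is to keep only the most recent row per delivery using seq_num. Here’s a minimal sketch, assuming the files were loaded into a table named deliveries:

-- Keep only the most recent record per delivery_id, using seq_num
-- as the recency indicator (a larger seq_num is more recent).
select *
from (
  select d.*,
         row_number() over (partition by delivery_id order by seq_num desc) as rn
  from deliveries d
)
where rn = 1;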

Metrics schema

Metrics exports detail events relating to deliveries (e.g. messages sent, opened, etc.). Your initial metrics export contains baseline historical data, broken up into files with two sequence numbers, as follows: <name>_v3_<workspace_id>_<sequence1>_<sequence2>.

Subsequent files contain rows for data that changed since the last export.

Fields (foreign key relationships noted in parentheses):

  • event_id: STRING (Required). The unique ID of the metric event. This can be useful for deduplicating purposes.
  • workspace_id: INTEGER (Required). The ID of the Customer.io workspace associated with the metric record.
  • delivery_id (foreign key: Deliveries): STRING (Required). The ID of the delivery record.
  • metric: STRING (Required). The type of metric (e.g. sent, delivered, opened, clicked).
  • reason: STRING (Nullable). For certain metrics (e.g. attempted), the reason behind the action.
  • link_id: INTEGER (Nullable). For "clicked" metrics, the unique ID of the link being clicked.
  • link_url: STRING (Nullable). For "clicked" metrics, the URL of the clicked link. (Truncated to 1000 bytes.)
  • created_at: TIMESTAMP (Required). The timestamp the metric was created at.
  • seq_num: INTEGER (Required). A monotonically increasing number indicating relative recency for each record: the larger the number, the more recent the record.

Subjects schema

Subjects are the unique workflow journeys that people take through Campaigns and API Triggered Broadcasts. The first subjects export file includes baseline historical data. Subsequent files contain rows for data that changed since the last export.

 Subjects are not available in v1

Upgrade to v2 or later to get subjects data.

Fields (foreign key relationships noted in parentheses):

  • workspace_id: INTEGER (Required). The ID of the Customer.io workspace associated with the subject record.
  • subject_id: STRING (Required). The ID for the path a person went through in a Campaign or API Triggered Broadcast workflow.
  • subject_name: STRING (Required). A secondary unique ID for the path a person took through a campaign or broadcast workflow.
  • internal_customer_id (foreign key: People): STRING (Nullable). The cio_id of the person in question. Use the people parquet file to resolve this ID to an external customer_id or email address.
  • campaign_type: STRING (Required). The type of Campaign (segment, event, or triggered_broadcast).
  • campaign_id: INTEGER (Required). The ID of the Campaign or API Triggered Broadcast.
  • event_id (foreign key: Metrics): STRING (Nullable). The ID for the unique event that triggered the workflow.
  • trigger_id: INTEGER (Nullable). If the delivery was created as part of an API Triggered Broadcast, this is the unique trigger ID associated with the API call that triggered the broadcast.
  • started_campaign_at: TIMESTAMP (Required). The timestamp when the person first matched the campaign trigger. For event-triggered campaigns, this is the timestamp of the trigger event. For segment-triggered campaigns, this is the time the user entered the segment.
  • created_at: TIMESTAMP (Required). The timestamp the subject was created at.
  • seq_num: INTEGER (Required). A monotonically increasing number indicating relative recency for each record: the larger the number, the more recent the record.

Outputs schema

Outputs are the unique steps within each workflow journey. The first outputs file includes historical data. Subsequent files contain rows for data that changed since the last export.

 Outputs are not available in v1

Upgrade to v2 or later to get outputs data.

Fields (foreign key relationships noted in parentheses):

  • workspace_id: INTEGER (Required). The ID of the Customer.io workspace associated with the output record.
  • output_id: STRING (Required). The ID for the step of the unique path a person went through in a Campaign or API Triggered Broadcast workflow.
  • subject_id (foreign key: Subjects): STRING (Required). The ID for the path a person went through in a Campaign or API Triggered Broadcast workflow.
  • output_type: STRING (Required). The type of step a person went through in a Campaign or API Triggered Broadcast workflow. Note that the "delay" output_type covers many use cases: a Time Delay or Time Window workflow item, a "grace period", or a Date-based campaign trigger.
  • action_id: INTEGER (Required). The ID for the unique workflow item associated with the output.
  • explanation: STRING (Required). The explanation for the output.
  • delivery_id (foreign key: Deliveries): STRING (Nullable). If a delivery resulted from this step of the workflow, this is the ID of that delivery.
  • draft: BOOLEAN (Nullable). If a delivery resulted from this step of the workflow, this indicates whether the delivery was created as a draft.
  • link_tracked: BOOLEAN (Nullable). If a delivery resulted from this step of the workflow, this indicates whether links within the delivery are configured for tracking.
  • split_test_index: INTEGER (Nullable). If the step of the workflow was a Split Test, this indicates the variant of the Split Test.
  • delay_ends_at: TIMESTAMP (Nullable). If the step of the workflow involves a delay, this is the timestamp for when the delay will end.
  • branch_index: INTEGER (Nullable). If the step of the workflow was a T/F Branch, a Multi-Split Branch, or a Random Cohort Branch, this indicates the branch that was followed.
  • manual_segment_id: INTEGER (Nullable). If the step of the workflow was a Manual Segment Update, this is the ID of the Manual Segment involved.
  • add_to_manual_segment: BOOLEAN (Nullable). If the step of the workflow was a Manual Segment Update, this indicates whether a person was added to or removed from the Manual Segment involved.
  • created_at: TIMESTAMP (Required). The timestamp the output was created at.
  • seq_num: INTEGER (Required). A monotonically increasing number indicating relative recency for each record: the larger the number, the more recent the record.

People schema

The first People export file includes a list of current people at the time of your first sync (deleted or suppressed people are not included in the first file). Subsequent exports include people who were created, deleted, or suppressed since the last export.

As of v3, people exports come in two different files:

  • people_v3_<env>_<seq>.parquet: Contains new people.
  • people_v3_chngs_<env>_<seq>.parquet: Contains changes to people since the previous sync.

These files have an identical structure and are part of the same data set. You should import them to the same table.

Fields:

  • workspace_id: INTEGER (Required). The ID of the Customer.io workspace associated with the person.
  • customer_id: STRING (Required). The ID of the person in question. This will match the ID you see in the Customer.io UI.
  • internal_customer_id: STRING (Required). The cio_id of the person in question. Use this file to map the cio_id to an external customer_id or email address.
  • deleted: BOOLEAN (Nullable). This indicates whether the person has been deleted.
  • suppressed: BOOLEAN (Nullable). This indicates whether the person has been suppressed.
  • created_at: TIMESTAMP (Required). The timestamp the profile was created at. Note that this is the timestamp the profile was created in Customer.io's database, which may not match the created_at profile attribute.
  • updated_at: TIMESTAMP (Required). The date-time when a person was updated. Use the most recent updated_at value for a customer_id to disambiguate between multiple records.
  • email_addr: STRING (Nullable). The email address of the person. For workspaces using email as a unique identifier, this value may be the same as the customer_id.
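
Because v3 exports append a new row each time a person is updated, deleted, or suppressed, your people table can contain several rows per person. To get each person’s current state, keep only the most recent row per customer_id. A minimal sketch, assuming you imported both people files into a table named people:

-- Current state of each person: the row with the latest updated_at.
select *
from (
  select p.*,
         row_number() over (partition by customer_id order by updated_at desc) as rn
  from people p
)
where rn = 1;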

Attributes schema

Attribute exports represent changes to people (by way of their attribute values) over time. The initial Attributes export includes a list of profiles and their current attributes. Subsequent files contain attribute changes, with one change per row.

Fields:

  • workspace_id: INTEGER (Required). The ID of the Customer.io workspace associated with the person.
  • internal_customer_id: STRING (Required). The cio_id of the person in question. Use the people parquet file to resolve this ID to an external customer_id or email address.
  • attribute_name: STRING (Required). The attribute that was updated.
  • attribute_value: STRING (Required). The new value of the attribute.
  • timestamp: TIMESTAMP (Required). The timestamp of the attribute update.

Changelog

v3

Version 3 includes the following changes from v2:

  • A seq_num column in the Deliveries, Subjects, Outputs, and Metrics tables. This is a monotonically increasing value; a larger value indicates a more recent record.
  • People tables now contain record updates, rather than only the record as first created. For example, we produce a new record if you change a person’s email, delete them, or suppress them.
  • The People table now has an updated_at column. Because many data warehouses don’t replace rows when adding duplicate primary keys, you can select the most recent updated_at value for each profile.
  • The People table now contains an email_addr column.

v2

Data warehouse sync v2 includes the following changes from v1:

  • Support for transactional messages (as transactional_message_id) in the Deliveries schema.
  • A fix for an issue that caused missing rows in Subjects and Outputs data. As a result of this bug, data warehouse v1 no longer supports Subjects or Outputs data.

If you used our initial data warehouse release, we recommend updating to the v2 implementation. However, you can continue using our original data warehouse sync feature if you don’t use:

  • Subjects and/or Outputs data
  • Transactional messages

Frequently asked questions

How are exported parquet files organized?

Each parquet file is named <name>_v<x>_<workspace_id>_<sequence>.

  • <name> is either deliveries, metrics, subjects, outputs, or people.
  • v<x> indicates the schema version. The current version is v3; v1 schemas do not have a version indicator and are deprecated.
  • <workspace_id> refers to the Customer.io workspace whose data is included.
  • <sequence> is an ever-increasing value over time.

 Initial metrics sync file names

Your initial metrics sync is broken up into files that indicate a starting and an ending sequence to help you order things appropriately. For example: <name>_v<x>_<workspace_id>_<sequence_pt1>_<sequence_pt2>.

How do you handle export failures?

We monitor for internal failures and repair them promptly. If there’s an external failure, such as a networking problem, we retry the export. If there’s an issue with bucket access, we’ll reach out to you with details.

In each retry case, the next export will contain all data since the prior export. No data will be lost.

How should I import data from my bucket to my data warehouse?

There are many approaches to this, but here’s one example we’ve seen work for moving data from a GCS bucket to BigQuery.

  • Implement a Cloud Function to automatically import the parquet files from your GCS bucket to a BigQuery table.
  • Set an expiration on the parquet files so they get automatically deleted.

Below is a screenshot of an example Cloud Function. You can download sample code for this Cloud Function here:

main.go
BigQuery Cloud Function

How can I get information about a campaign, workflow action, or message that’s not included in the export?

With the data in our export, it’s possible to query our API to pull any extra information you need.

For example, calling https://api.customer.io/v1/campaigns/:id with the id of the campaign from a subject record will give you details about the campaign.

Check out our full API docs here.

How do I get all of the current attributes for a profile?

On setup, the initial Attributes export includes a list of profiles and their current attributes. Subsequent files only contain attribute changes, with one change per row. To get the most recent attributes for a particular profile, use the timestamp value to query for the latest value of each attribute.

An example query for Snowflake would be:

select internal_customer_id, attribute_name, attribute_value
from (
  select internal_customer_id, attribute_name, attribute_value,
         row_number() over (partition by internal_customer_id, attribute_name order by timestamp desc) as rno
  from attributes
)
where rno = 1 and internal_customer_id = 'xxxxxxxxxxx';