Snowflake

Setting up the Snowflake destination connector involves creating Snowflake entities (warehouse, database, schema, user, and role) in the Snowflake console, choosing a data loading method (internal stage, AWS S3, or Google Cloud Storage bucket), and configuring the connector in the Airbyte UI.

This page describes the step-by-step process of setting up the Snowflake destination connector.

Prerequisites

  • A Snowflake account with the ACCOUNTADMIN role. If you don’t have an account with the ACCOUNTADMIN role, contact your Snowflake administrator to set one up for you.
  • (Optional) An Amazon S3 bucket or a Google Cloud Storage bucket.

Network policies

By default, Snowflake allows users to connect to the service from any computer or device IP address. A security administrator (that is, a user with the SECURITYADMIN role) or higher can create a network policy to allow or deny access to a single IP address or a list of addresses.

If you have trouble connecting from Airbyte Cloud, make sure that the list of Airbyte Cloud IP addresses is on the allowed list.

To determine whether a network policy is set on your account or for a specific user, execute the SHOW PARAMETERS command.

Account

    SHOW PARAMETERS LIKE 'network_policy' IN ACCOUNT;

User

    SHOW PARAMETERS LIKE 'network_policy' IN USER <username>;

For more information, see the official Snowflake documentation.
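
If a network policy turns out to be blocking Airbyte, one option is to add the relevant addresses to that policy. The following is a minimal sketch rather than an official procedure: `my_network_policy` is a hypothetical policy name and the IP values are placeholders taken from the reserved documentation ranges, so substitute the policy name returned by SHOW PARAMETERS and the addresses Airbyte actually connects from.

```sql
-- Hypothetical policy name; use the value returned by SHOW PARAMETERS.
-- Inspect the current allowed and blocked lists before changing anything.
DESC NETWORK POLICY my_network_policy;

-- SET ALLOWED_IP_LIST replaces the whole list, so repeat the existing
-- entries alongside the new ones. The addresses below are placeholders.
ALTER NETWORK POLICY my_network_policy SET
  ALLOWED_IP_LIST = ('192.0.2.0/24', '198.51.100.17');
```

Because the statement overwrites the list, copying the existing entries from the DESC output first avoids locking out other users.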

Setup guide

Step 1: Set up Airbyte-specific entities in Snowflake

To set up the Snowflake destination connector, you first need to create Airbyte-specific Snowflake entities (a warehouse, database, schema, user, and role) with the OWNERSHIP permission to write data into Snowflake, track costs pertaining to Airbyte, and control permissions at a granular level.

You can use the following script in a new Snowflake worksheet to create the entities:

  1. Log into your Snowflake account.

  2. Edit the following script to replace the password with a more secure one and, optionally, to rename the other resources.

    Note: Make sure you follow the Snowflake identifier requirements while renaming the resources.

    -- set variables (these need to be uppercase)
    set airbyte_role = 'AIRBYTE_ROLE';
    set airbyte_username = 'AIRBYTE_USER';
    set airbyte_warehouse = 'AIRBYTE_WAREHOUSE';
    set airbyte_database = 'AIRBYTE_DATABASE';
    set airbyte_schema = 'AIRBYTE_SCHEMA';

    -- set user password
    set airbyte_password = 'password';

    begin;

    -- create Airbyte role
    use role securityadmin;
    create role if not exists identifier($airbyte_role);
    grant role identifier($airbyte_role) to role SYSADMIN;

    -- create Airbyte user
    create user if not exists identifier($airbyte_username)
    password = $airbyte_password
    default_role = $airbyte_role
    default_warehouse = $airbyte_warehouse;

    grant role identifier($airbyte_role) to user identifier($airbyte_username);

    -- change role to sysadmin for warehouse / database steps
    use role sysadmin;

    -- create Airbyte warehouse
    create warehouse if not exists identifier($airbyte_warehouse)
    warehouse_size = xsmall
    warehouse_type = standard
    auto_suspend = 60
    auto_resume = true
    initially_suspended = true;

    -- create Airbyte database
    create database if not exists identifier($airbyte_database);

    -- grant Airbyte warehouse access
    grant USAGE
    on warehouse identifier($airbyte_warehouse)
    to role identifier($airbyte_role);

    -- grant Airbyte database access
    grant OWNERSHIP
    on database identifier($airbyte_database)
    to role identifier($airbyte_role);

    commit;

    begin;

    USE DATABASE identifier($airbyte_database);

    -- create schema for Airbyte data
    CREATE SCHEMA IF NOT EXISTS identifier($airbyte_schema);

    commit;

    begin;

    -- grant Airbyte schema access
    grant OWNERSHIP
    on schema identifier($airbyte_schema)
    to role identifier($airbyte_role);

    commit;

  3. Run the script using the Worksheet page or Snowsight. Make sure to select the All Queries checkbox.
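
After the script finishes, it can be worth confirming that the entities and grants exist. The statements below are a sketch that assumes the default names from the script (AIRBYTE_USER, AIRBYTE_ROLE, AIRBYTE_WAREHOUSE, AIRBYTE_DATABASE, AIRBYTE_SCHEMA); adjust them if you renamed anything.

```sql
-- Roles granted to the Airbyte user (AIRBYTE_ROLE should be listed).
SHOW GRANTS TO USER AIRBYTE_USER;

-- Privileges held by the Airbyte role (USAGE on the warehouse,
-- OWNERSHIP on the database and schema).
SHOW GRANTS TO ROLE AIRBYTE_ROLE;

-- The warehouse and schema themselves.
SHOW WAREHOUSES LIKE 'AIRBYTE_WAREHOUSE';
SHOW SCHEMAS LIKE 'AIRBYTE_SCHEMA' IN DATABASE AIRBYTE_DATABASE;
```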

Step 2: Set up a data loading method

By default, Airbyte uses Snowflake’s Internal Stage to load data. You can also load data using an Amazon S3 bucket or a Google Cloud Storage bucket.

Make sure the Airbyte role has the USAGE privilege on the database and schema.
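
The role created in Step 1 owns its database and schema, so it already has this privilege there. If you point Airbyte at a database or schema that the role does not own, the privilege can be granted explicitly. In this sketch, MY_DATABASE and MY_SCHEMA are hypothetical names, and the statements are assumed to be run by a role that is allowed to grant privileges on those objects.

```sql
-- MY_DATABASE and MY_SCHEMA are placeholders for objects the Airbyte
-- role does not own.
GRANT USAGE ON DATABASE MY_DATABASE TO ROLE AIRBYTE_ROLE;
GRANT USAGE ON SCHEMA MY_DATABASE.MY_SCHEMA TO ROLE AIRBYTE_ROLE;

-- Confirm the result.
SHOW GRANTS ON SCHEMA MY_DATABASE.MY_SCHEMA;
```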

Using an Amazon S3 bucket

To use an Amazon S3 bucket, create a new Amazon S3 bucket with read/write access for Airbyte to stage data to Snowflake.

Using a Google Cloud Storage bucket

To use a Google Cloud Storage bucket:

  1. Navigate to the Google Cloud Console and create a new bucket with read/write access for Airbyte to stage data to Snowflake.

  2. Generate a JSON key for your service account.

  3. Edit the following script to replace AIRBYTE_ROLE with the role you used for Airbyte's Snowflake configuration and YOURBUCKETNAME with your bucket name.

    create storage INTEGRATION gcs_airbyte_integration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = GCS
    ENABLED = TRUE
    STORAGE_ALLOWED_LOCATIONS = ('gcs://YOURBUCKETNAME');

    create stage gcs_airbyte_stage
    url = 'gcs://YOURBUCKETNAME'
    storage_integration = gcs_airbyte_integration;

    GRANT USAGE ON integration gcs_airbyte_integration TO ROLE AIRBYTE_ROLE;
    GRANT USAGE ON stage gcs_airbyte_stage TO ROLE AIRBYTE_ROLE;

    DESC STORAGE INTEGRATION gcs_airbyte_integration;

    The final query should show a STORAGE_GCP_SERVICE_ACCOUNT property with an email as the property value. Add read/write permissions to your bucket with that email.

  4. Navigate to the Snowflake UI and run the script as a Snowflake account admin using the Worksheet page or Snowsight.
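
Once the integration and stage exist and the bucket permissions are in place, a quick way to check the wiring is to list the stage as the Airbyte role. This is only a sketch: it assumes the stage name from the script above, that the session is using the database and schema where the stage was created, and that the role has USAGE on that schema.

```sql
-- Switch to the role Airbyte will use.
USE ROLE AIRBYTE_ROLE;

-- Lists the files visible through the stage. An empty result is fine for
-- a new bucket; a permissions error suggests the grants or the bucket
-- access for the Snowflake service account still need attention.
LIST @gcs_airbyte_stage;
```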

Step 3: Set up Snowflake as a destination in Airbyte

Navigate to the Airbyte UI to set up Snowflake as a destination. You can authenticate using username/password, OAuth 2.0, or key pair authentication:

Login and Password

| Field | Description |
| --- | --- |
| Host | The host domain of the Snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com). Example: accountname.us-east-2.aws.snowflakecomputing.com |
| Role | The role you created in Step 1 for Airbyte to access Snowflake. Example: AIRBYTE_ROLE |
| Warehouse | The warehouse you created in Step 1 for Airbyte to sync data into. Example: AIRBYTE_WAREHOUSE |
| Database | The database you created in Step 1 for Airbyte to sync data into. Example: AIRBYTE_DATABASE |
| Schema | The default schema used as the target schema for all statements issued from the connection that do not explicitly specify a schema name. |
| Username | The username you created in Step 1 to allow Airbyte to access the database. Example: AIRBYTE_USER |
| Password | The password associated with the username. |
| JDBC URL Params (Optional) | Additional properties to pass to the JDBC URL string when connecting to the database, formatted as key=value pairs separated by the symbol &. Example: key1=value1&key2=value2&key3=value3 |
| Disable Final Tables (Optional) | Disables writing final typed tables. See Output schema. WARNING! The data format in _airbyte_data is likely stable, but there are no guarantees that other metadata columns will remain the same in future versions. |
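
Before entering these values in the Airbyte UI, you may want to confirm that they work together. One way, sketched here under the assumption that you kept the default names from Step 1, is to sign in to Snowflake as the Airbyte user and run:

```sql
-- Each USE statement fails if the Airbyte user cannot access the object.
USE ROLE AIRBYTE_ROLE;
USE WAREHOUSE AIRBYTE_WAREHOUSE;
USE SCHEMA AIRBYTE_DATABASE.AIRBYTE_SCHEMA;

-- Shows the session context that results from the statements above.
SELECT CURRENT_ROLE(), CURRENT_WAREHOUSE(), CURRENT_DATABASE(), CURRENT_SCHEMA();
```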

OAuth 2.0

| Field | Description |
| --- | --- |
| Host | The host domain of the Snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com). Example: accountname.us-east-2.aws.snowflakecomputing.com |
| Role | The role you created in Step 1 for Airbyte to access Snowflake. Example: AIRBYTE_ROLE |
| Warehouse | The warehouse you created in Step 1 for Airbyte to sync data into. Example: AIRBYTE_WAREHOUSE |
| Database | The database you created in Step 1 for Airbyte to sync data into. Example: AIRBYTE_DATABASE |
| Schema | The default schema used as the target schema for all statements issued from the connection that do not explicitly specify a schema name. |
| Username | The username you created in Step 1 to allow Airbyte to access the database. Example: AIRBYTE_USER |
| OAuth2 | The login name and password used to obtain the auth token. |
| JDBC URL Params (Optional) | Additional properties to pass to the JDBC URL string when connecting to the database, formatted as key=value pairs separated by the symbol &. Example: key1=value1&key2=value2&key3=value3 |

Key pair authentication

To configure key pair authentication, you need a private/public key pair.
If you do not have a key pair yet, you can generate one with the openssl command line tool.
Use this command to generate an unencrypted private key file:

`openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt`

Alternatively, use this command to generate an encrypted private key file:

`openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -v1 PBE-SHA1-RC4-128 -out rsa_key.p8`

Once you have your private key, you need to generate a matching public key.
You can do so with the following command:

`openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub`

Finally, you need to add the public key to your Snowflake user account.
You can do so with the following SQL command in Snowflake:

`alter user <user_name> set rsa_public_key='<public_key_value>';`

Replace <user_name> with your user name and <public_key_value> with your public key, excluding the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- delimiter lines.
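
As an illustration, the statement might look like the following for the user created in Step 1. The key value here is a truncated placeholder, not a real key, and AIRBYTE_USER assumes you kept the default name.

```sql
-- Paste the contents of rsa_key.pub without the BEGIN/END delimiter lines.
-- 'MIIBIjANBgkqh...' is a truncated placeholder.
ALTER USER AIRBYTE_USER SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';

-- Verify: the RSA_PUBLIC_KEY_FP property should now show a fingerprint.
DESC USER AIRBYTE_USER;
```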

To use AWS S3 as the cloud storage, enter the information for the S3 bucket you created in Step 2:

| Field | Description |
| --- | --- |
| S3 Bucket Name | The name of the staging S3 bucket (Example: airbyte.staging). Airbyte will write files to this bucket and read them via statements on Snowflake. |
| S3 Bucket Region | The S3 staging bucket region used. |
| S3 Key Id * | The Access Key ID granting access to the S3 staging bucket. Airbyte requires Read and Write permissions for the bucket. |
| S3 Access Key * | The corresponding secret to the S3 Key ID. |
| Stream Part Size (Optional) | Increase this if syncing tables larger than 100GB. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 5MB by default, resulting in a default limit of 100GB tables. Note that a larger part size results in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care. (e.g. 5) |
| Purge Staging Files and Tables | Determines whether to delete the staging files from S3 after completing the sync. Specifically, the connector will create CSV files named bucketPath/namespace/streamName/syncDate_epochMillis_randomUuid.csv containing three columns (ab_id, data, emitted_at). Normally these files are deleted after sync; if you want to keep them for other purposes, set purge_staging_data to false. |
| Encryption | Whether files on S3 are encrypted. You probably don't need to enable this, but it can provide an additional layer of security if you are sharing your data storage with other applications. If you do use encryption, you must choose between ephemeral keys (Airbyte will automatically generate a new key for each sync, and nobody but Airbyte and Snowflake will be able to read the data on S3) or providing your own key (if you have the "Purge staging files and tables" option disabled, and you want to be able to decrypt the data yourself). |
| S3 Filename pattern (Optional) | The pattern allows you to set the file-name format for the S3 staging file(s). The following placeholder combinations are currently supported: {date}, {date:yyyy_MM}, {timestamp}, {timestamp:millis}, {timestamp:micros}, {part_number}, {sync_id}, {format_extension}. Do not use empty spaces or unsupported placeholders, as they will not be recognized. |

To use a Google Cloud Storage bucket, enter the information for the bucket you created in Step 2:

| Field | Description |
| --- | --- |
| GCP Project ID | The name of the GCP project ID for your credentials. (Example: my-project) |
| GCP Bucket Name | The name of the staging bucket. Airbyte will write files to this bucket and read them via statements on Snowflake. (Example: airbyte-staging) |
| Google Application Credentials | The contents of the JSON key file that has read/write permissions to the staging GCS bucket. You will separately need to grant bucket access to your Snowflake GCP service account. See the Google Cloud docs for more information on how to generate a JSON key for your service account. |

Output schema

By default, Airbyte outputs each stream into its own raw table in the airbyte_internal schema (this can be overridden by the user) and into a final table with typed columns. The contents of the raw table are NOT deduplicated.

Raw Table schema

| Airbyte field | Description | Column type |
| --- | --- | --- |
| _airbyte_raw_id | A UUID assigned to each processed event | VARCHAR |
| _airbyte_extracted_at | A timestamp for when the event was pulled from the data source | TIMESTAMP WITH TIME ZONE |
| _airbyte_loaded_at | Timestamp to indicate when the record was loaded into Typed tables | TIMESTAMP WITH TIME ZONE |
| _airbyte_data | A JSON blob with the event data. | VARIANT |

Note: Although the contents of _airbyte_data are fairly stable, the schema of the raw table may change in future versions.
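
Because the raw table is not deduplicated, reading it directly usually means picking one row per key yourself. The query below is a sketch under several assumptions: `my_raw_table` is a hypothetical table name, `id` is a hypothetical key inside the JSON blob, and the schema, table, and columns are assumed to have been created with lowercase, quoted identifiers (drop the double quotes if yours are uppercase).

```sql
-- Keep only the most recently extracted row for each value of the
-- hypothetical "id" key stored in _airbyte_data.
SELECT
  "_airbyte_raw_id",
  "_airbyte_extracted_at",
  "_airbyte_data":id::VARCHAR AS id
FROM "airbyte_internal"."my_raw_table"
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY "_airbyte_data":id
  ORDER BY "_airbyte_extracted_at" DESC
) = 1;
```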

Note: By default, Airbyte creates permanent tables. If you prefer transient tables, create a dedicated transient database for Airbyte. For more information, refer to Working with Temporary and Transient Tables.
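
A dedicated transient database could be created along the following lines. This is a sketch, not part of the official setup script: AIRBYTE_TRANSIENT_DATABASE is a hypothetical name, and the ownership grant simply mirrors what Step 1 does for the permanent database.

```sql
-- Run as a role that can create databases, for example SYSADMIN.
USE ROLE SYSADMIN;

-- Tables created in a transient database are transient as well.
CREATE TRANSIENT DATABASE IF NOT EXISTS AIRBYTE_TRANSIENT_DATABASE;

-- Give the Airbyte role the same control it has over the Step 1 database.
GRANT OWNERSHIP ON DATABASE AIRBYTE_TRANSIENT_DATABASE TO ROLE AIRBYTE_ROLE;
```

You would then enter this database name in the Database field when configuring the destination.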

Supported sync modes

The Snowflake destination supports the following sync modes:

Snowflake tutorials

Now that you have set up the Snowflake destination connector, check out the following Snowflake tutorials:

Troubleshooting

'Current role does not have permissions on the target schema'

If you receive an error stating "Current role does not have permissions on the target schema", make sure that the role you provided has permissions on the Snowflake destination schema. When you create a connection, Airbyte may let you select Mirror source structure for the Destination namespace; if you have followed some of our default examples and tutorials, this may result in the connection trying to write to a PUBLIC schema.

A quick fix is to change your connection's 'Replication' settings from Mirror source structure to Destination Default. Otherwise, grant the role the required permissions in the desired namespace.
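
If you choose to grant permissions instead, the statements might look like this. The sketch assumes the names from Step 1 and that the connection is writing to the PUBLIC schema of AIRBYTE_DATABASE; the exact set of privileges Airbyte needs is not spelled out here, so the example grants all of them.

```sql
-- See which roles currently hold privileges on the target schema.
SHOW GRANTS ON SCHEMA AIRBYTE_DATABASE.PUBLIC;

-- Grants every schema privilege except OWNERSHIP to the Airbyte role.
GRANT ALL PRIVILEGES ON SCHEMA AIRBYTE_DATABASE.PUBLIC TO ROLE AIRBYTE_ROLE;
```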

Changelog

| Version | Date | Pull Request | Subject |
| --- | --- | --- | --- |
| 3.4.3 | 2023-10-30 | #31960 | Adopt java CDK version 0.2.0. |
| 3.4.2 | 2023-10-27 | #31897 | Further filtering on extracted_at |
| 3.4.1 | 2023-10-27 | #31683 | Performance enhancement (switch to a merge statement for incremental-dedup syncs) |
| 3.4.0 | 2023-10-25 | #31686 | Opt out flag for typed and deduped tables |
| 3.3.0 | 2023-10-25 | #31520 | Stop deduping raw table |
| 3.2.3 | 2023-10-17 | #31191 | Improve typing+deduping performance by filtering new raw records on extracted_at |
| 3.2.2 | 2023-10-10 | #31194 | Deallocate unused per stream buffer memory when empty |
| 3.2.1 | 2023-10-10 | #31083 | Fix precision of numeric values in async destinations |
| 3.2.0 | 2023-10-09 | #31149 | No longer fail syncs when PKs are null - try to dedupe anyway |
| 3.1.22 | 2023-10-06 | #31153 | Increase jvm GC retries |
| 3.1.21 | 2023-10-06 | #31139 | Bump CDK version |
| 3.1.20 | 2023-10-06 | #31129 | Reduce async buffer size |
| 3.1.19 | 2023-10-04 | #31082 | Revert null PK checks |
| 3.1.18 | 2023-10-01 | #30779 | Final table PK columns become non-null and skip check for null PKs in raw records (performance) |
| 3.1.17 | 2023-09-29 | #30938 | Upgrade snowflake-jdbc driver |
| 3.1.16 | 2023-09-28 | #30835 | Fix regression from 3.1.15 in supporting concurrent syncs with identical stream name but different namespace |
| 3.1.15 | 2023-09-26 | #30775 | Increase async block size |
| 3.1.14 | 2023-09-27 | #30739 | Fix column name collision detection |
| 3.1.13 | 2023-09-19 | #30599 | Support concurrent syncs with identical stream name but different namespace |
| 3.1.12 | 2023-09-21 | #30671 | Reduce async buffer size |
| 3.1.11 | 2023-09-19 | #30592 | Internal code changes |
| 3.1.10 | 2023-09-18 | #30546 | Make sure that the async buffers are flushed every 5 minutes |
| 3.1.9 | 2023-09-19 | #30319 | Support column names that are reserved |
| 3.1.8 | 2023-09-18 | #30479 | Fix async memory management |
| 3.1.7 | 2023-09-15 | #30491 | Improve error message display |
| 3.1.6 | 2023-09-14 | #30439 | Fix a transient error |
| 3.1.5 | 2023-09-13 | #30416 | Support ${ in stream name/namespace, and in column names |
| 3.1.4 | 2023-09-12 | #30364 | Add log message |
| 3.1.3 | 2023-08-29 | #29878 | Reenable incremental typing and deduping |
| 3.1.2 | 2023-08-31 | #30020 | Run typing and deduping tasks in parallel |
| 3.1.1 | 2023-09-05 | #30117 | Type and Dedupe at sync start and then every 6 hours |
| 3.1.0 | 2023-09-01 | #30056 | Upcase final table names to allow case-insensitive references |
| 3.0.2 | 2023-09-01 | #30121 | Improve performance on very wide streams by skipping TRY_CAST on strings |
| 3.0.1 | 2023-08-27 | #30065 | Clearer error thrown when records are missing a primary key |
| 3.0.0 | 2023-08-27 | #29783 | Destinations V2 |
| 2.1.7 | 2023-08-29 | #29949 | Destinations V2: Fix checking for empty table by ensuring upper-case DB names |
| 2.1.6 | 2023-08-28 | #29878 | Destinations V2: Fix detection of existing table by ensuring upper-case DB names |
| 2.1.5 | 2023-08-28 | #29903 | Destinations V2: Performance Improvement, Changing Metadata error array construction from ARRAY_CAT to ARRAY_CONSTRUCT_COMPACT |
| 2.1.4 | 2023-08-28 | #29903 | Abort queries on crash |
| 2.1.3 | 2023-08-25 | #29881 | Destinations v2: Only run T+D once at end of sync, to prevent data loss under async conditions |
| 2.1.2 | 2023-08-24 | #29805 | Destinations v2: Don't soft reset in migration |
| 2.1.1 | 2023-08-23 | #29774 | Destinations v2: Don't soft reset overwrite syncs |
| 2.1.0 | 2023-08-21 | #29636 | Destinations v2: Several Critical Bug Fixes (cursorless dedup, improved floating-point handling, improved special characters handling; improved error handling) |
| 2.0.0 | 2023-08-09 | #28894 | Remove support for Snowflake GCS/S3 loading method in favor of Snowflake Internal staging |
| 1.3.3 | 2023-08-15 | #29461 | Changing a static constant reference |
| 1.3.2 | 2023-08-11 | #29381 | Destinations v2: Add support for streams with no columns |
| 1.3.1 | 2023-08-04 | #28894 | Destinations v2: Update SqlGenerator |
| 1.3.0 | 2023-08-07 | #29174 | Destinations v2: early access release |
| 1.2.10 | 2023-08-07 | #29188 | Internal code refactoring |
| 1.2.9 | 2023-08-04 | #28677 | Destinations v2: internal code changes to prepare for early access release |
| 1.2.8 | 2023-08-03 | #29047 | Avoid logging record if the format is invalid |
| 1.2.7 | 2023-08-02 | #28976 | Fix composite PK handling in v1 mode |
| 1.2.6 | 2023-08-01 | #28618 | Reduce logging noise |
| 1.2.5 | 2023-07-24 | #28618 | Add hooks in preparation for destinations v2 implementation |
| 1.2.4 | 2023-07-21 | #28584 | Install dependencies in preparation for destinations v2 work |
| 1.2.3 | 2023-07-21 | #28345 | Pull in async framework minor bug fix for race condition on state emission |
| 1.2.2 | 2023-07-14 | #28345 | Increment patch to trigger a rebuild |
| 1.2.1 | 2023-07-14 | #28315 | Pull in async framework minor bug fix to avoid Snowflake hanging on close |
| 1.2.0 | 2023-07-05 | #27935 | Enable Faster Snowflake Syncs with Asynchronous writes |
| 1.1.0 | 2023-06-27 | #27781 | License Update: Elv2 |
| 1.0.6 | 2023-06-21 | #27555 | Reduce image size |
| 1.0.5 | 2023-05-31 | #25782 | Internal scaffolding for future development |
| 1.0.4 | 2023-05-19 | #26323 | Prevent infinite retry loop under specific circumstances |
| 1.0.3 | 2023-05-15 | #26081 | Reverts splits bases |
| 1.0.2 | 2023-05-05 | #25649 | Splits bases (reverted) |
| 1.0.1 | 2023-04-29 | #25570 | Internal library update |
| 1.0.0 | 2023-05-02 | #25739 | Removed Azure Blob Storage as a loading method |
| 0.4.63 | 2023-04-27 | #25346 | Added FlushBufferFunction interface |
| 0.4.61 | 2023-03-30 | #24736 | Improve behavior when throttled by AWS API |
| 0.4.60 | 2023-03-30 | #24698 | Add option in spec to allow increasing the stream buffer size to 50 |
| 0.4.59 | 2023-03-23 | #23904 | Fail faster in certain error cases |
| 0.4.58 | 2023-03-27 | #24615 | Fixed host validation by pattern on UI |
| 0.4.56 (broken) | 2023-03-22 | #23904 | Added host validation by pattern on UI |
| 0.4.54 | 2023-03-17 | #23788 | S3-Parquet: added handler to process null values in arrays |
| 0.4.53 | 2023-03-15 | #24058 | added write attempt to internal staging Check method |
| 0.4.52 | 2023-03-10 | #23931 | Added support for periodic buffer flush |
| 0.4.51 | 2023-03-10 | #23466 | Changed S3 Avro type from Int to Long |
| 0.4.49 | 2023-02-27 | #23360 | Added logging for flushing and writing data to destination storage |
| 0.4.48 | 2023-02-23 | #22877 | Add handler for IP not in whitelist error and more handlers for insufficient permission error |
| 0.4.47 | 2023-01-30 | #21912 | Catch "Create" Table and Stage Known Permissions and rethrow as ConfigExceptions |
| 0.4.46 | 2023-01-26 | #20631 | Added support for destination checkpointing with staging |
| 0.4.45 | 2023-01-25 | #21087 | Catch Known Permissions and rethrow as ConfigExceptions |
| 0.4.44 | 2023-01-20 | #21087 | Wrap Authentication Errors as Config Exceptions |
| 0.4.43 | 2023-01-20 | #21450 | Updated Check methods to handle more possible s3 and gcs stagings issues |
| 0.4.42 | 2023-01-12 | #21342 | Better handling for conflicting destination streams |
| 0.4.41 | 2022-12-16 | #20566 | Improve spec to adhere to standards |
| 0.4.40 | 2022-11-11 | #19302 | Set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud |
| 0.4.39 | 2022-11-09 | #18970 | Updated "check" connection method to handle more errors |
| 0.4.38 | 2022-09-26 | #17115 | Added connection string identifier |
| 0.4.37 | 2022-09-21 | #16839 | Update JDBC driver for Snowflake to 3.13.19 |
| 0.4.36 | 2022-09-14 | #15668 | Wrap logs in AirbyteLogMessage |
| 0.4.35 | 2022-09-01 | #16243 | Fix Json to Avro conversion when there is field name clash from combined restrictions (anyOf, oneOf, allOf fields). |
| 0.4.34 | 2022-07-23 | #14388 | Add support for key pair authentication |
| 0.4.33 | 2022-07-15 | #14494 | Make S3 output filename configurable. |
| 0.4.32 | 2022-07-14 | #14618 | Removed additionalProperties: false from JDBC destination connectors |
| 0.4.31 | 2022-07-07 | #13729 | Improve configuration field description |
| 0.4.30 | 2022-06-24 | #14114 | Remove "additionalProperties": false from specs for connectors with staging |
| 0.4.29 | 2022-06-17 | #13753 | Deprecate and remove PART_SIZE_MB fields from connectors based on StreamTransferManager |
| 0.4.28 | 2022-05-18 | #12952 | Apply buffering strategy on GCS staging |
| 0.4.27 | 2022-05-17 | #12820 | Improved 'check' operation performance |
| 0.4.26 | 2022-05-12 | #12805 | Updated to latest base-java to emit AirbyteTraceMessages on error. |
| 0.4.25 | 2022-05-03 | #12452 | Add support for encrypted staging on S3; fix the purge_staging_files option |
| 0.4.24 | 2022-03-24 | #11093 | Added OAuth support (Compatible with Airbyte Version 0.35.60+) |
| 0.4.22 | 2022-03-18 | #10793 | Fix namespace with invalid characters |
| 0.4.21 | 2022-03-18 | #11071 | Switch to compressed on-disk buffering before staging to s3/internal stage |
| 0.4.20 | 2022-03-14 | #10341 | Add Azure blob staging support |
| 0.4.19 | 2022-03-11 | #10699 | Added unit tests |
| 0.4.17 | 2022-02-25 | #10421 | Refactor JDBC parameters handling |
| 0.4.16 | 2022-02-25 | #10627 | Add try catch to make sure all handlers are closed |
| 0.4.15 | 2022-02-22 | #10459 | Add FailureTrackingAirbyteMessageConsumer |
| 0.4.14 | 2022-02-17 | #10394 | Reduce memory footprint. |
| 0.4.13 | 2022-02-16 | #10212 | Execute COPY command in parallel for S3 and GCS staging |
| 0.4.12 | 2022-02-15 | #10342 | Use connection pool, and fix connection leak. |
| 0.4.11 | 2022-02-14 | #9920 | Updated the size of staging files for S3 staging. Also, added closure of S3 writers to staging files when data has been written to a staging file. |
| 0.4.10 | 2022-02-14 | #10297 | Halve the record buffer size to reduce memory consumption. |
| 0.4.9 | 2022-02-14 | #10256 | Add ExitOnOutOfMemoryError JVM flag. |
| 0.4.8 | 2022-02-01 | #9959 | Fix null pointer exception from buffered stream consumer. |
| 0.4.7 | 2022-01-29 | #9745 | Integrate with Sentry. |
| 0.4.6 | 2022-01-28 | #9623 | Add jdbc_url_params support for optional JDBC parameters |
| 0.4.5 | 2021-12-29 | #9184 | Update connector fields title/description |
| 0.4.4 | 2022-01-24 | #9743 | Fixed bug with dashes in schema name |
| 0.4.3 | 2022-01-20 | #9531 | Start using new S3StreamCopier and expose the purgeStagingData option |
| 0.4.2 | 2022-01-10 | #9141 | Fixed duplicate rows on retries |
| 0.4.1 | 2021-01-06 | #9311 | Update creating schema during check |
| 0.4.0 | 2021-12-27 | #9063 | Updated normalization to produce permanent tables |
| 0.3.24 | 2021-12-23 | #8869 | Changed staging approach to Byte-Buffered |
| 0.3.23 | 2021-12-22 | #9039 | Added part_size configuration in UI for S3 loading method |
| 0.3.22 | 2021-12-21 | #9006 | Updated jdbc schema naming to follow Snowflake Naming Conventions |
| 0.3.21 | 2021-12-15 | #8781 | Updated check method to verify permissions to create/drop stage for internal staging; compatibility fix for Java 17 |
| 0.3.20 | 2021-12-10 | #8562 | Moving classes around for better dependency management; compatibility fix for Java 17 |
| 0.3.19 | 2021-12-06 | #8528 | Set Internal Staging as default choice |
| 0.3.18 | 2021-11-26 | #8253 | Snowflake Internal Staging Support |
| 0.3.17 | 2021-11-08 | #7719 | Improve handling of wide rows by buffering records based on their byte size rather than their count |
| 0.3.15 | 2021-10-11 | #6949 | Each stream was split into files of 10,000 records each for copying using S3 or GCS |
| 0.3.14 | 2021-09-08 | #5924 | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table |
| 0.3.13 | 2021-09-01 | #5784 | Updated query timeout from 30 minutes to 3 hours |
| 0.3.12 | 2021-07-30 | #5125 | Enable additionalProperties in spec.json |
| 0.3.11 | 2021-07-21 | #3555 | Partial Success in BufferedStreamConsumer |
| 0.3.10 | 2021-07-12 | #4713 | Tag traffic with airbyte label to enable optimization opportunities from Snowflake |