How to use Python to write data to a Snowflake database from Spotfire Statistica

How to use Python to write data to a Snowflake database from Spotfire Statistica

book

Article ID: KB0138019

calendar_today

Updated On:

Products Versions
Spotfire Statistica 14.2 and higher

Description

Statistica uses the "Write Spreadsheet to Database" workspace node to write spreadsheets to SQL Server and Oracle databases, as designed.  Both SQL Server and Oracle have Bulk Loading but Snowflake doesn't have this functionality when suing ODBC drivers.  However, using the Python node with specific Python code can be used to write bulk data from Statistica to a Snowflake database.  

 

Important Notes: 

  • Only known to work with Python 3.11 and 3.12.8
  • For any issues or question with the Snowflake database, contact your Snowflake database administrator or Snowflake support.

Environment

OS:  Windows

Resolution

The Python code used has dependent libraries which are necessary to be installed.  Typically these are installed from an administrator command prompt in Windows:

 

pip install snowflake-connector-python
pip install "snowflake-connector-python[pandas]"
pip install pandas

pip install comtypes

pip install --upgrade --force-reinstall cffi 

 

Important Notes: 

  • Other dependencies may need to be installed.  A message will indicate any missing dependencies which can usually be installed using pip.  If not, find the documentation for the specific dependency library online for details on how to install it.
  • For Python versions other than 3.11 and 3.12.8, packages may need to be updated and the code modified.  To use the code below (and in the workspace attached at the bottom of this knowledge base article), use only 3.11 and 3.12.8.  If another Python version is needed, customization would be required.  If customization is required and help is needed, contact your account manager to arrange for a service provider contract. 

 

 

The entire Python code is as follows:

===============================================================================================================

import snowflake.connector
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
import os
import sys
import warnings
import time
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas

# Suppress the cryptography warnings that sometimes appear
warnings.filterwarnings("ignore", category=UserWarning, module="cryptography")

# --- Configuration Section ---
# Replace these with your actual Snowflake connection and file details
SNOWFLAKE_USER = "<snowflake_user_account>"
SNOWFLAKE_ACCOUNT = "<snowflake_account>"
SNOWFLAKE_WAREHOUSE = "<snowflake_warehouse>"
SNOWFLAKE_DATABASE = "<snowflake_database"
SNOWFLAKE_SCHEMA = "<snowflake_schema>"

# Path to your private key file (.p8)
PRIVATE_KEY_PATH = r"C:\Users\<any_users_path>\Documents\rsa_key.p8"

# The passphrase used to encrypt your private key
PRIVATE_KEY_PASSWORD = "<private key password>"  #When creating a certificate using RSA, the password is created

# The local CSV file you want to upload
CSV_FILE = "C:\\Data\\Data01.csv"       #This is where the file is saved or staged

# The target Snowflake table where the data will be loaded
# The 'write_pandas' function can create this table if it doesn't exist.
TABLE_NAME = "14vby8000rows_TestBeforeColumnAdded"   #This is just an example table

# --- Function Definitions ---

def connect_to_snowflake():
    """
    Establishes a connection to Snowflake using key-pair authentication.
    The function reads an encrypted private key and decrypts it using a passphrase.
    """
    try:
        # Check if the private key file exists
        if not os.path.exists(PRIVATE_KEY_PATH):
            raise FileNotFoundError(f"Error: Private key file not found at '{PRIVATE_KEY_PATH}'.")

        # Load the encrypted private key and decrypt it with the passphrase
        with open(PRIVATE_KEY_PATH, "rb") as key_file:
            p8_key = key_file.read()
            private_key = serialization.load_pem_private_key(
                p8_key,
                password=PRIVATE_KEY_PASSWORD.encode(),
                backend=default_backend()
            )

        # Serialize the private key to DER format for the connector
        private_key_bytes = private_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )

        print("Connecting to Snowflake...")
        conn = snowflake.connector.connect(
            user=SNOWFLAKE_USER,
            account=SNOWFLAKE_ACCOUNT,
            warehouse=SNOWFLAKE_WAREHOUSE,
            database=SNOWFLAKE_DATABASE,
            schema=SNOWFLAKE_SCHEMA,
            private_key=private_key_bytes
        )

        print("Connection successful!")
        return conn

    except FileNotFoundError as e:
        print(e)
        sys.exit(1)
    except Exception as e:
        print(f"An error occurred during connection: {e}")
        sys.exit(1)

def main():
    """
    Main function to connect to Snowflake, read data from a CSV,
    and write it to a table.
    """
    conn = None
    t_start = time.time()
    try:
        # Step 1: Establish the Snowflake connection
        conn = connect_to_snowflake()
        
        # Step 2: Set the current session context to prevent the "no current schema" error
        print(f"Setting session context to '{SNOWFLAKE_DATABASE}.{SNOWFLAKE_SCHEMA}'...")
        conn.cursor().execute(f"USE DATABASE {SNOWFLAKE_DATABASE};")
        conn.cursor().execute(f"USE SCHEMA {SNOWFLAKE_SCHEMA};")

        # Step 3: Read the CSV file into a pandas DataFrame
        print(f"Reading data from '{CSV_FILE}'...")
        if not os.path.exists(CSV_FILE):
            raise FileNotFoundError(f"Error: CSV file not found at '{CSV_FILE}'.")
        
        df = pd.read_csv(CSV_FILE)
        print(f"Successfully read {len(df)} rows.")

        # Step 4: Write the DataFrame to the Snowflake table
        print(f"Writing data to table '{TABLE_NAME}'...")
        success, n_chunks, n_rows, _ = write_pandas(
            conn,
            df,
            TABLE_NAME,
            auto_create_table=True, # Automatically creates the table if it doesn't exist
            overwrite=False # Set to True to overwrite existing table data
        )
        
        # Step 5: Commit the changes
        conn.commit()
        print(f"Data committed. {n_rows} rows were written successfully.")
        
    except Exception as e:
        print(f"An error occurred: {e}")
        if conn:
            print("Rolling back the transaction...")
            conn.rollback()
            print("Transaction rolled back.")
    finally:
        # Step 6: Ensure the connection is closed
        if conn:
            conn.close()
            print("\nConnection closed.")
        
    t_end = time.time()
    print("--- Process completed in %s seconds ---" % (t_end - t_start))

# --- Main Execution Block ---
if __name__ == "__main__":
    main()

===============================================================================================================

Issue/Introduction

How to use Python to write data to a Snowflake database from Spotfire Statistica.

Additional Information

https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-pandas

https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-example

https://quickstarts.snowflake.com/guide/getting_started_with_python/index.html#0

 

 

Attachments

Workspace-Python-WriteSnowflake_usingRSAKeys_AddingDateColumn.sdm get_app