Snowpark Transforms

This guide offers a detailed walkthrough on harnessing the power of Ascend.io's transformation features alongside the Snowflake Snowpark Python API. It's designed to familiarize you with incorporating inputs[x] for introducing data and leveraging the conversion of data into Pandas DataFrames for sophisticated transformations and reverse ETL (Extract, Transform, Load) processes.

Prerequisites

Before diving in, make sure you have:

  • A basic grasp of Python and the Pandas library.
  • An understanding of Snowflake and the Snowpark Python API.
  • Active accounts with Ascend.io and Snowflake.

Overview of Snowpark Python API

Snowflake's Snowpark is a developer framework enabling complex data transformations within Snowflake using familiar programming languages, including Python. It seamlessly integrates with Snowflake's robust computing capabilities. For comprehensive details, please visit Snowflake's Developer Guide.

Utilizing inputs[x] in Snowpark Python

Within Ascend.io transforms, inputs[x] facilitates the passage of datasets into Python code for manipulation. Here, inputs is a list, with each element representing a Snowpark DataFrame corresponding to an input dataset.

Example Template

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
from snowflake.snowpark.functions import F
from snowflake.snowpark.types import T
from typing import List

def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    df0 = inputs[0]
    # Data manipulation code here
    return df0

In this template, df0 = inputs[0] assigns the first dataset in the inputs list to df0, which is then processed as a Snowpark DataFrame.

Transforming Data to Pandas DataFrame

To facilitate advanced data manipulation, converting Snowpark DataFrames to Pandas DataFrames can be highly beneficial. This conversion unlocks Pandas' extensive set of functionalities.

Conversion to Pandas DataFrame

pandas_df = df0.to_pandas()

Perform transformations using Pandas

Post transformation within Pandas, revert the data back to a Snowpark DataFrame using:

transformed_df = session.create_dataframe(pandas_df)

Writing Data to a Target (Reverse ETL)

Post-transformation, the data can be written back to Snowflake or another target system, a process often termed as reverse ETL.

Example of Writing Data to Snowflake

transformed_df.write.mode('overwrite').save_as_table('your_target_table')

This snippet either overwrites an existing table or creates a new one in Snowflake with the transformed data.

Aggregation, Joining, and Data Enrichment

This section delves into a practical example that illustrates the power of combining Snowflake's Snowpark Python API with Pandas for data transformation tasks. Through a step-by-step process, we'll aggregate sales data, join it with customer information, and enrich the dataset—all within the context of an Ascend.io platform transformation. This approach showcases how to perform complex operations like aggregation, applying custom functions, joining datasets, and adding new information to your data.

Step 1: Aggregate Sales Data

First, we group sales data by customer ID to calculate total and average sales. This step is crucial for understanding customer behavior and sales performance.

Step 2: Convert to Pandas DataFrame for Complex Operations

Next, we convert the aggregated Snowpark DataFrame to a Pandas DataFrame. This conversion allows us to leverage Pandas' powerful data manipulation functions, such as applying custom categorization based on sales figures.

Step 3: Join with Customer Data

After performing the necessary transformations in Pandas, we join the transformed sales data with customer information. This step enriches the sales data with valuable customer details like names and regions.

Step 4: Data Enrichment - Add a Static Column

Finally, we add a static column to our dataset. This column, named data_source, signifies the origin of the data transformation, providing clarity and traceability in our data pipeline.

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
from snowflake.snowpark.functions import avg, col, lit
from snowflake.snowpark.types import StringType
from typing import List
import pandas as pd

def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    # Assuming inputs[0] is sales data and inputs[1] is customer data
    sales_data = inputs[0]
    customer_data = inputs[1]

    # Aggregate Sales Data
    aggregated_sales = sales_data.groupBy("customer_id").agg(
        sum("sale_amount").alias("total_sales"),
        avg("sale_amount").alias("avg_sales")
    )

    # Convert to Pandas DataFrame for complex operations
    pandas_aggregated_sales = aggregated_sales.to_pandas()

    # Apply a custom function to create a new column for sales categorization
    def categorize_sales(row):
        if row['total_sales'] > 10000:
            return 'High'
        elif row['total_sales'] > 5000:
            return 'Medium'
        else:
            return 'Low'

    pandas_aggregated_sales['sales_category'] = pandas_aggregated_sales.apply(categorize_sales, axis=1)

    # Convert back to Snowpark DataFrame
    spark_aggregated_sales = session.create_dataframe(pandas_aggregated_sales)

    # Join with Customer Data
    enriched_data = spark_aggregated_sales.join(
        customer_data, spark_aggregated_sales.customer_id == customer_data.customer_id
    ).select(
        spark_aggregated_sales[""], customer_data["customer_name"], customer_data["region"]
    )

    # Data Enrichment - Add a static column
    enriched_data = enriched_data.withColumn("data_source", lit("Ascend.io").cast(StringType()))

    return enriched_data

This example underscores the synergy between Snowpark and Pandas within Ascend.io for executing intricate data transformations tailored to specific data and business requirements. Remember, the specific transformations will depend on your actual data and business requirements.


© Ascension Labs Inc. | All Rights Reserved