Snowpark Transforms
This guide offers a detailed walkthrough of Ascend.io's transformation features alongside the Snowflake Snowpark Python API. It's designed to familiarize you with using `inputs[x]` to introduce data, and with converting data into Pandas DataFrames for sophisticated transformations and reverse ETL processes.
Prerequisites
Before diving in, make sure you have:
- A basic grasp of Python and the Pandas library.
- An understanding of Snowflake and the Snowpark Python API.
- Active accounts with Ascend.io and Snowflake.
Overview of Snowpark Python API
Snowflake's Snowpark is a developer framework enabling complex data transformations within Snowflake using familiar programming languages, including Python. It seamlessly integrates with Snowflake's robust computing capabilities. For comprehensive details, please visit Snowflake's Developer Guide.
Utilizing `inputs[x]` in Snowpark Python
Within Ascend.io transforms, `inputs[x]` facilitates the passage of datasets into Python code for manipulation. Here, `inputs` is a list, with each element representing a Snowpark DataFrame corresponding to an input dataset.
Example Template
```python
from typing import List

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T


def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    df0 = inputs[0]
    # Data manipulation code here
    return df0
```
In this template, `df0 = inputs[0]` assigns the first dataset in the `inputs` list to `df0`, which is then processed as a Snowpark DataFrame.
Transforming Data to Pandas DataFrame
To facilitate advanced data manipulation, converting Snowpark DataFrames to Pandas DataFrames can be highly beneficial. This conversion unlocks Pandas' extensive set of functionalities.
Conversion to Pandas DataFrame
```python
pandas_df = df0.to_pandas()
# Perform transformations using Pandas
```
Post transformation within Pandas, revert the data back to a Snowpark DataFrame using:
```python
transformed_df = session.create_dataframe(pandas_df)
```
Writing Data to a Target (Reverse ETL)
After transformation, the data can be written back to Snowflake or another target system, a process often termed reverse ETL.
Example of Writing Data to Snowflake
```python
transformed_df.write.mode('overwrite').save_as_table('your_target_table')
```
This snippet either overwrites an existing table or creates a new one in Snowflake with the transformed data.
Aggregation, Joining, and Data Enrichment
This section delves into a practical example that illustrates the power of combining Snowflake's Snowpark Python API with Pandas for data transformation tasks. Through a step-by-step process, we'll aggregate sales data, join it with customer information, and enrich the dataset, all within an Ascend.io platform transformation. Along the way, you'll see how to perform complex operations such as aggregation, applying custom functions, joining datasets, and adding new information to your data.
Step 1: Aggregate Sales Data
First, we group sales data by customer ID to calculate total and average sales. This step is crucial for understanding customer behavior and sales performance.
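To show the expected shape of this aggregation, here is a minimal Pandas sketch with toy data; the `customer_id` and `sale_amount` column names are assumptions matching the example schema used later in this guide:

```python
import pandas as pd

# Toy sales data standing in for the Snowpark input (assumed schema)
sales = pd.DataFrame({
    "customer_id": [1, 1, 2, 2, 2],
    "sale_amount": [100.0, 200.0, 50.0, 75.0, 25.0],
})

# Group by customer and compute total and average sales
aggregated = sales.groupby("customer_id", as_index=False).agg(
    total_sales=("sale_amount", "sum"),
    avg_sales=("sale_amount", "mean"),
)
```

The result has one row per customer with `total_sales` and `avg_sales` columns, mirroring what the Snowpark `agg` call produces in the full example below.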
Step 2: Convert to Pandas DataFrame for Complex Operations
Next, we convert the aggregated Snowpark DataFrame to a Pandas DataFrame. This conversion allows us to leverage Pandas' powerful data manipulation functions, such as applying custom categorization based on sales figures.
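As an illustration of the kind of row-wise logic this unlocks, the sketch below applies a custom categorization function with `DataFrame.apply`; the thresholds and toy values are assumptions for the example:

```python
import pandas as pd

# Aggregated sales per customer (toy values)
df = pd.DataFrame({
    "customer_id": [1, 2, 3],
    "total_sales": [12000.0, 7000.0, 3000.0],
})

def categorize_sales(row):
    # Assumed thresholds for the example
    if row["total_sales"] > 10000:
        return "High"
    elif row["total_sales"] > 5000:
        return "Medium"
    return "Low"

# Apply the function row by row to derive a new column
df["sales_category"] = df.apply(categorize_sales, axis=1)
print(df["sales_category"].tolist())  # ['High', 'Medium', 'Low']
```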
Step 3: Join with Customer Data
After performing the necessary transformations in Pandas, we join the transformed sales data with customer information. This step enriches the sales data with valuable customer details like names and regions.
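The join can be pictured with a small Pandas equivalent; the customer names and regions here are toy values, and the inner join on `customer_id` mirrors the Snowpark join in the full example:

```python
import pandas as pd

# Aggregated, categorized sales (toy data)
sales = pd.DataFrame({
    "customer_id": [1, 2],
    "total_sales": [300.0, 150.0],
    "sales_category": ["Low", "Low"],
})

# Customer reference data (assumed schema)
customers = pd.DataFrame({
    "customer_id": [1, 2],
    "customer_name": ["Acme", "Globex"],
    "region": ["EMEA", "AMER"],
})

# Inner join on customer_id enriches sales with names and regions
enriched = sales.merge(customers, on="customer_id", how="inner")
```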
Step 4: Data Enrichment - Add a Static Column
Finally, we add a static column to our dataset. This column, named `data_source`, signifies the origin of the data transformation, providing clarity and traceability in our data pipeline.
```python
from typing import List

import pandas as pd
from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
from snowflake.snowpark.functions import avg, lit, sum as sum_
from snowflake.snowpark.types import StringType


def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    # Assuming inputs[0] is sales data and inputs[1] is customer data
    sales_data = inputs[0]
    customer_data = inputs[1]

    # Aggregate sales data: total and average sale amount per customer
    aggregated_sales = sales_data.group_by("customer_id").agg(
        sum_("sale_amount").alias("total_sales"),
        avg("sale_amount").alias("avg_sales"),
    )

    # Convert to a Pandas DataFrame for complex operations
    pandas_aggregated_sales = aggregated_sales.to_pandas()

    # Apply a custom function to create a new column for sales categorization
    def categorize_sales(row):
        if row["total_sales"] > 10000:
            return "High"
        elif row["total_sales"] > 5000:
            return "Medium"
        else:
            return "Low"

    pandas_aggregated_sales["sales_category"] = pandas_aggregated_sales.apply(
        categorize_sales, axis=1
    )

    # Convert back to a Snowpark DataFrame
    snowpark_aggregated_sales = session.create_dataframe(pandas_aggregated_sales)

    # Join with customer data to pull in names and regions
    enriched_data = snowpark_aggregated_sales.join(
        customer_data,
        snowpark_aggregated_sales["customer_id"] == customer_data["customer_id"],
    ).select(
        snowpark_aggregated_sales["customer_id"],
        snowpark_aggregated_sales["total_sales"],
        snowpark_aggregated_sales["avg_sales"],
        snowpark_aggregated_sales["sales_category"],
        customer_data["customer_name"],
        customer_data["region"],
    )

    # Data enrichment: add a static column identifying the data source
    enriched_data = enriched_data.with_column(
        "data_source", lit("Ascend.io").cast(StringType())
    )

    return enriched_data
```
This example underscores the synergy between Snowpark and Pandas within Ascend.io for executing intricate data transformations. The specific transformations you apply will depend on your actual data and business requirements.