Create a Snowpark Pipeline Pt. 2

Create Multiple Dataflows Using Snowpark Transforms

In this guide, we'll create a new set of pipelines that combine weather and cab data to produce more advanced analytics on cab rides. As with the previous Snowpark guide, Create a Snowpark Pipeline, we recommend working through the Create a Taxi Domain and Create a Weather Domain guides first for a more general introduction (and to set up an S3 connection).

Note that this guide will create two dataflows: Cabs (the main, advanced pipeline that will end with our final analysis) and Weather (a brief, straightforward pipeline that ends with a data share). We'll build out the first part of the Cabs pipeline, switch over to complete the full Weather pipeline, then return to Cabs to round out the full pipeline.

Pipeline 1: Cabs Pt. 1

The overall purpose of this dataflow will be to:

  1. Ingest cab data that we'll eventually clean up and combine with our weather data
  2. Produce analysis on tipping behavior

A majority of our work in this pipeline will revolve around cleaning up and massaging the data in Snowflake SQL before running more complicated analytical transforms in Snowpark.

Let's begin!

Step 0: Create a New Dataflow

If you don't currently have a blank dataflow open, navigate to your Ascend environment home page, hit the New Dataflow button above a data service, and title the new dataflow Cabs.

Step 1: Create the Read Connectors

First, we'll pull in some data to use for our dataflow from an S3 bucket.

Create the CLEAN_YELLOW_CABS Read Connector

Select Read Connector from the CREATE menu on the tool panel. Select the Sample Datasets connection created in Create a Taxi Domain.

If you don't have that, that's alright! You just need an S3 connection that points to the ascend-io-sample-data-read S3 bucket.

Once you see the connection, select USE. On the next screen, hit the Skip and Configure Manually option.

On the Connector Configuration screen, fill the following fields with their respective values:

NAME: CLEAN_YELLOW_CABS
OBJECT PATTERN MATCHING: Match
OBJECT PATTERN: NYC-TAXI/yellow_cab/clean_yellow_cab/data.csv
PARSER: CSV
FILES HAVE A HEADER ROW: Check this box.

Once all these fields have been filled, click the Generate Schema button and confirm that the generated schema's column names and types look correct.

Once the preview of the records appears, you can hit the CREATE button on the top right.

Create the CLEAN_GREEN_CABS Read Connector

Repeat the steps above for another connector, using the following values instead:

NAME: CLEAN_GREEN_CABS
OBJECT PATTERN MATCHING: Prefix
OBJECT PATTERN: NYC-TAXI/green_cab/clean_green_cab/*.csv
PARSER: CSV
FILES HAVE A HEADER ROW: Check this box.

After hitting the Generate Schema button, confirm that the generated schema's column names and types look correct, then hit CREATE.

With this, you should now have your read connectors for the Cabs dataflow!

Step 2: Create the Transforms

Next, we'll work with the CLEAN_YELLOW_CABS and CLEAN_GREEN_CABS data to create a set of transforms that will make the datasets compatible (note the schemas are slightly different!) and allow us to combine the data.

Create the AUGMENTED_GREEN_CABS Transform

From the CLEAN_GREEN_CABS read connector, click the vertical ellipsis and select Create new Transform. When the transform window opens, fill out the following fields with their corresponding values:

NAME: AUGMENTED_GREEN_CABS
HOW WILL YOU SPECIFY THE TRANSFORM?: Snowflake SQL

Once these options are filled out, an inline code editor will be available. Enter the following code there:

SELECT
    c.*,
    'green' as cab_type,
    DATE_TRUNC('day', TO_TIMESTAMP(c.PICKUP_DATETIME)) AS pickup_date
FROM
    {{CLEAN_GREEN_CABS}} AS c

All other configurations can be left blank or as is.

Now, select CREATE and the transform will begin running your Snowflake SQL code.
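
For reference, roughly the same logic can be written as a Snowpark transform instead of Snowflake SQL. Below is a minimal sketch (not a required step in this guide), assuming inputs[0] is CLEAN_GREEN_CABS and the column is named PICKUP_DATETIME as above:

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
from snowflake.snowpark.functions import col, date_trunc, lit, to_timestamp
from typing import List

def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    # inputs[0] is assumed to be CLEAN_GREEN_CABS
    green = inputs[0]
    # Tag every row as a green cab and truncate the pickup timestamp to a date
    return green \
        .with_column("cab_type", lit("green")) \
        .with_column("pickup_date", date_trunc("day", to_timestamp(col("PICKUP_DATETIME"))))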

Create the ALL_CABS Transform

From the AUGMENTED_GREEN_CABS transform, click the vertical ellipsis and select Create new Transform. As we did above, fill in the following fields, then enter the SQL below in the code editor:

NAME: ALL_CABS
HOW WILL YOU SPECIFY THE TRANSFORM?: Snowflake SQL

SELECT *
FROM {{AUGMENTED_GREEN_CABS}}

UNION ALL

SELECT *
FROM {{CLEAN_YELLOW_CABS}}

Select CREATE to finish the component.
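
For comparison, the same union can be expressed in Snowpark. A minimal sketch, assuming inputs[0] is AUGMENTED_GREEN_CABS and inputs[1] is CLEAN_YELLOW_CABS:

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
from typing import List

def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    # union_all keeps duplicate rows, mirroring SQL's UNION ALL; it matches
    # columns by position, so both inputs need compatible schemas
    return inputs[0].union_all(inputs[1])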

Pipeline 2: Weather

Now we'll create the Weather dataflow, where we'll work with some weather data in the same S3 bucket. We'll transform the data within this dataflow, then make it available to our Cabs dataflow via a data share.

Step 0: Create the Dataflow

You'll need to create a new dataflow for this. Navigate back to the environment's home page (clicking the Ascend.io icon on the top left is the quickest way to get there), find the data service that contains your Cabs dataflow, hit the New Dataflow button above it, and title the new dataflow Weather.

Step 1: Create the Read Connector

As before, select Read Connector from the CREATE menu on the tool panel and select the S3 connection that points at the ascend-io-sample-data-read S3 bucket. Select USE, and on the next screen hit the Skip and Configure Manually option.

On the Connector Configuration screen, fill the following fields with their respective values:

NAME: WEATHER
OBJECT PATTERN MATCHING: Prefix
OBJECT PATTERN: weather/sensor-by-day/
PARSER: CSV
FILES HAVE A HEADER ROW: Check this box.

On hitting Generate Schema, confirm the schema preview looks correct, then hit CREATE.

Step 2: Create the Transforms

Just as we did with the Cabs dataflow, we'll use some Snowflake SQL transforms to get the data to a state where we can use it for more advanced purposes.

Create the WEATHER_MAYBE_RAINY Transform

From the WEATHER read connector, click the vertical ellipsis and select Create new Transform. Fill out the following fields, then enter the SQL below in the code editor:

NAME: WEATHER_MAYBE_RAINY
HOW WILL YOU SPECIFY THE TRANSFORM?: Snowflake SQL

SELECT
    w.*,
    CASE
        WHEN w.PRECIPITATION > .1 THEN 'rainy'
        ELSE 'not rainy'
    END AS weather
FROM
    {{WEATHER}} AS w

Complete the component by selecting the CREATE button.
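
The CASE expression above maps to Snowpark's when/otherwise. A minimal sketch, assuming inputs[0] is the WEATHER dataset:

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
from snowflake.snowpark.functions import col, lit, when
from typing import List

def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    weather = inputs[0]
    # Label a day 'rainy' when precipitation exceeds 0.1, otherwise 'not rainy'
    return weather.with_column(
        "weather",
        when(col("PRECIPITATION") > 0.1, lit("rainy")).otherwise(lit("not rainy")),
    )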

Create the RAINY_DAYS Transform

Downstream of the WEATHER_MAYBE_RAINY transform, create the following transform. Its query keeps, for each date, the row recording that date's maximum precipitation:

NAME: RAINY_DAYS
HOW WILL YOU SPECIFY THE TRANSFORM?: Snowflake SQL

SELECT
    a.*
FROM
    {{WEATHER_MAYBE_RAINY}} AS a
    INNER JOIN (
        SELECT
            MAX(b.PRECIPITATION) AS maxprecip,
            b.WEATHER_DATE_TS
        FROM
            {{WEATHER_MAYBE_RAINY}} AS b
        GROUP BY
            b.WEATHER_DATE_TS
    ) c ON a.PRECIPITATION = c.maxprecip
    AND a.WEATHER_DATE_TS = c.WEATHER_DATE_TS

Select CREATE.
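
For reference, Snowpark can express the same keep-the-max-row-per-date logic with a window function instead of a self-join (a different technique than the SQL above, but the same result). A minimal sketch, assuming inputs[0] is WEATHER_MAYBE_RAINY:

from snowflake.snowpark import Window
from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
from snowflake.snowpark.functions import col, max as max_
from typing import List

def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    weather = inputs[0]
    # Compute each date's maximum precipitation alongside every row,
    # then keep only the rows that hit that maximum
    daily_max = max_(col("PRECIPITATION")).over(Window.partition_by(col("WEATHER_DATE_TS")))
    return weather \
        .with_column("MAXPRECIP", daily_max) \
        .filter(col("PRECIPITATION") == col("MAXPRECIP")) \
        .drop("MAXPRECIP")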

Step 3: Create the Data Share

The data share will allow us to share the output from the RAINY_DAYS transform with other dataflows. In this case, since we need the data to be used in the Cabs dataflow, this works perfectly for us!

So from our RAINY_DAYS transform, click the vertical ellipsis and select Create new Data Share. In the data share window that pops up, fill out the fields with this information:

NAME: RAINY_DAYS_DATA_SHARE
UPSTREAM: RAINY_DAYS
PERMISSION SETTINGS: All Data Services can access this Data Share

If your data share comes up to date, then you've officially got your Weather dataflow up and running!

Pipeline 3: Cabs Pt. 2

Now we'll wrap up our Cabs dataflow with some final transformations that will use the data we have in our Weather dataflow.

Step 1: Connect to the Data Share

First, navigate back to the Cabs dataflow we were working with in the first part of this guide.

We're going to create the subscriber side of the RAINY_DAYS_DATA_SHARE so that we can get its data into our Cabs dataflow. To do this, select Connect to Data Share from the CREATE menu on the tool panel. When the menu of all the data shares available to this dataflow pops up, select the option named RAINY_DAYS_DATA_SHARE, click the Subscribe button that appears, then hit CREATE.

Step 2: Create the Transforms

Create the RIDES_WITH_WEATHER Transform

This transform will join the data we took from the Weather dataflow with the combined dataset from our ALL_CABS transform.

Click the vertical ellipsis of ALL_CABS and select Create new Transform. Fill out the fields as follows, then enter the SQL below in the code editor:

NAME: RIDES_WITH_WEATHER
HOW WILL YOU SPECIFY THE TRANSFORM?: Snowflake SQL

SELECT
  c.*,
  w.*
FROM
  {{ALL_CABS}} AS c
  INNER JOIN {{RAINY_DAYS_DATA_SHARE}} AS w
  ON c.PICKUP_DATE = w.WEATHER_DATE_TS

Select CREATE to finish the component.
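
The same inner join in Snowpark would look roughly like this. A minimal sketch, assuming inputs[0] is ALL_CABS and inputs[1] is the RAINY_DAYS_DATA_SHARE subscription:

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
from typing import List

def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    cabs, weather = inputs[0], inputs[1]
    # An inner join keeps only rides whose pickup date has a matching weather record
    return cabs.join(weather, cabs["PICKUP_DATE"] == weather["WEATHER_DATE_TS"])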

Create the TIP_ANALYSIS Transform

This will be the final transform in the dataflow, using Snowpark to perform some more complicated analysis. From your RIDES_WITH_WEATHER transform, create a new transform, fill out the following fields, and enter the code below in the editor:

NAME: TIP_ANALYSIS
HOW WILL YOU SPECIFY THE TRANSFORM?: Snowpark
WHICH UPSTREAM DATASET WILL BE INPUT[0]?: RIDES_WITH_WEATHER
HOW WILL THE PARTITIONS BE HANDLED?: Reduce All Partitions to One
from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import DataFrame
from snowflake.snowpark.functions import avg
from typing import List

def transform(session: Session, inputs: List[DataFrame]) -> DataFrame:
    table = inputs[0]

    # Keep only rides with a positive tip and fare, compute the tip as a
    # percentage of the fare, then average that percentage per passenger count
    return table \
        .filter(table.tip_amount > 0) \
        .filter(table.fare_amount > 0) \
        .with_column("tip_percentage", table.tip_amount * 100 / table.fare_amount) \
        .group_by("passenger_count") \
        .agg(avg("tip_percentage").alias("avg_tip_percentage"))
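
As a quick sanity check on the math: a ride with a 10.00 fare and a 2.00 tip yields a tip_percentage of 2 * 100 / 10 = 20, and the final output contains one row per passenger_count holding the average of those percentages as avg_tip_percentage.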

Congrats, you have created a fully operational Snowpark dataflow in Ascend!

