Free Trial Guided Tour: PySpark Transform

Introduction

For this example, we connect a little human interest with meteorology by identifying very rainy November 24th days in our dataset and showing interesting historical figures who were born on that day in the past.

NOTE: We pre-downloaded data about November 24th birthdays from a public Wikipedia API. You could also have written code (e.g. a user-defined function, or “UDF”) to query this API directly, if you had wanted to! To avoid overloading Wikipedia’s servers, we downloaded a sample of the data once and placed it in an S3 bucket for you to play with.
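
If you did want to query Wikipedia directly, a minimal sketch is shown below. It assumes the public Wikimedia REST “on this day” feed and the requests library; the exact endpoint and response shape here are our assumptions about the original source, not something the tour depends on.

import requests

# hypothetical direct fetch of November 24th births; the endpoint and
# response shape are assumptions based on the public Wikimedia REST API
url = "https://en.wikipedia.org/api/rest_v1/feed/onthisday/births/11/24"
resp = requests.get(url, headers={"User-Agent": "ascend-guided-tour-example"})
resp.raise_for_status()

births = resp.json()["births"]  # one entry per person: "text", "year", "pages", ...
print(births[0]["text"], births[0]["year"])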

What This Demonstrates

Standard functionality of Spark that you can access via Python, such as filters, joins, column aliases, and aggregates

Ascend PySpark transforms are very flexible and powerful, offering you essentially all of the capabilities of Python & Spark

  • This transform references data it is given as an input, but also data that is downloaded on the fly from another source.
  • In this case, we download the additional reference JSON dataset from S3, create a dataframe out of it on the fly, and join it with the data the transform receives from its upstream.

Functionality for processing JSON data using PySpark (which you can also do in SparkSQL), as well as producing new structured data as an output (a standalone sketch of this JSON-processing pattern follows this list).

  • Expand the “rainy_days” column to see the Spark array of timestamps that was created, containing the Nov 24th dates from our weather data that were very rainy!
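
As a standalone illustration of that JSON-processing pattern, here is a minimal sketch you can run in any PySpark shell. The sample document and field names are made up and only mimic the shape of the birthdays file used in the transform below.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# a tiny JSON document shaped like the birthdays file: one object with
# an array ("births") that we explode into one row per element
sample_json = '{"births": [{"text": "Baruch Spinoza", "year": 1632}]}'
df = spark.read.json(spark.sparkContext.parallelize([sample_json]))

df.select(F.explode(df.births).alias('birth')) \
  .select(
      F.col('birth.text').alias('title'),
      F.col('birth.year').alias('birth_year')
  ) \
  .show()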

Instructions

  1. Right-click on “Weather Transform” > Choose “Create new Transform”
  2. Replace the NAME of the Transform with “Nov 24 Rainy Birthdays”
  3. Replace the DESCRIPTION with “__Who had rainy Nov 24 birthdays per Wikipedia, and what days were rainy in our data?__” (minus the quotes). Notice that the description includes markdown tags (__ to emphasize the text)!
  4. Under Query > HOW WILL YOU SPECIFY THE TRANSFORM?, choose “PySpark”
  5. For “HOW WILL PARTITIONS BE HANDLED?”, choose “Reduce all Partitions to One” and the PySpark code editor will appear.
  6. Copy/paste the code below into the PYSPARK FUNCTION code editor.
  7. Click the “Create” button
  8. Enjoy the result!
import boto3
from botocore import UNSIGNED
from botocore.client import Config
 
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
 
def transform(spark_session: SparkSession, inputs: List[DataFrame], credentials=None) -> DataFrame:
   df0 = inputs[0]
 
   # load JSON data containing historical birthdays that occurred
   # on 11/24 (original source Wikipedia), from an S3 bucket
   s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
   birthday_blob = s3.get_object(
       Bucket='ascend-io-sample-data-read',
       Key='an-extended-guided-tour/nov24_birthdays.json'
   )
   birthday_data = birthday_blob['Body'].read().decode("utf-8")
 
   # create a spark dataframe from this JSON data
   birthday_data_rdd = spark_session.sparkContext.parallelize([birthday_data])
   birthday_df = spark_session.read.json(birthday_data_rdd, multiLine=True)
 
   # create separate records for each birth by exploding a JSON array
   # and then selecting columns that we'd like to use
   birthday_df = birthday_df.select(F.explode(birthday_df.births).alias('birth_json')) \
       .select(
           F.col('birth_json.text').alias('title'),
           F.col('birth_json.year').alias('birth_year'),
           F.col('birth_json.pages').getItem(0).alias('first_page')
       ) \
       .select(
           'title',
           'birth_year',
           F.col('first_page').displaytitle.alias('name'),
           F.col('first_page').description.alias('description')
       )
 
   # filter and join to make the dataset that we'd like to see;
   # we already know that our file only has birthdays for 11/24,
   # so we don't need any special kind of join and can simply use
   # a cross join to generate the desired dataset
   result_df = df0.filter( (F.month(df0.weather_date_ts) == 11) & (F.dayofmonth(df0.weather_date_ts) == 24) ) \
                  .filter( df0.Weather.isin(['Rainy', 'Pouring']) ) \
                  .crossJoin( birthday_df ) \
                  .groupBy(
                       birthday_df.title,
                       birthday_df.birth_year,
                       birthday_df.name,
                       birthday_df.description
                   ) \
                  .agg( F.collect_list(df0.weather_date_ts).alias('rainy_days') )
 
   # the final result contains the people who had 11/24 birthdays per Wikipedia,
   # plus a column containing an array of the 11/24 dates that were
   # rainy in our weather dataset
   return result_df
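
If you'd like to experiment with this function outside of Ascend, a minimal local harness might look like the sketch below. It assumes a local PySpark installation, boto3, and network access to the public sample bucket; the toy weather rows simply mirror the column names the transform expects from its upstream.

from datetime import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

# toy stand-in for the upstream weather data; in Ascend, this input
# arrives from the "Weather Transform" component instead
weather_df = spark.createDataFrame(
    [
        (datetime(2015, 11, 24), 'Rainy'),
        (datetime(2016, 11, 24), 'Sunny'),
    ],
    ['weather_date_ts', 'Weather']
)

transform(spark, [weather_df]).show(truncate=False)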