Free Trial Guided Tour: PySpark Transform
For this example, we connect a little human interest with meteorology by identifying very rainy November 24ths in our dataset and showing interesting historical figures who were born on that date.
NOTE: We pre-downloaded data about November 24th birthdays from this public Wikipedia API. You could instead write code (for example, a user-defined function, or “UDF”) to query the API directly. To avoid overloading Wikipedia’s servers, we downloaded a sample of the data once and placed it in an S3 bucket for you to play with.
What This Demonstrates
Standard functionality of Spark that you can access via Python, such as filters, joins, column aliases, and aggregates
Ascend PySpark transforms are flexible and powerful, and offer you essentially all of the capabilities of Python & Spark
- This transform uses not only the data it receives as input, but also data that it downloads on the fly from another source.
- In this case, we download the additional reference JSON dataset from S3, create a dataframe out of it on the fly, and join that with the data the transform receives from its upstream.
Functionality for processing JSON data using PySpark (which you can also do in SparkSQL), as well as producing new structured data as an output.
- Expand the “rainy_days” column to see the Spark array of timestamps that was created (via collect_list); it contains the Nov 24th dates from our weather data that were very rainy!
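To make the JSON-processing point concrete, here is a minimal plain-Python sketch of the reshaping that the transform's explode and select steps perform, runnable without a Spark session. The sample payload is invented for illustration: only the field names (births, text, year, pages, displaytitle, description) come from the transform code on this page, and the real file may well contain more fields. The helper name flatten_births is hypothetical.

```python
import json

# Hypothetical payload: field names follow the transform code on this page,
# but every value below is made up for illustration.
SAMPLE_JSON = """
{
  "births": [
    {"text": "Example Person A, painter", "year": 1864,
     "pages": [{"displaytitle": "Example Person A", "description": "Painter"},
               {"displaytitle": "Painting", "description": "Art form"}]},
    {"text": "Example Person B, composer", "year": 1868,
     "pages": [{"displaytitle": "Example Person B", "description": "Composer"}]}
  ]
}
"""


def flatten_births(raw_json):
    """Return one flat record per entry in the 'births' array."""
    payload = json.loads(raw_json)
    records = []
    for birth in payload["births"]:       # plays the role of F.explode(births)
        first_page = birth["pages"][0]    # plays the role of getItem(0)
        records.append({
            "title": birth["text"],
            "birth_year": birth["year"],
            "name": first_page["displaytitle"],
            "description": first_page["description"],
        })
    return records


birthday_rows = flatten_births(SAMPLE_JSON)
for row in birthday_rows:
    print(row)
```

Each nested birth entry becomes one flat row with the same four columns (title, birth_year, name, description) that the PySpark version selects; only the first linked page of each entry is kept.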
- Right-click on “Weather Transform” > Choose “Create new Transform”
- Replace the NAME of the Transform with “Nov 24 Rainy Birthdays”
- Replace the DESCRIPTION with “__Who had rainy Nov 24 birthdays per Wikipedia, and what days were rainy in our data?__” (minus the quotes). Notice that the description includes markdown tags (in standard Markdown, wrapping text in __ renders it in bold).
- Under Query > HOW WILL YOU SPECIFY THE TRANSFORM?, choose “PySpark”
- For “HOW WILL PARTITIONS BE HANDLED?”, choose “Reduce all Partitions to One” and the PySpark code editor will appear.
- Copy/paste the code below into the PYSPARK FUNCTION code editor.
- Click the “Create” button
- Enjoy the result!
import boto3
from botocore import UNSIGNED
from botocore.client import Config
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F


def transform(spark_session: SparkSession, inputs: List[DataFrame], credentials=None) -> DataFrame:
    # the upstream Weather Transform is the first (and only) input
    df0 = inputs[0]

    # load JSON data containing historical birthdays that occurred
    # on 11/24 (original source Wikipedia), from an S3 bucket;
    # UNSIGNED means the request is anonymous (no AWS credentials needed)
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    birthday_blob = s3.get_object(
        Bucket='ascend-io-sample-data-read',
        Key='an-extended-guided-tour/nov24_birthdays.json'
    )
    birthday_data = birthday_blob['Body'].read().decode("utf-8")

    # create a spark dataframe from this JSON data
    birthday_data_rdd = spark_session.sparkContext.parallelize([birthday_data])
    birthday_df = spark_session.read.json(birthday_data_rdd, multiLine=True)

    # create separate records for each birth by exploding a JSON array
    # and then selecting columns that we'd like to use
    birthday_df = birthday_df.select(F.explode(birthday_df.births).alias('birth_json')) \
        .select(
            F.col('birth_json.text').alias('title'),
            F.col('birth_json.year').alias('birth_year'),
            F.col('birth_json.pages').getItem(0).alias('first_page')
        ) \
        .select(
            'title',
            'birth_year',
            F.col('first_page').displaytitle.alias('name'),
            F.col('first_page').description.alias('description')
        )

    # filter and join to make a dataset that we'd like to see
    # we already know that our file only has birthdays for 11/24,
    # so we don't need any special kind of join and can actually use
    # a cross join to generate the desired data set
    result_df = df0.filter(
            (F.month(df0.weather_date_ts) == 11) &
            (F.dayofmonth(df0.weather_date_ts) == 24)
        ) \
        .filter(
            df0.Weather.isin(['Rainy', 'Pouring'])
        ) \
        .crossJoin(
            birthday_df
        ) \
        .groupBy(
            birthday_df.title,
            birthday_df.birth_year,
            birthday_df.name,
            birthday_df.description
        ) \
        .agg(
            F.collect_list(df0.weather_date_ts).alias('rainy_days')
        )

    # the final result contains the people who had 11/24 birthdays from Wikipedia,
    # and a column containing an array of which days were rainy
    # that were 11/24 from our weather dataset
    return result_df
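The filter, cross join, and aggregation at the end of the transform can be easier to follow on a handful of rows. The sketch below mirrors that logic in plain Python so it runs without Spark. All rows are hypothetical; only the column names (weather_date_ts, Weather, title, birth_year) and the 'Rainy' / 'Pouring' values are taken from the code above, and the helper names are made up for this illustration.

```python
from datetime import datetime
from itertools import product

# Hypothetical input rows; column names follow the transform code above.
weather_rows = [
    {"weather_date_ts": datetime(2015, 11, 24), "Weather": "Rainy"},
    {"weather_date_ts": datetime(2016, 11, 24), "Weather": "Sunny"},
    {"weather_date_ts": datetime(2017, 11, 24), "Weather": "Pouring"},
    {"weather_date_ts": datetime(2017, 11, 25), "Weather": "Rainy"},
]
birthday_rows = [
    {"title": "Example Person A, painter", "birth_year": 1864},
    {"title": "Example Person B, composer", "birth_year": 1868},
]


def rainy_nov24(rows):
    """Keep only Nov 24 rows whose weather is 'Rainy' or 'Pouring'."""
    return [
        r for r in rows
        if r["weather_date_ts"].month == 11
        and r["weather_date_ts"].day == 24
        and r["Weather"] in ("Rainy", "Pouring")
    ]


def cross_join_and_collect(weather, birthdays):
    """Pair every weather row with every birthday, then group by person."""
    grouped = {}
    for w, b in product(weather, birthdays):          # the cross join
        key = (b["title"], b["birth_year"])           # the groupBy columns
        grouped.setdefault(key, []).append(w["weather_date_ts"])  # collect_list
    return [
        {"title": title, "birth_year": year, "rainy_days": days}
        for (title, year), days in grouped.items()
    ]


result = cross_join_and_collect(rainy_nov24(weather_rows), birthday_rows)
for row in result:
    print(row)
```

Because every surviving weather row is a Nov 24 row and every birthday in the file is a Nov 24 birthday, every pair is a valid match, which is why no join key is needed. Each person therefore ends up with the same list of rainy days. Note that in Spark the order of elements returned by collect_list is not guaranteed, whereas this sketch happens to preserve input order.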