PySpark transforms are used for additional control and flexibility in configuring transformations on Ascend.
Navigate to a Dataflow that already has Read Connector(s) or Data Feed(s). Click + ADD and select Transform to bring up the CREATE NEW TRANSFORM panel.
In the CREATE NEW TRANSFORM panel, enter a name for your Transform under NAME in the OVERVIEW section. It's advisable to provide a high-level description of your Transform in the DESCRIPTION text box.
Choose PySpark in the QUERY section.
To specify the upstream datasets used in a PySpark transform, Ascend provides an array called INPUT[], with each upstream dataset assigned to a separate element.
For example, as shown in the screenshot below, the upstream dataset TotalUsage_By_Room is specified as INPUT[0] using the dropdown.
Additional upstreams can be added as INPUT[1], INPUT[2], and so on: click the ADD ANOTHER INPUT button to add an element, then choose its dataset from the dropdown.
This step determines how partitions will be handled. For more details, please refer to the section titled Partitioning in the Performance Tuning section of the documentation.
For the transformation logic specified within data transforms, Ascend evaluates the current partitioning strategy, optimizes underlying operations, and generates the compute jobs needed to process data and/or re-partition the data.
Three types of partitioning operations can occur when data flows from one transform to another:
- Map all Partitions One-to-One: Operation where 1 output partition is produced for each input partition.
- Partition by Timestamp: Operation where 1 output partition is produced for each time partition value. Monthly, daily and hourly partitioning are supported.
- Reduce all Partitions to One: Operation where a single output partition is produced from all input partitions.
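The three operations above can be pictured with a small, purely conceptual sketch in plain Python. It models a partition as a list of records and is not how Ascend implements partitioning; the function names, the record layout, and the `ts` column are all hypothetical and exist only for illustration.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List

Record = dict
Partition = List[Record]

# strftime patterns standing in for the three supported granularities
GRANULARITY_FORMATS = {
    "monthly": "%Y-%m",
    "daily": "%Y-%m-%d",
    "hourly": "%Y-%m-%d %H",
}


def map_one_to_one(partitions: List[Partition]) -> List[Partition]:
    """Map all Partitions One-to-One: one output partition per input partition."""
    return [list(partition) for partition in partitions]


def partition_by_timestamp(
    partitions: List[Partition], column: str, granularity: str
) -> Dict[str, Partition]:
    """Partition by Timestamp: one output partition per distinct time value."""
    fmt = GRANULARITY_FORMATS[granularity]
    output: Dict[str, Partition] = defaultdict(list)
    for partition in partitions:
        for record in partition:
            output[record[column].strftime(fmt)].append(record)
    return dict(output)


def reduce_to_one(partitions: List[Partition]) -> Partition:
    """Reduce all Partitions to One: a single output partition with every record."""
    return [record for partition in partitions for record in partition]


# Hypothetical sample data: two input partitions holding three records in total.
SAMPLE_PARTITIONS = [
    [
        {"room": "A", "ts": datetime(2021, 1, 5, 9)},
        {"room": "B", "ts": datetime(2021, 2, 1, 10)},
    ],
    [{"room": "A", "ts": datetime(2021, 1, 20, 14)}],
]
```

With the sample data, the one-to-one mapping keeps two partitions, monthly partitioning yields one partition for January and one for February, and the reduce operation yields a single partition holding all three records.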
If you select the Partition by Timestamp option, also select the upstream column to partition on and the granularity, as shown below.
Developing a PySpark transformation involves the following mandatory steps:
- Import desired libraries.
- Develop the inbuilt transform function.
- Return a PySpark dataframe as output.
Let's look at each of them in more detail.
By default, when you create an Ascend transform and select PySpark as the query specification type, sample code is automatically added as boilerplate in the editor (screenshot below).
As you can see, only a couple of libraries are imported automatically from the typing and pyspark.sql modules.
For a detailed list of modules that can be imported for use in the Ascend transformation, please refer to the official Apache PySpark documentation.
For example, if you want to develop PySpark transformation logic that uses the window module and other SQL functions, you may want to add import statements like:
from pyspark.sql.window import Window
import pyspark.sql.functions as fn
The inbuilt transform function can be developed in two ways:
- Run complex SQL that uses WITH clauses, subqueries, and other nested SQL constructs.
These constructs are unavailable in regular Ascend SQL transforms. To use them, assign each upstream dataset to a dataframe variable inside the function, register each dataframe as a temporary view with the createTempView method, and then write the query you need with the sparkSession.sql() method (screenshot below).
- Perform PySpark Dataframe transformations.
You can develop data transformation logic using the transformations available in the PySpark dataframe API (screenshot below.)
In a sequence of PySpark transformations as shown above (filter(), withColumn(), groupBy(), etc.), do not use dataframe actions (collect(), count(), first(), etc.) unless the action is the last step in the sequence of dataframe operations. Dataframe actions interspersed between transformations can cause unpredictable results, and the Apache Spark community does not recommend them as a best practice.
If you intend to generate an action after a series of transformations and then perform additional transformations based on the results of the earlier action, it is recommended to split this into two Ascend PySpark transformation components right at the place where the action is invoked, instead of doing the entire sequence of operations within a single Ascend PySpark component.
For your reference, a list of valid Spark transformations and actions is maintained at:
Finally, once you've developed the PySpark transformation logic, the last step is to ensure that the function's return statement returns a PySpark dataframe and nothing else. No other data type is supported as a return value, and returning one will cause an error. All the sample PySpark transformation screenshots shown in this documentation provide examples of how to return a dataframe.
When all the above steps are completed, click the CREATE button at the bottom of the panel. This will initiate the processing of the Ascend PySpark transform created.
When you need to revise your query by updating any business logic (SQL or PySpark), select the target Transform and then click EDIT in the upper-right corner.
Make revisions to your SQL or PySpark code. Click CANCEL at any time to discard your changes, or click UPDATE to save them.