A partition is a collection of records. Every Ascend Component partitions its data set, allowing Ascend to optimize:
- Incremental data management by only processing the partitions that are created/updated/deleted
- Robust scaling by processing each partition as a distinct Spark task
Read connectors determine the initial partitioning strategy for the dataset.
Ascend's native blob store connectors create one partition for each file found by default; this default can be changed to group many smaller files into a single partition through Dynamic Source Aggregation.
For other connectors, such as databases, warehouses, and APIs, the developer designs the partitioning strategy. Every object returned in list_objects constitutes a partition of data for Ascend. Examples of common partitioning strategies include:
- Single partition for the full data set; most applicable to data sets smaller than a few GB that do not benefit from incremental processing
- Partition per hour / day / month groupings of the record's created_at; most applicable to data sets where records are updated for some time after creation until they no longer get updated (like a user's orders table)
- Partition by hour / day / month groupings of the record's updated_at; most applicable to data sets where any arbitrary record may be updated and it is most efficient to pull only the updated records (e.g. a
For discussion on designing a partition strategy for your data set, please reach out to your Ascend support engineer.
From the returned list of objects, Ascend is able to determine if any partitions have been added or deleted. For existing partitions, the fingerprint returned on each object determines if the partition's data has changed. Only partitions that are newly created/updated/deleted, hereafter referred to as "changed" partitions, are propagated for processing downstream through the Transformations.
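The change detection described above can be illustrated with a minimal sketch. It assumes each listing is reduced to a mapping from partition name to fingerprint; the names diff_listings and PartitionDiff are hypothetical and are not part of any Ascend API.

```python
from typing import Dict, List, NamedTuple


class PartitionDiff(NamedTuple):
    added: List[str]
    updated: List[str]
    deleted: List[str]


def diff_listings(previous: Dict[str, str], current: Dict[str, str]) -> PartitionDiff:
    """Compare two listings that map partition name -> fingerprint (hypothetical shape)."""
    added = sorted(name for name in current if name not in previous)
    deleted = sorted(name for name in previous if name not in current)
    # A partition present in both listings counts as updated only if its fingerprint differs.
    updated = sorted(
        name for name in current
        if name in previous and current[name] != previous[name]
    )
    return PartitionDiff(added, updated, deleted)


previous = {"2021-01.csv": "abc", "2021-02.csv": "def", "2021-03.csv": "ghi"}
current = {"2021-02.csv": "def", "2021-03.csv": "xyz", "2021-04.csv": "jkl"}
diff = diff_listings(previous, current)
print(diff)
```

In this sketch the unchanged partition (2021-02.csv) appears in none of the three lists, so nothing downstream would need to reprocess it.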
Each Transform chooses from 3 strategies to process the data set. The chosen strategy will determine:
- The number of output partitions
- Which input partitions to feed in to compute an output partition
Ascend will run a task for each output partition, providing data from the input partitions. Ascend stores the data for the output partition with a fingerprint that includes the input partition set; this mechanism allows for only reprocessing output partitions that have a changed input partition set.
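One way to picture a fingerprint that "includes the input partition set" is a hash over the fingerprints of the inputs. The following is only a sketch under that assumption: the source does not describe how Ascend computes fingerprints, and output_fingerprint and needs_reprocessing are hypothetical names.

```python
import hashlib
from typing import Iterable


def output_fingerprint(transform_id: str, input_fingerprints: Iterable[str]) -> str:
    """Hash a transform identifier together with its sorted input fingerprints."""
    digest = hashlib.sha256(transform_id.encode("utf-8"))
    for fp in sorted(input_fingerprints):
        digest.update(b"\x00")  # separator so ("ab", "c") hashes differently from ("a", "bc")
        digest.update(fp.encode("utf-8"))
    return digest.hexdigest()


def needs_reprocessing(stored: str, transform_id: str, input_fingerprints: Iterable[str]) -> bool:
    """An output partition is stale when its stored fingerprint no longer matches its inputs."""
    return stored != output_fingerprint(transform_id, input_fingerprints)


stored = output_fingerprint("my_transform", ["fp_a", "fp_b"])
print(needs_reprocessing(stored, "my_transform", ["fp_b", "fp_a"]))  # same input set
print(needs_reprocessing(stored, "my_transform", ["fp_a", "fp_c"]))  # one input changed
```

Sorting the input fingerprints makes the result independent of listing order, so only a genuine change in the input set marks the output partition for reprocessing.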
The below screenshot shows the 3 transform partitioning strategies available in the transform UI:
The 3 transform partitioning strategies are explained in detail below:
Reduce all Partitions to One
- Description: Ascend combines all input partitions and processes them in 1 Spark task to produce 1 output partition.
- Number of Output Partitions: 1
- Processing on change: Since all partitions in the input component contribute, any change in the input component causes the transform to fully reprocess with all input partitions.
Map all Partitions One-to-One
- Description: Ascend processes each partition separately and produces one output partition with the results of computation for each input partition, "one-to-one".
- Number of Output Partitions: Same number of output partitions as input partitions.
- Processing on change: Only changed input partitions are processed through this transform.
Partition by Date/Time Column
- Description: Ascend evaluates each input partition to determine which timestamp-based partitions its records belong to. Each timestamp granule is then processed as a mapping transform. This strategy automatically handles late data.
- Number of Output Partitions: One output partition for each timestamp granule. Supports minute, hour, day, month, and year granularities. Example: With "day" granularity and a data set spanning 4 days, generate 4 output partitions.
- Processing on change: For each changed input partition, inspect the min/max of the timestamp used in partitioning and process every timestamp-based partition to which the input partition contributes. Processing will always include all records that belong to the timestamp partition.
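The min/max inspection for timestamp-based partitioning can be sketched as follows for "day" granularity. This is an illustrative assumption about the logic, not Ascend's implementation; affected_days is a hypothetical helper.

```python
from datetime import date, datetime, timedelta
from typing import List


def affected_days(min_ts: datetime, max_ts: datetime) -> List[date]:
    """Return every calendar day between min_ts and max_ts, inclusive."""
    if max_ts < min_ts:
        raise ValueError("max_ts must not be earlier than min_ts")
    days = []
    day = min_ts.date()
    while day <= max_ts.date():
        days.append(day)
        day += timedelta(days=1)
    return days


# A changed input partition whose records span from late on Feb 27 to early on Mar 2.
changed = affected_days(datetime(2021, 2, 27, 23, 50), datetime(2021, 3, 2, 0, 5))
print(changed)
```

Here the changed input partition touches 4 daily granules, so 4 daily output partitions would be candidates for reprocessing. The min/max range is a conservative superset: a day inside the range may turn out to hold no records from this input partition.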
The below screenshot demonstrates a Dataflow using all 3 transform partitioning strategies: Full Reduction, Mapping, and Timestamp Partition. With 249 input partitions, notice:
- The Full Reduction transform shows the 1 output partition that it has generated.
- The Mapping transform maintains 249 partitions.
- The Timestamp Partition transform, which partitions the data to daily partitions, has 60 partitions, indicating the data set spans 60 days.
Parallelism in Execution
Regardless of which partitioning strategy is used, the Transform will still execute in parallel across many compute resources (a Spark task). Partitioning is an additional level of parallelism above Spark's own internal parallelism and partitioning. For more about Ascend Partitions vs Spark Partitions see this guide.
You choose which partitioning strategy to use when creating or editing the transform. Note, for Timestamp Partitioning, the column to use for selection must already be a
Choosing the Partitioning Strategy in SparkSQL
SparkSQL transforms default to "Reduce all Partitions to One"; this configuration option is available under Advanced Settings.
The partitioning strategy will determine the number of distinct Spark tasks and which records are available in the DataFrame.
- Full Reduction: 1 task. The input DataFrame will contain all records from the upstream component.
- Mapping: 1 task for each input partition. The input DataFrame will contain records associated only to the input partition.
- Timestamp Partition: 1 task for each timestamp granule. The input DataFrame will contain records associated with that granule (regardless of input partitions).
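To make the three cases above concrete, the sketch below groups a handful of records the way each strategy would present them to a task. Plain Python lists stand in for DataFrames, the input shape (partition name mapped to record timestamps) is a hypothetical simplification, and "day" granularity is assumed for the timestamp strategy.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List

# Hypothetical input shape: partition name -> list of record timestamps.
Partitions = Dict[str, List[datetime]]


def full_reduction(partitions: Partitions) -> Dict[str, List[datetime]]:
    """One task that sees every record."""
    return {"all": [ts for records in partitions.values() for ts in records]}


def mapping(partitions: Partitions) -> Dict[str, List[datetime]]:
    """One task per input partition, each seeing only that partition's records."""
    return {name: list(records) for name, records in partitions.items()}


def timestamp_partition(partitions: Partitions) -> Dict[str, List[datetime]]:
    """One task per day, each seeing that day's records from any input partition."""
    by_day = defaultdict(list)
    for records in partitions.values():
        for ts in records:
            by_day[ts.date().isoformat()].append(ts)
    return dict(by_day)


inputs = {
    "file_a": [datetime(2021, 2, 1, 9), datetime(2021, 2, 1, 17)],
    "file_b": [datetime(2021, 2, 1, 23), datetime(2021, 2, 2, 1)],
    "file_c": [datetime(2021, 2, 2, 8)],
}
for strategy in (full_reduction, mapping, timestamp_partition):
    tasks = strategy(inputs)
    print(strategy.__name__, {name: len(records) for name, records in tasks.items()})
```

Note how file_b contributes records to both daily tasks: with the timestamp strategy, a task's contents are determined by the granule, not by the input partition the record arrived in.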
The partitioning strategy chosen above applies to the primary input of the component. By default, every other input included in the component is passed in as a full reduction (the entire data set). Given this default, if any partition of an additional input changes, then all partitions of the primary input will reprocess. This mechanism preserves data integrity through join accuracy, but it does mean that the default pipeline is not incremental. If your business logic allows (for example, when a certain partition of one input relates to only certain partitions of another input), Ascend's advanced partitioning handling allows for expressing these input relationships while maintaining both pipeline incrementality and data integrity.
For each input component, partitions can be preprocessed.
In particular, the following operations can be executed at the partition level before the data is parsed into Spark DataFrames by the transform component:
- The selected fields (Created At and/or Upload At) are added to the records of input partitions.
- Only partitions whose content satisfies the filter conditions are taken as input. This feature preserves data incrementality across complex pipelines.
Below are a few typical use cases:
A user wants to feed forward only a certain selection of partitions, for instance only those that contain records collected after a certain time (e.g. ts > 2021-02-01).
To achieve this in the partition filtering configuration:
With this setup, any partitions that change before the filtered date will not trigger any data processing, as Ascend will filter out those partitions at the metadata layer and then determine that the component is already "Up to Date".
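The effect of filtering at the metadata layer can be sketched as below. The per-partition max_ts field and the function names are assumptions made for illustration; the source does not specify which metadata Ascend keeps per partition.

```python
from datetime import datetime
from typing import List, NamedTuple, Set


class PartitionMeta(NamedTuple):
    name: str
    max_ts: datetime  # hypothetical metadata: latest record timestamp in the partition


def select_partitions(partitions: List[PartitionMeta], cutoff: datetime) -> List[str]:
    """Keep only partitions that can contain records with ts > cutoff."""
    return [p.name for p in partitions if p.max_ts > cutoff]


def partitions_to_process(selected: List[str], changed: Set[str]) -> List[str]:
    """Only changed partitions that also pass the filter trigger processing."""
    return [name for name in selected if name in changed]


metadata = [
    PartitionMeta("2021-01", datetime(2021, 1, 31, 23, 59)),
    PartitionMeta("2021-02", datetime(2021, 2, 28, 23, 59)),
    PartitionMeta("2021-03", datetime(2021, 3, 31, 23, 59)),
]
selected = select_partitions(metadata, cutoff=datetime(2021, 2, 1))
print(selected)
print(partitions_to_process(selected, changed={"2021-01"}))
```

Because the 2021-01 partition never passes the filter, a change to it leaves nothing to process, which corresponds to the component being reported as "Up to Date".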
A user has a component with 2 inputs, both partitioned by month. They want to run the transform block having as input:
- the partitions from 1 month of input_1 and
- the partition of the same month from input_2
To achieve this in the partition filtering configuration:
A user has a component with 2 inputs, both partitioned by month. They want to run the transform block having as input:
- the partitions from 1 month of input_1 and
- the partitions of the same and the previous month from input_2
To achieve this in the partition filtering configuration:
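Independently of how the filter is entered in the configuration, the pairing that the two monthly use cases above describe can be illustrated with a small sketch. It assumes partitions are identified by 'YYYY-MM' strings; pair_months and previous_month are hypothetical helpers, not Ascend functions.

```python
from typing import Dict, List


def previous_month(month: str) -> str:
    """Return the month before a 'YYYY-MM' string."""
    year, mon = (int(part) for part in month.split("-"))
    if mon == 1:
        return f"{year - 1:04d}-12"
    return f"{year:04d}-{mon - 1:02d}"


def pair_months(input_1: List[str], input_2: List[str], include_previous: bool) -> Dict[str, List[str]]:
    """For each input_1 month, list the input_2 months that should be fed alongside it."""
    available = set(input_2)
    pairs = {}
    for month in input_1:
        wanted = [previous_month(month), month] if include_previous else [month]
        pairs[month] = [m for m in wanted if m in available]
    return pairs


input_1 = ["2021-01", "2021-02"]
input_2 = ["2020-12", "2021-01", "2021-02"]
print(pair_months(input_1, input_2, include_previous=False))
print(pair_months(input_1, input_2, include_previous=True))
```

With such a relationship in place, a change to the 2021-02 partition of input_2 only affects the task for 2021-02 (and, in the second use case, the following month), rather than every partition of input_1.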
Partitions from other components can be added to the input using this field. This is handy since only partitions that are created/deleted/updated are reprocessed.
For example, suppose input_a has 3 partitions and input_b has 2 partitions. The number of input partitions will be 5 (3+2). However, if all the partitions of input_b are refreshed, only 2 of the 5 partitions of the transform block will be reprocessed.
Note: Union can be performed only with components that have the same schema.
Write Connectors will leverage the partitioning strategy of their upstream Transform, if supported, to determine how to optimize their writing strategy. If a Write Connector does not support the partitioning strategy of its upstream Transform, it will fall back to a Full Reduction. The native blob storage Write Connectors support all partitioning types. Native Data Warehouse write connectors support only Full Reduction and Timestamp Partitioning methods.
With an upstream Full Reduction, the Write Connector will generate 1 task to write out the data. Anytime the upstream partitions change, the full data set will be re-written to destination.
With an upstream Timestamp Partitioned Transform, the Write Connector will generate 1 task per partition to write out. This strategy allows for incremental loading and updating of the end destination. Additionally, this strategy allows for creating folders whose names contain a date value from the data. This pattern is common when preparing data for querying in engines such as Athena, Presto, Redshift Spectrum, and Databricks.
Maintaining Timestamp Partitions through Mapping Transforms
In order to take advantage of timestamp partitioning in a Write Connector (for example, to use a column value as part of a date), the Write Connector does not need to be directly downstream of a Transform that partitions the data by timestamp. Any number of Mapping transforms may follow as long as the partitioning column is carried through and the partition structure (each partition containing values for a particular timestamp granule) is maintained.
The below screenshot demonstrates an example configuration.
In this configuration, event_month is the column name from upstream. The date format string used here follows the definitions from Java SimpleDateFormat.
Below is a screenshot that shows the folders and files created by the write connector in the sample S3 location.
With an upstream Mapping Transform, the Write Connector will maintain the same number of partitions in the destination. Partitions with 0 records are discarded.
Designing your overall Dataflow will require some key considerations to choose the right partitioning strategy.
As mentioned previously, each partitioning strategy will generate a different number of queries / Spark tasks when the upstream has more than 1 partition.
Example: Consider an upstream Read Connector that has 500 partitions totaling 5 million event records from 50 days.
- A Full Reduction will generate only 1 task that processes 5 million records at once.
- Timestamp Partitioning with daily partitions will generate 50 tasks, each processing roughly 100,000 records since the 5 million records contain records from 50 days.
- Mapping will generate 500 queries each processing roughly 10,000 records.
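The arithmetic behind these three figures, assuming records are spread evenly across partitions and days, can be checked with a few lines:

```python
total_records = 5_000_000
input_partitions = 500
days = 50

# Number of tasks generated by each strategy for the initial load.
tasks = {
    "Full Reduction": 1,
    "Timestamp Partitioning (daily)": days,
    "Mapping": input_partitions,
}
# Even-distribution assumption: every task gets an equal share of the records.
records_per_task = {name: total_records // count for name, count in tasks.items()}

for name, count in tasks.items():
    print(f"{name}: {count} task(s), about {records_per_task[name]:,} records each")
```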
A Full Reduction will likely result in better performance than a Mapping or Timestamp Partitioning when creating a downstream Transform. In this example, the overhead of many small tasks in the processing layer will require more resources and take longer to process than a single task would take. Maintaining a minimum task size will help to avoid this inefficiency.
Ascend is a powerful data platform where Dataflows are built once and run forever. This means that when new data comes into the Read Connector, Ascend will automatically trigger the incremental data loading process and update all components throughout the entire Dataflow.
Neither Timestamp Partitioning nor Mapping requires fully reprocessing the entire historical data when data is loaded incrementally, whereas a Full Reduction will always need to process the full set of historical data.
Continuing the above example, consider a new file added to the data source; it now totals 501 partitions and 5,010,000 records.
- Full Reduction Transform: Ascend will generate 1 task that processes all of the 5,010,000 records.
- Mapping Transform: Ascend will generate 1 task that processes only 10,000 records.
- Timestamp Partitioning Transform: Ascend will generate 1 task that processes only the new 10,000 records (assumes this new file contains event data from a new day; with late data, the affected partitions would reprocess)
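The same comparison for the incremental load, as a short worked calculation. It uses the figures from the example and the stated assumption that the new file only contains data from a new day:

```python
existing_records = 5_000_000
new_file_records = 10_000

# Records each strategy has to process after the 501st partition arrives.
reprocessed = {
    "Full Reduction": existing_records + new_file_records,
    "Mapping": new_file_records,
    "Timestamp Partitioning (new day only)": new_file_records,
}
# Share of the data set that the incremental strategies avoid touching.
avoided = 1 - new_file_records / (existing_records + new_file_records)

for name, count in reprocessed.items():
    print(f"{name}: {count:,} records")
print(f"Incremental strategies skip about {avoided:.1%} of the records")
```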
The efficiency gains here may merit structuring the Dataflow to maintain incremental processing through Mapping and Timestamp Partitioning, deferring Full Reductions to the points where the business logic requires them, ideally on data sets that have already been reduced in size.
Both "Ascend partition" and "Spark partition" describe a group of records; however, they are distinct concepts and should not be confused:
- An Ascend partition is about incrementality. It is an Ascend-specific notion used to keep track of the work that needs to be done. For instance, a new S3 "LIST" operation might determine that 2 new partitions (representing 2 files) have shown up, and 1 previous partition (1 file) has changed content.
- A Spark partition is about parallelism and where data gets stored physically during processing. Spark takes an RDD, partitions it, and assigns the partitions to various nodes (executors) for parallel processing. Data with keys in a similar range are more likely to end up on the same partition, thus minimizing shuffling and expensive I/O operations.
Spark's repartition and coalesce methods are used to change the number of partitions an RDD or DataFrame has (coalesce only decreases the count and can help avoid extensive shuffling).
Calling these methods in an Ascend transform does not affect how Ascend partitions behave.
Reusing an earlier example, imagine we have a Read Connector that has 500 partitions totaling 5 million event records. If we create a PySpark transform after this Read Connector, choose a one-to-one partition mapping, and call .repartition(10) in the PySpark code, the following will happen:
- The Pyspark transform will have 500 Ascend partitions since it's a one-to-one Ascend partition map
- Each partition's records will be redistributed to 10 Spark partitions for processing
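A sketch of what such a transform body might look like is shown below. The transform(spark_session, inputs) signature is an assumption about how the PySpark transform is declared and should be checked against your environment; DataFrame.repartition is the standard PySpark method.

```python
def transform(spark_session, inputs):
    """Sketch of a PySpark transform body (the signature is an assumption)."""
    df = inputs[0]  # primary input: the records of one Ascend partition under one-to-one mapping
    # repartition(10) only changes Spark's internal partitioning within this task;
    # it does not change the number of Ascend partitions the component produces.
    return df.repartition(10)
```

Under the one-to-one mapping described above, this function would run once per Ascend partition, and each run would redistribute that partition's records across 10 Spark partitions.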
As a best practice, we suggest not calling .coalesce(n) explicitly and instead letting Spark determine an appropriate partition count.