Schema Inference

What Is Schema Inference

When Ascend is running a Transform, the platform first determines the output schema of the Transform without processing any data. By performing this process sequentially throughout the Dataflow, Ascend validates the pipeline quickly without waiting for data to process and the pipeline breaking somewhere in the middle of processing due to a schema issue.

In order for Ascend to determine the output schema, the platforms schedules a task referred to as Schema Inference. This sends in an empty version of the dataset(s) (one for each input) and inspects the output schema of the output dataset.

Most of the time, this process completes transparently without developers noticing. However, in cases where the code itself is dependent on rows always being present (for example, in PySpark calling first() and assuming that row is present) can lead to errors. To fix this, there are two solutions:

  1. Ensure that the code can succeed without errors even if the input DataFrames are empty. In choosing this path, developers can ensure that their pipelines will stay healthy even if there is a partition with 0 records (as Schema Inference creates).
  2. Implement the function to return the expected output schema so Ascend does not need to pass in the empty DataFrames to the transform function.


Schema Inference Output Schema

The output schema from Schema Inference must match the schema returned from when partitions of data are run through. Otherwise, Ascend will produce an error. This step ensures that the different partitions of a component have the same schema, creating a cohesive data set of the component.

Implementing the infer_schema function

To skip the schema inference, the developer can implement the infer_schema function and return the output schema for the function.

The DataFrames passed in to the infer_schema function are all empty DataFrames with their schemas defined correctly.

import pyspark.sql.types as T

# implement the schema inference function
def infer_schema(spark: SparkSession, inputs: List[DataFrame], [credentials: str]) -> StructType:
  return T.StructType([
    T.StructField('output_column_1', T.StringType()),
    T.StructField('output_column_2', T.StringType())

# the transform will now only be called for the data partitions and not as part of Schema Inference
def transform(spark, inputs, credentials=None):
    df = inputs[0]
    return df

Schema Inference and Pivot

Because Ascend determines the output schema of a Transform without any data running through the component (either through inspecting the output DataFrame or the result of infer_schema), this mechanism ensures that changes to the data itself cannot flux the schema of the components / Dataflow.

Pivot, and similar functions in Spark that intentionally change the schema of the DataFrame according to the data are thus not supported in Ascend.

As a general practice, options for developers include:

  1. Create a more 'statically' defined 'pivot'. Instead of truly dynamically having the the columns change on the dataset, perhaps create a list of expected values and add those as columns -- thus creating a consistent output schema regardless of the input dataset.
  2. Pivot further downstream, for example in a BI tool where the data is being consumed. This moves the dynamic nature of the pivot out of the Dataflow where dataset change can break the pipeline and instead into a reporting tool where the dynamic changes of the data will be reflected.

Did this page help you?