Scala & Java Transforms
Creating Scala and Java Transforms
Video Walkthrough + Example
We have included a video walkthrough alongside the sample code for a Scala transform.
Developing the JAR
Scala and Java transforms are enabled by providing a JAR to Ascend. Inside this JAR, Ascend expects a class that conforms to the Ascend interface (namely, a transform function that takes one or more input DataFrames and returns an output DataFrame).
When building the JAR locally, you will need to declare Spark as a "provided" dependency. This lets you import the Spark libraries at compile time without packaging Spark itself into the JAR, since Spark is supplied by the runtime.
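For example, in sbt this can be declared in build.sbt. This is a minimal sketch; the Scala and Spark versions shown are illustrative and should match the runtime you later select in Ascend:

```scala
// build.sbt -- versions are illustrative; match them to your selected Ascend runtime
scalaVersion := "2.12.15"

// "provided" makes Spark available at compile time without bundling it into the JAR
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2" % "provided"
```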
To include dependencies with this JAR, create what is colloquially known as a "fat JAR" or an "uber JAR". In sbt, this can be done with the sbt-assembly plugin and then running sbt assembly.
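As a minimal sketch, the plugin is enabled in project/plugins.sbt (the plugin version shown is illustrative):

```scala
// project/plugins.sbt -- plugin version is illustrative
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
```

Running sbt assembly then produces the fat JAR, by default under target/scala-<scalaVersion>/.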
Important
Ascend caches the JAR by its filename to improve performance and therefore treats JARs as immutable (it will use a cached copy if available, or re-download if needed). We suggest including a version in each uploaded filename and creating a new version with each code deploy.
Early Access Feature: For Scala / Java transforms, a preview version uses the file naming to determine whether to reprocess historical data or apply the new version only on a 'go forward' basis. Please contact your account manager to have this feature enabled. The filename is required to end with v<MajorVersionNumber>.<MinorVersionNumber>.jar. Major version changes will reprocess historical data, whereas minor version bumps apply only to data that needs processing from that point forward. For example (filenames are illustrative), moving from my-transform-v1.0.jar to my-transform-v2.0.jar reprocesses history, while moving to my-transform-v1.1.jar does not.
Entrypoint Class Interface
| Signature | Required | Purpose |
|---|---|---|
| `def transform(spark: SparkSession, inputs: Seq[DataFrame], credentials: String): DataFrame` or `def transform(spark: SparkSession, inputs: Seq[DataFrame]): DataFrame` | Yes | Defines how the input DataFrames are transformed into an output DataFrame. Note the two variations: one for when credentials are configured in the component definition, and one without. |
| `def inferSchema(spark: SparkSession, inputs: Seq[DataFrame]): StructType` | No | Schema inference. Since Ascend needs to know the schema of every transform component, it will by default run the transform function with each input replaced by an empty DataFrame (with matching schema), then inspect the returned DataFrame to determine the component's output schema. If this method is implemented, Ascend skips that inference and uses the returned StructType as the output schema. If the transform, when run on the data set, does not produce a DataFrame matching this StructType, an error is raised. |
| `def inputSchemas(spark: SparkSession, inputs: Seq[DataFrame]): Seq[Option[StructType]]` | No | Input schemas. Asserts the schemas of the inputs; an error is raised if the schemas do not match. (If unimplemented, this assertion is skipped.) This method can help validate that the class is being used as expected with the correct inputs (especially if converting a DataFrame into a Dataset). |
| `def requiredMetadataColumns(spark: SparkSession, inputs: Seq[DataFrame]): Map[Int, Set[String]]` | No | Required metadata columns. Implement this method to request metadata columns for an input. The returned map associates an index in the inputs list with the set of requested metadata column names. |
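As a concrete illustration, a minimal entrypoint class might look like the following sketch. The package, class name, and filter expression are hypothetical; only the method signatures come from the interface above:

```scala
package com.example.transforms // hypothetical package

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

// Hypothetical entrypoint class; its fully qualified name
// (com.example.transforms.MyTransform) is what you enter in the Ascend UI.
class MyTransform {

  // Required: transform the input DataFrame(s) into a single output DataFrame.
  // Assumes the first input has a numeric "amount" column (illustrative only).
  def transform(spark: SparkSession, inputs: Seq[DataFrame]): DataFrame =
    inputs(0).filter("amount > 0")

  // Optional: return the output schema directly so Ascend can skip schema inference.
  // Here the output schema is the same as the first input's schema.
  def inferSchema(spark: SparkSession, inputs: Seq[DataFrame]): StructType =
    inputs(0).schema
}
```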
Creating the Component
- Navigate to the "New Transform" page. Choose a dataflow that already has Read Connector(s), Data Feed(s), or another transform component. Click the 'Build' panel on the left or right-click a component to create a new transform.
- Name the Transform. Enter a name for your Transform. It's advisable to provide a high-level description of your Transform in the "Description" text box.
- Choose Scala as the Transform type.
- Specify the Upstream Datasets. Ascend provides an array called INPUT[], which is configured with upstream datasets as separate elements. The first input is available as INPUT[0]; additional upstreams can be added as INPUT[1], INPUT[2], and so on, using the dropdown for each element and the ADD ANOTHER INPUT button to add more. These elements map positionally to the inputs sequence passed to your transform function (see the sketch after this list).
- Specify the Partitioning Strategy. This step determines how partitions will be handled. For more details, please refer to the Data Partitioning guide.
- Specify the entrypoint class name. This field requires the fully qualified classname of the class that implements the Ascend interface.
- Specify the JAR location. You may upload the built JAR to AWS S3, ADLS Blob Store, or Google Cloud Storage. Optionally, you may provide any credentials necessary to download this JAR.
- Specify the Runtime. So that Ascend uses the same Scala and Spark versions your JAR was compiled against, Scala / Java transforms require you to select a runtime version that is a combination of a Scala version and a Spark version.
- Click the "Create" button. This initiates processing of the newly created Ascend transform.
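As noted in the upstream datasets step, the UI's INPUT[] elements map positionally to the inputs sequence the transform receives. Here is a hedged sketch with two upstreams; the "customer_id" column is hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

class JoinTransform {
  def transform(spark: SparkSession, inputs: Seq[DataFrame]): DataFrame = {
    val orders    = inputs(0) // INPUT[0] in the UI
    val customers = inputs(1) // INPUT[1] in the UI
    // Assumes both upstreams share a "customer_id" column (illustrative only).
    orders.join(customers, Seq("customer_id"), "left")
  }
}
```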