Spark DataFrame Properties

When creating a Python Read Connector, you must choose a connector code interface. With the Spark DataFrame interface, Ascend reads in Spark DataFrames.

Prerequisites

  • A Custom Python Connection

Required Python Functions

The following table describes the functions available when creating a new Python Read Connector utilizing Spark DataFrame interface. Create a Python Read Connector using the information below and these step-by-step instructions.

FunctionDescription
contextGives Ascend the ability to generate context before running list_objects. This is where you can store credentials to access objects
list_objectsdefines what objects you want read. Each object is read in as a separate partition unless objects are defined by metadata. You can specify multiple reads and partition objects into a hierarchy by using metadata and is_prefix.
metadataAssigns metadata to objects for Ascend to parse. metadata includes name, is_prefix, and fingerprint.
nameThis is the object or file name an will be converted to the partition name.
is_prefixTells Ascend whether or not to group objects together by a prefix. If set to false, Ascend will place an object in it's own partition. If set to true, a prefix must be defined and all objects with that prefix will be placed in the same partition.
fingerprintThe fingerprint is the SHA of the object which allows Ascend to determine if the object has changed. A common practice is the assign a time/date stamp as the fingerprint. However, the fingerprint must be a string. The fingerprint must be unique for each yield/partition.
yieldTells Ascend when to create a new partition. To create many partitions, you need multiple yield statements or an iterative cycle of yields.
read_functionTells Ascend to return the objects as one of the available interfaces.

🚧

Out of Memory Exception

Because Spark DataFrames are loaded into processing memory, large amounts of data can result in an out of memory exception.

Recursive list_objects

Metadata is a Python dictionary that defines a partition. Metadata is used in both list_objects and read_bytes. To trigger the recursive behavior within in list_objects and create partitions, set is_prefix to True. If a previously created partition is not recalled when generating list_objects, all previous partition metadata will be deleted.

🚧

When constructing your Python code, list_objects must return the partition metadata for all the partitions you expect to be in the component.

Example Spark DataFrame Code

The following code example describes reading a spreadsheet for Google Sheets.

# This example reads a spreadsheet from Google Sheet to explain the functions to implement

from pyspark.sql import SparkSession, DataFrame
from typing import Dict, Any, Iterator


def context(credentials) -> Dict[str, Any]:
   """
   	 Sets up the context for reading and listing data from data source. 
     This is where the Python Connection information will be passed through. 
     Avoid opening a database connection. 
   	 """
  service_account_info = json.loads(credentials)

  creds = service_account.Credentials.from_service_account_info(service_account_info, scopes=SCOPES)
  g_sheet = build('sheets', 'v4', credentials=creds)
  drive = build('drive', 'v3', credentials=creds)

  return {
      'g_sheet_client': g_sheet,
      'drive_client': drive,
  }


def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:


  yield {'name': 'example_id', 'fingerprint': 'fingerprint', 'is_prefix': False}


def read_spark_dataframe(spark: SparkSession, context: Dict[str, Any], metadata) -> DataFrame:
  """
    # Returns a Spark DataFrame.
    """
  columns = ["language", "users_count"]
  data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
  # Create the pandas DataFrame
  return spark.createDataFrame(data, schema=columns)

👍

External Keys and Secrets Management for Credentials

To use external keys or a secrets manager, set the def context(credentials: str) where str is the name you assigned the credential. Any connector code interface will receive external key as a result of that function call.

Parsers

Ascend natively parses Spark DataFrames and does not require additional parser configurations.

Advanced Settings for Spark Clusters

🚧

For Spark DataFrames, you consider manually adjusting driver size, executors, and executor size. Custom Spark Params for stage: LIST applies parameters to list_objects. Custom Spark Params for stage: READ applies parameters to read_spark_dataframes.

Code Fingerprint Strategy

The Code Fingerprint Strategy controls Ascend's DataAware function. Ascend read's the code and the configuration, and calculates the hash of the component. If the code changes in any way, the data must be reprocessed. Currently, the only available setting is Automatic content-based fingerprint.

Schema Generation

The Schema Generation reads the first few rows of the dataset and generates a schema just as with other Ascend Read Connectors. However, in order to sample the dataset, schema generation will run the code inserted into the Connector Code Interface across the entire dataset.