Spark DataFrame Properties

When creating a Python Read Connector, you must choose a connector code interface. With the Spark DataFrame interface, Ascend reads the data in as Spark DataFrames.

Prerequisites

  • A Custom Python Connection

Required Python Functions

The following table describes the functions available when creating a new Python Read Connector utilizing the Spark DataFrame interface. Create a Python Read Connector using the information below and these step-by-step instructions.

context
Use: Creates the session that your code will work within. Passes in a string from the Python Connection.
Description: We recommend completing the session setup within context, e.g. creating the database connection, the HTTP session, etc. User-provided credentials are only available through this function.

list_objects
Use: Creates a list of all data fragments, each identified by a fingerprint value.
Description: Ascend runs the list_objects function every time the Read Connector refreshes and only processes data fragments that either:
- have a name that did not exist in the previous refresh, or
- have a name that existed in the previous refresh but now has a new fingerprint.
Each yielded dictionary has three keys (see the sketch after this table):
- name - a string with the name of the partition
- fingerprint - a uniquely identifiable string associated with the partition
- is_prefix - a boolean indicating whether the partition holds any child partitions

read_spark_dataframe
Use: Reads the data and returns a Spark DataFrame.
Description: Spark DataFrames are distributed across the Spark cluster. The spark: SparkSession argument gives you the ability to work within the Spark session and utilize/allocate the resources of the Spark cluster.
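As an illustration of the fingerprint-driven refresh described above, a list_objects implementation for a file-style source might derive each partition's fingerprint from the file's last-modified timestamp, so a partition is only re-processed when that timestamp changes. This is a minimal sketch; the list_source_files helper is hypothetical and stands in for whatever listing call your connection provides.

from typing import Any, Dict, Iterator


def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
    """Yields one partition per source file."""
    # `list_source_files` is a hypothetical helper set up in context();
    # it returns (file_name, last_modified_timestamp) pairs.
    for file_name, last_modified in context['list_source_files']():
        yield {
            'name': file_name,                  # stable identifier for the partition
            'fingerprint': str(last_modified),  # changes only when the file changes
            'is_prefix': False,                 # this partition has no child partitions
        }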

🚧

Out of Memory Exception

Because Spark DataFrames are loaded into processing memory, large amounts of data can result in an out of memory exception.

Recursive list_objects

Metadata is a Python dictionary that defines a partition. Metadata is used in both list_objects and read_spark_dataframe. To trigger the recursive behavior within list_objects and create child partitions, set is_prefix to True. If a previously created partition is not returned the next time list_objects runs, that partition's metadata will be deleted. A minimal sketch of a recursive list_objects follows the callout below.

🚧

When constructing your Python code, list_objects must return the partition metadata for all the partitions you expect to be in the component.
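Below is a minimal sketch of a recursive list_objects for a folder-style source. It assumes, per the behavior described above, that Ascend calls list_objects again with the previously yielded partition dictionary passed in as metadata when is_prefix is True; the exact shape of metadata on the first call may differ, and the list_folders and list_files helpers are hypothetical.

from typing import Any, Dict, Iterator


def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
    if not metadata or not metadata.get('name'):
        # First pass: yield one prefix partition per folder.
        # is_prefix=True triggers the recursive call for each of these partitions.
        for folder in context['list_folders']():
            yield {'name': folder, 'fingerprint': folder, 'is_prefix': True}
    else:
        # Recursive pass: metadata is the prefix partition yielded above,
        # so list the individual files beneath that folder.
        for file_name, last_modified in context['list_files'](metadata['name']):
            yield {
                'name': f"{metadata['name']}/{file_name}",
                'fingerprint': str(last_modified),
                'is_prefix': False,
            }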

Example Spark DataFrame Code

The following code example reads a spreadsheet from Google Sheets.

# This example reads a spreadsheet from Google Sheets to illustrate the functions to implement

import json
from typing import Any, Dict, Iterator

from google.oauth2 import service_account
from googleapiclient.discovery import build
from pyspark.sql import DataFrame, SparkSession

# Scopes required for the Google Sheets and Drive APIs (adjust to your use case)
SCOPES = [
    'https://www.googleapis.com/auth/spreadsheets.readonly',
    'https://www.googleapis.com/auth/drive.readonly',
]


def context(credentials: str) -> Dict[str, Any]:
    """
    Sets up the context for listing and reading data from the data source.
    This is where the Python Connection information is passed through.
    """
    service_account_info = json.loads(credentials)

    creds = service_account.Credentials.from_service_account_info(service_account_info, scopes=SCOPES)
    g_sheet = build('sheets', 'v4', credentials=creds)
    drive = build('drive', 'v3', credentials=creds)

    return {
        'g_sheet_client': g_sheet,
        'drive_client': drive,
    }


def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
    """
    Yields the metadata for each partition (data fragment) to be read.
    """
    yield {'name': 'example_id', 'fingerprint': 'fingerprint', 'is_prefix': False}


def read_spark_dataframe(spark: SparkSession, context: Dict[str, Any], metadata) -> DataFrame:
    """
    Reads the partition described by `metadata` and returns a Spark DataFrame.
    """
    columns = ["language", "users_count"]
    data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
    # Create the Spark DataFrame
    return spark.createDataFrame(data, schema=columns)

Parsers

Ascend natively parses Spark DataFrames and does not require additional parser configurations.

Advanced Settings for Spark Clusters

🚧

For Spark DataFrames, consider manually adjusting the driver size, the number of executors, and the executor size. Custom Spark Params for stage: LIST applies parameters to list_objects. Custom Spark Params for stage: READ applies parameters to read_spark_dataframe.
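Custom Spark Params typically take standard Spark configuration properties. The values below are illustrative only; adjust them to your workload. For example, a READ stage that builds large DataFrames might raise the executor count and memory:

spark.executor.instances: 4
spark.executor.memory: 8g
spark.driver.memory: 4g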