Spark DataFrame Properties
When creating a Python Read Connector, you must choose a connector code interface. With the Spark DataFrame interface, Ascend reads in Spark DataFrames.
Prerequisites
- A Custom Python Connection
Required Python Functions
The following table describes the functions available when creating a new Python Read Connector utilizing Spark DataFrame interface. Create a Python Read Connector using the information below and these step-by-step instructions.
Function | Description |
---|---|
context | Gives Ascend the ability to generate context before running list_objects . This is where you can store credentials to access objects |
list_objects | defines what objects you want read. Each object is read in as a separate partition unless objects are defined by metadata. You can specify multiple reads and partition objects into a hierarchy by using metadata and is_prefix . |
metadata | Assigns metadata to objects for Ascend to parse. metadata includes name , is_prefix , and fingerprint . |
name | This is the object or file name an will be converted to the partition name. |
is_prefix | Tells Ascend whether or not to group objects together by a prefix. If set to false , Ascend will place an object in it's own partition. If set to true , a prefix must be defined and all objects with that prefix will be placed in the same partition. |
fingerprint | The fingerprint is the SHA of the object which allows Ascend to determine if the object has changed. A common practice is the assign a time/date stamp as the fingerprint . However, the fingerprint must be a string. The fingerprint must be unique for each yield/partition. |
yield | Tells Ascend when to create a new partition. To create many partitions, you need multiple yield statements or an iterative cycle of yields. |
read_function | Tells Ascend to return the objects as one of the available interfaces. |
Out of Memory Exception
Because Spark DataFrames are loaded into processing memory, large amounts of data can result in an
out of memory
exception.
Recursive list_objects
list_objects
Metadata is a Python dictionary that defines a partition. Metadata is used in both list_objects
and read_bytes
. To trigger the recursive behavior within in list_objects
and create partitions, set is_prefix
to True
. If a previously created partition is not recalled when generating list_objects
, all previous partition metadata will be deleted.
When constructing your Python code,
list_objects
must return the partition metadata for all the partitions you expect to be in the component.
Example Spark DataFrame Code
The following code example describes reading a spreadsheet for Google Sheets.
# This example reads a spreadsheet from Google Sheet to explain the functions to implement
from pyspark.sql import SparkSession, DataFrame
from typing import Dict, Any, Iterator
def context(credentials) -> Dict[str, Any]:
"""
Sets up the context for reading and listing data from data source.
This is where the Python Connection information will be passed through.
Avoid opening a database connection.
"""
service_account_info = json.loads(credentials)
creds = service_account.Credentials.from_service_account_info(service_account_info, scopes=SCOPES)
g_sheet = build('sheets', 'v4', credentials=creds)
drive = build('drive', 'v3', credentials=creds)
return {
'g_sheet_client': g_sheet,
'drive_client': drive,
}
def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
yield {'name': 'example_id', 'fingerprint': 'fingerprint', 'is_prefix': False}
def read_spark_dataframe(spark: SparkSession, context: Dict[str, Any], metadata) -> DataFrame:
"""
# Returns a Spark DataFrame.
"""
columns = ["language", "users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
# Create the pandas DataFrame
return spark.createDataFrame(data, schema=columns)
External Keys and Secrets Management for Credentials
To use external keys or a secrets manager, set the
def context(credentials: str)
wherestr
is the name you assigned the credential. Any connector code interface will receive external key as a result of that function call.
Parsers
Ascend natively parses Spark DataFrames and does not require additional parser configurations.
Advanced Settings for Spark Clusters
For Spark DataFrames, you consider manually adjusting driver size, executors, and executor size. Custom Spark Params for stage: LIST applies parameters to
list_objects
. Custom Spark Params for stage: READ applies parameters toread_spark_dataframes
.
Code Fingerprint Strategy
The Code Fingerprint Strategy controls Ascend's DataAware function. Ascend read's the code and the configuration, and calculates the hash of the component. If the code changes in any way, the data must be reprocessed. Currently, the only available setting is Automatic content-based fingerprint.
Schema Generation
The Schema Generation reads the first few rows of the dataset and generates a schema just as with other Ascend Read Connectors. However, in order to sample the dataset, schema generation will run the code inserted into the Connector Code Interface across the entire dataset.
Updated 9 months ago