Spark DataFrame Properties
When creating a Python Read Connector, you must choose a connector code interface. With the Spark DataFrame interface, Ascend reads data in as Spark DataFrames.
Prerequisites
- A Custom Python Connection
Required Python Functions
The following table describes the functions available when creating a new Python Read Connector utilizing the Spark DataFrame interface. Create a Python Read Connector using the information below and these step-by-step instructions.
Function | Use | Description |
---|---|---|
context | Creates the session that your code will work within. Passes in a string from the Python Connection. | We recommend completing the session setup with context, e.g. create the database connection, the HTTP session, etc. User input credentials are only available through this function. |
list_objects | Creates a list of all data fragments identified by the fingerprint value. | Ascend runs the list_objects function every time the read connector refreshes and only processes data fragments that either have a name that does not already exist in the previous refresh, or have a name that exists in the previous refresh but has a changed fingerprint. Each dictionary has three key values: name, a string value associated with the name of each partition; fingerprint, a uniquely identifiable string associated with each partition; is_prefix, a boolean that represents whether or not the current partition holds any child partitions. |
read_spark_dataframe | Reads the data and returns a Spark DataFrame. | Spark DataFrames are distributed across the Spark cluster, which consists of a Spark driver and a number of executors. The spark: SparkSession argument gives you the ability to work within the Spark session and utilize/allocate resources of the Spark cluster. |
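The fingerprint is what drives incremental refreshes. As a minimal sketch, assuming a hypothetical source with a per-table last-modified timestamp (the get_last_modified helper and table names below are placeholders, not part of the Ascend API), list_objects could derive each partition's fingerprint from that timestamp so a partition is only reprocessed when it changes:

from typing import Any, Dict, Iterator

def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
    # Hypothetical listing: one partition per source table.
    for table_name in ["orders", "customers"]:
        # Hypothetical helper; a changed value changes the fingerprint and
        # signals Ascend to reprocess this partition on the next refresh.
        last_modified = get_last_modified(context, table_name)
        yield {
            'name': table_name,
            'fingerprint': str(last_modified),
            'is_prefix': False,
        }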
Out of Memory Exception
Because Spark DataFrames are loaded into processing memory, large amounts of data can result in an out of memory exception.
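One way to reduce the risk is to have list_objects split the source into more, smaller partitions so each read_spark_dataframe call only loads a fragment of the data. The daily split below is an illustrative sketch; the date range and partition names are assumptions:

from datetime import date, timedelta
from typing import Any, Dict, Iterator

def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
    # Hypothetical daily partitions: each one is read as its own, smaller DataFrame.
    start = date(2023, 1, 1)
    for offset in range(31):
        day = start + timedelta(days=offset)
        yield {'name': day.isoformat(), 'fingerprint': day.isoformat(), 'is_prefix': False}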
Recursive list_objects
Metadata is a Python dictionary that defines a partition. Metadata is used in both list_objects and read_spark_dataframe. To trigger the recursive behavior within list_objects and create partitions, set is_prefix to True. If a previously created partition is not recalled when generating list_objects, all previous partition metadata will be deleted.
When constructing your Python code, list_objects must return the partition metadata for all the partitions you expect to be in the component.
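A minimal sketch of the recursive pattern, assuming a hypothetical folder/file layout (the folder names and the list_files helper are placeholders, and how the top-level call is detected from metadata here is an assumption): the first pass yields prefix partitions with is_prefix set to True, and the recursive pass lists the child partitions under each prefix.

from typing import Any, Dict, Iterator

def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
    if not metadata or not metadata.get('name'):
        # Top-level call: yield one prefix partition per folder.
        for folder in ["2023-01", "2023-02"]:
            yield {'name': folder, 'fingerprint': folder, 'is_prefix': True}
    else:
        # Recursive call: list the files under the prefix received in metadata.
        prefix = metadata['name']
        for file_name in list_files(context, prefix):
            yield {'name': f"{prefix}/{file_name}", 'fingerprint': file_name, 'is_prefix': False}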
Example Spark DataFrame Code
The following code example describes reading a spreadsheet from Google Sheets.
# This example reads a spreadsheet from Google Sheets to explain the functions to implement
import json

from google.oauth2 import service_account
from googleapiclient.discovery import build
from pyspark.sql import SparkSession, DataFrame
from typing import Dict, Any, Iterator

SCOPES = ['https://www.googleapis.com/auth/spreadsheets.readonly',
          'https://www.googleapis.com/auth/drive.readonly']

def context(credentials) -> Dict[str, Any]:
    """
    Sets up the context for reading and listing data from the data source.
    This is where the Python Connection information will be passed through.
    Avoid opening a database connection.
    """
    service_account_info = json.loads(credentials)
    creds = service_account.Credentials.from_service_account_info(service_account_info, scopes=SCOPES)
    g_sheet = build('sheets', 'v4', credentials=creds)
    drive = build('drive', 'v3', credentials=creds)
    return {
        'g_sheet_client': g_sheet,
        'drive_client': drive,
    }

def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
    yield {'name': 'example_id', 'fingerprint': 'fingerprint', 'is_prefix': False}

def read_spark_dataframe(spark: SparkSession, context: Dict[str, Any], metadata) -> DataFrame:
    """
    Returns a Spark DataFrame.
    """
    columns = ["language", "users_count"]
    data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
    # Create the Spark DataFrame
    return spark.createDataFrame(data, schema=columns)
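In practice, read_spark_dataframe would use the clients stored in context to pull the sheet contents rather than returning hard-coded rows. The variant below, reusing the imports from the example above, is a sketch of how that might look; the spreadsheet ID, range, and header handling are illustrative assumptions:

def read_spark_dataframe(spark: SparkSession, context: Dict[str, Any], metadata) -> DataFrame:
    # Hypothetical: treat the partition name as the spreadsheet ID to fetch.
    sheet_id = metadata['name']
    result = context['g_sheet_client'].spreadsheets().values().get(
        spreadsheetId=sheet_id, range='Sheet1').execute()
    rows = result.get('values', [])
    header, records = rows[0], rows[1:]
    return spark.createDataFrame(records, schema=header)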
Parsers
Ascend natively parses Spark DataFrames and does not require additional parser configurations.
Advanced Settings for Spark Clusters
For Spark DataFrames, consider manually adjusting driver size, executors, and executor size. Custom Spark Params for stage: LIST applies parameters to list_objects. Custom Spark Params for stage: READ applies parameters to read_spark_dataframe.
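As an illustration only, standard Spark properties such as the following could be supplied as Custom Spark Params for the READ stage; the values are assumptions and depend on your data volume and cluster:

spark.executor.instances: 4
spark.executor.memory: 8g
spark.driver.memory: 4g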