Developing Your Custom Python Connector
When developing a custom Ascend Python Connector, you need to define custom functions. Writing your custom Python Connector code requires a Custom Python Connection and the following functions:
| Function | Description |
|---|---|
| `context` | Gives Ascend the ability to generate context before running `list_objects`. This is where you can store credentials to access objects. |
| `list_objects` | Defines which objects you want to read. Each object is read in as a separate partition unless objects are grouped by metadata. You can specify multiple reads and partition objects into a hierarchy by using `metadata` and `is_prefix`. |
| `metadata` | Assigns metadata to objects for Ascend to parse. `metadata` includes `name`, `is_prefix`, and `fingerprint`. |
| `name` | The object or file name, which is converted to the partition name. |
| `is_prefix` | Tells Ascend whether or not to group objects together by a prefix. If set to `false`, Ascend places an object in its own partition. If set to `true`, a prefix must be defined, and all objects with that prefix are placed in the same partition. |
| `fingerprint` | The SHA of the object, which allows Ascend to determine whether the object has changed. A common practice is to assign a time/date stamp as the `fingerprint`; however, the `fingerprint` must be a string and must be unique for each yield/partition. |
| `yield` | Tells Ascend when to create a new partition. To create many partitions, you need multiple `yield` statements or an iterative cycle of yields. |
| `read_function` | Tells Ascend to return the objects as one of the available code interfaces. |
| `infer_schema` | Optional function for schema generation. Tells Ascend what the schema is and avoids reading a large dataset to infer it. `infer_schema` must return a PySpark StructType or DataFrame. |
These custom functions allow Ascend to ingest and partition your data according to your specifications.
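The optional `infer_schema` function from the table is not shown in the walkthrough below, so here is a minimal sketch. The signature is an assumption modeled on the read function later in this guide; consult the code interface documentation for the exact form Ascend expects.

```python
from typing import Any, Dict

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

# A minimal sketch of the optional infer_schema function. The signature is an
# assumption modeled on read_spark_dataframe. Returning an explicit StructType
# lets Ascend skip reading a large dataset just to infer the schema.
def infer_schema(spark: SparkSession, context: Dict[str, Any], metadata) -> StructType:
    return StructType([
        StructField("language", StringType(), True),
        StructField("users_count", StringType(), True),
    ])
```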
How to Code a Python Connector
To code a Python Connector, you'll need to define three functions: `context`, `list_objects`, and `read_function`, where "_function" is one of the three code interfaces. `context` and `list_objects` act as definitions and information Ascend will refer to when the `read_function` is executed.
The examples below use the Spark DataFrame code interface to ingest a Google Sheet. For the purpose of the example, assume the gsheets PyPI dependencies are referenced when the Python Connection is created.
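Based on the imports in the examples that follow, those dependencies would likely include the packages below; this is an assumption, not an exhaustive list.

```
google-api-python-client
google-auth
```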
Step 1: Generate Context
The purpose of `context` is to communicate to Ascend the information required to access the data. `def context(credentials)` can reference the name of the credentials stored in the Python Connection, or the credentials can be coded directly into the Connector.
External Keys and Secrets Management through Credentials
To use external keys or a secrets manager, define `def context(credentials: str)`, where the string passed in is the name you assigned the credential. Any connector code interface will receive the external key as a result of that function call.
In the example below, `def context(credentials)` also uses gsheets functions to describe the location of the Google Sheet to be accessed and how to return it to Ascend.
```python
# This example reads a spreadsheet from Google Sheets to explain the functions to implement.
import json
from typing import Any, Dict

from google.oauth2 import service_account
from googleapiclient.discovery import build

# The scopes below are an assumption for this example; adjust them to your access needs.
SCOPES = ['https://www.googleapis.com/auth/spreadsheets.readonly',
          'https://www.googleapis.com/auth/drive.readonly']

def context(credentials) -> Dict[str, Any]:
    # The credential is a service-account JSON string stored in the Python Connection.
    service_account_info = json.loads(credentials)
    creds = service_account.Credentials.from_service_account_info(service_account_info, scopes=SCOPES)
    g_sheet = build('sheets', 'v4', credentials=creds)
    drive = build('drive', 'v3', credentials=creds)
    return {
        'g_sheet_client': g_sheet,
        'drive_client': drive,
    }
```
Step 2: Define the Objects
Defining the objects requires the `list_objects` function. This function calls the context described above and tells Ascend how to read in, or `yield`, the data according to the specified `metadata`.
The `yield` statement tells Ascend to create a partition according to the metadata it contains. Metadata includes `name`, `fingerprint`, and `is_prefix`. `name` refers to the name of the file and is converted into the Partition ID. `fingerprint` records when the object was last updated; a common way to specify a fingerprint is to convert a datetime stamp to a string. The fingerprint acts as the object's SHA: if the Read Connector code is updated and new files are read in, a new fingerprint is required to tell Ascend that the data has not yet been processed.
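For instance, here is a minimal sketch of deriving a string fingerprint from a timestamp; the variable names are hypothetical.

```python
from datetime import datetime, timezone

# Hypothetical: use the object's last-modified time as its fingerprint.
# The resulting value is a string and should be unique per yield/partition.
last_modified = datetime.now(timezone.utc)
fingerprint = last_modified.isoformat()  # e.g. '2024-05-01T12:00:00+00:00'
```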
The `is_prefix` field specifies how partitions are created. If `is_prefix` is set to `False`, each yielded object is placed in its own partition. Setting it to `True` indicates the partitions require a hierarchy. Partitioning with a Python Connector covers how to set up a partition hierarchy; see also the sketch after the example below.
```python
from typing import Any, Dict, Iterator

def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
    yield {'name': 'example_id', 'fingerprint': 'fingerprint', 'is_prefix': False}
```
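For reference, a hedged sketch of yielding a prefix and the objects beneath it: the exact hierarchy protocol is covered in Partitioning with a Python Connector, and the object names below are hypothetical.

```python
from typing import Any, Dict, Iterator

def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
    # Hypothetical sketch: yield a prefix first so Ascend can group the
    # objects beneath it, then yield the objects that share that prefix.
    yield {'name': '2024-01/', 'fingerprint': '2024-01', 'is_prefix': True}
    for sheet in ('sheet_a', 'sheet_b'):  # hypothetical object names
        yield {'name': f'2024-01/{sheet}',
               'fingerprint': f'2024-01-{sheet}',
               'is_prefix': False}
```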
Step 3: Read Data
The `read_function` sets `context` and `yield` into motion. It requires calling context and metadata so that Ascend knows how to partition the data as it's read in. The `read_function` also requires that you know how the code interfaces work so that you can specify the structure of the data. For example, with a Spark DataFrame you need to know how to define the columns and data.
```python
from typing import Any, Dict

from pyspark.sql import DataFrame, SparkSession

def read_spark_dataframe(spark: SparkSession, context: Dict[str, Any], metadata) -> DataFrame:
    # Static example data: column names plus rows of (language, users_count).
    columns = ["language", "users_count"]
    data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
    return spark.createDataFrame(data, schema=columns)
```
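The static example above never touches the clients stored in `context`. Below is a hedged sketch of a version that reads the sheet through the Sheets API client built in `context()`; the spreadsheet ID and A1 range are hypothetical placeholders.

```python
from typing import Any, Dict

from pyspark.sql import DataFrame, SparkSession

SPREADSHEET_ID = 'your-spreadsheet-id'  # hypothetical placeholder

def read_spark_dataframe(spark: SparkSession, context: Dict[str, Any], metadata) -> DataFrame:
    # Fetch cell values with the Sheets client returned by context().
    result = (
        context['g_sheet_client']
        .spreadsheets()
        .values()
        .get(spreadsheetId=SPREADSHEET_ID, range='Sheet1!A1:B')  # hypothetical range
        .execute()
    )
    rows = result.get('values', [])
    # Assumes the first row holds column headers and at least one data row follows.
    return spark.createDataFrame(rows[1:], schema=rows[0])
```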