Developing Your Custom Python Connector

When developing a custom Ascend Python Connector, you need to define a set of custom functions.

Writing your custom Python Connector code requires a Custom Python Connection and the following functions:

| Function | Description |
| --- | --- |
| context | Gives Ascend the ability to generate context before running list_objects. This is where you can store credentials to access objects. |
| list_objects | Defines which objects you want to read. Each object is read in as a separate partition unless objects are grouped by metadata. You can specify multiple reads and partition objects into a hierarchy by using metadata and is_prefix. |
| metadata | Assigns metadata to objects for Ascend to parse. metadata includes name, is_prefix, and fingerprint. |
| name | The object or file name, which will be converted to the partition name. |
| is_prefix | Tells Ascend whether or not to group objects together by a prefix. If set to false, Ascend places each object in its own partition. If set to true, a prefix must be defined, and all objects with that prefix are placed in the same partition. |
| fingerprint | The SHA of the object, which allows Ascend to determine whether the object has changed. A common practice is to assign a time/date stamp as the fingerprint; however, the fingerprint must be a string and must be unique for each yield/partition. |
| yield | Tells Ascend when to create a new partition. To create many partitions, you need multiple yield statements or an iterative cycle of yields. |
| read_function | Tells Ascend to return the objects as one of the available interfaces. |
| infer_schema | Optional function for schema generation. Tells Ascend what the schema is and avoids reading a large dataset to infer it. infer_schema must return a PySpark StructType or DataFrame. |

These custom functions allow Ascend to ingest and partition your data according to your specifications.

How to Code a Python Connector

To code a Python Connector, you'll need to define three functions: context, list_objects, and read_function, where "_function" is replaced by one of the three code interfaces (for example, read_spark_dataframe). context and list_objects act as definitions and information that Ascend refers to when the read_function is executed.
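Before walking through each step, it helps to see the shape of all three functions together. The sketch below assumes the Spark DataFrame code interface; the bodies are placeholders that the following steps fill in.

from typing import Any, Dict, Iterator

from pyspark.sql import DataFrame, SparkSession

def context(credentials) -> Dict[str, Any]:
  # Build API clients or other state needed to reach the data.
  return {}

def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
  # Yield one metadata dict per partition Ascend should create.
  yield {'name': 'object_name', 'fingerprint': 'unique_string', 'is_prefix': False}

def read_spark_dataframe(spark: SparkSession, context: Dict[str, Any], metadata) -> DataFrame:
  # Return the data for the partition described by the metadata.
  return spark.createDataFrame([], schema='placeholder STRING')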

The examples below use the Spark DataFrame code interface to ingest a Google Sheet. For the purposes of the example, assume the Google Sheets PyPI dependencies (the Google API client libraries imported below) are referenced when the Python Connection is created.

Step 1: Generate Context

The purpose of context is to communicate to Ascend the information required to access the data. def context(credentials) can reference credentials stored by name in the Python Connection or credentials coded directly into the Connector.

👍

External Keys and Secrets Management through Credentials

To use external keys or a secrets manager, define def context(credentials: str), referencing the credential by the name you assigned it. Any connector code interface will receive the external key as a result of that function call.

In the example below, def context(credentials) also uses the Google API client libraries to build clients for the Sheets and Drive APIs, so that later functions can locate the Google Sheet to be accessed and return it to Ascend.

# This example reads a spreadsheet from Google Sheets to explain the functions to implement.

import json
from typing import Any, Dict

from google.oauth2 import service_account
from googleapiclient.discovery import build

# Read-only scopes for the Sheets and Drive APIs (an assumption for this example).
SCOPES = ['https://www.googleapis.com/auth/spreadsheets.readonly',
          'https://www.googleapis.com/auth/drive.readonly']

def context(credentials) -> Dict[str, Any]:
  # Parse the service account JSON stored in the Python Connection.
  service_account_info = json.loads(credentials)
  creds = service_account.Credentials.from_service_account_info(service_account_info, scopes=SCOPES)
  # Build clients for the Sheets and Drive APIs.
  g_sheet = build('sheets', 'v4', credentials=creds)
  drive = build('drive', 'v3', credentials=creds)

  return {
      'g_sheet_client': g_sheet,
      'drive_client': drive,
  }
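The dictionary returned here is passed as the context argument to list_objects and the read function. As a hedged sketch (the spreadsheet ID, range, and fetch_rows helper are hypothetical placeholders), fetching rows through the stored Sheets client looks like this:

from typing import Any, Dict

# Hypothetical values for illustration only.
SPREADSHEET_ID = 'your-spreadsheet-id'
RANGE_NAME = 'Sheet1!A1:B100'

def fetch_rows(context: Dict[str, Any]) -> list:
  # Reuse the Sheets client built in context().
  result = context['g_sheet_client'].spreadsheets().values().get(
      spreadsheetId=SPREADSHEET_ID, range=RANGE_NAME).execute()
  return result.get('values', [])  # list of row lists from the sheet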

Step 2: Define the Objects

Defining the objects requires the def list_objects function. This function receives the context described above and tells Ascend how to read in, or yield, the data according to the specified metadata.

The yield statement tells Ascend to create a partition according to the metadata contained within it. Metadata includes name, fingerprint, and is_prefix. name refers to the name of the file and is converted into the Partition ID. fingerprint records when the object was last updated; a common way to specify a fingerprint is to convert a datetime stamp to a string, as sketched below. The fingerprint works as the object's SHA: if the Read Connector code is updated or new files are read in, a new fingerprint tells Ascend that the data has not yet been processed.
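For instance, a minimal sketch that derives a string fingerprint from an object's last-modified time (how you obtain that timestamp depends on your source):

from datetime import datetime

def make_fingerprint(last_modified: datetime) -> str:
  # The fingerprint must be a string and should change only
  # when the object itself changes.
  return last_modified.isoformat()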

The is_prefix field controls how partitions are created. If is_prefix is set to False, each yield places its object in a single partition of its own. Setting it to True indicates the partitions require a hierarchy; a sketch follows the example below, and Partitioning with a Python Connector covers how to set up a partition hierarchy in full.

def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
  # Each yield creates one partition; the fingerprint must change
  # whenever the underlying object changes.
  yield {'name': 'example_id', 'fingerprint': 'fingerprint', 'is_prefix': False}
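As a hedged sketch of the hierarchical case (the object names below are placeholders, and the exact descent mechanics are covered in Partitioning with a Python Connector), a prefix is yielded with is_prefix set to True, and the objects beneath it carry that prefix in their names:

from typing import Any, Dict, Iterator

def list_objects(context: Dict[str, Any], metadata) -> Iterator[Dict[str, Any]]:
  # Parent partition grouping everything under the '2023-01/' prefix.
  yield {'name': '2023-01/', 'fingerprint': '2023-01', 'is_prefix': True}
  # Child objects, one partition each, named under the prefix.
  for sheet_name in ['sheet_a', 'sheet_b']:  # placeholder names
    yield {'name': f'2023-01/{sheet_name}', 'fingerprint': f'2023-01/{sheet_name}-v1', 'is_prefix': False}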

Step 3: Read Data

The def read_function puts context and yield into motion. It receives the context and metadata so that Ascend knows how to partition the data as it is read in. The read_function also requires that you know how the code interfaces work so that you can specify the structure of the data; for example, with a Spark DataFrame you need to know how to define the columns and data.

def read_spark_dataframe(spark: SparkSession, context: Dict[str, Any], metadata) -> DataFrame:
  # Static example data; a real connector would read the sheet through
  # the clients stored in context.
  columns = ["language", "users_count"]
  data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
  return spark.createDataFrame(data, schema=columns)
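Finally, if you don't want Ascend to sample the data to determine its shape, you can supply the optional infer_schema function. Below is a minimal sketch matching the DataFrame above; the exact signature and the string column types are assumptions for this example.

from typing import Any, Dict

from pyspark.sql.types import StringType, StructField, StructType

def infer_schema(context: Dict[str, Any], metadata) -> StructType:
  # Returning an explicit schema spares Ascend from reading a
  # large dataset just to infer it.
  return StructType([
      StructField('language', StringType(), True),
      StructField('users_count', StringType(), True),
  ])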