Custom Read Connectors

Creating a Custom Read Connector

Prerequisites:

  • Python code that can read in data and produce rows and columns
  • Access credentials
  • Data schema for the dataset
  • Business logic on how incremental data should be processed

Select Custom

Click the Custom icon on the Connectors panel

Custom Code and PIP Install

Ascend requires you to implement three Python functions in your custom read connector:

  • context: Creates the session that your code will work within.
  • list_objects: Creates a list of all data fragments identified by the fingerprint value.
  • read_bytes: Generates the rows and columns for one individual data fragment.

context function

We recommend doing all session setup here, e.g. creating the database connection, the HTTP session, etc. This is the only function in which user-supplied credentials are made available.

# The context function creates the session for the rest of the code to work with.
# `credentials` is the string representation of the credentials you select for this connector.
import json

def context(credentials: str):
  try:
    ctx = json.loads(credentials)
    return ctx
  except Exception as e:
    print(e)
    return {}

list_objects function

Ascend runs the list_objects function every time the read connector refreshes, and only processes data fragments that either:

  1. Have a name that does not exist in the previous refresh.
    -- Or --
  2. Have a name that exists in the previous refresh but whose fingerprint has changed.
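This refresh rule can be sketched as a simple diff between the previous and current listings. Note that `fragments_to_process` is a hypothetical helper for illustration only, not part of the Ascend API:

```python
# Hypothetical sketch of the refresh rule: a fragment is (re)processed when its
# name is new, or when its name exists but its fingerprint has changed.
def fragments_to_process(previous: list, current: list) -> list:
    # Map each previously seen fragment name to its fingerprint.
    seen = {obj['name']: obj['fingerprint'] for obj in previous}
    return [
        obj for obj in current
        if obj['name'] not in seen                     # rule 1: new name
        or seen[obj['name']] != obj['fingerprint']     # rule 2: changed fingerprint
    ]

previous = [{'name': 'fragment1', 'fingerprint': 'md51'},
            {'name': 'fragment2', 'fingerprint': 'md52'}]
current  = [{'name': 'fragment1', 'fingerprint': 'md51'},     # unchanged: skipped
            {'name': 'fragment2', 'fingerprint': 'md52-v2'},  # changed: processed
            {'name': 'fragment3', 'fingerprint': 'md53'}]     # new: processed
print([obj['name'] for obj in fragments_to_process(previous, current)])
# prints ['fragment2', 'fragment3']
```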
# The list_objects function creates a list of all data fragments, identified by fingerprint value.
'''
  Schema for list_objects:
  {
     'name': <object name>,
     'fingerprint': <unique string indicating whether an object has been modified since the last read>,
     'is_prefix': <used for the blob storage case; indicates that the name is a prefix to be listed further>
  }
'''
def list_objects(context: dict, metadata: dict):
  if metadata:
    obj = {'name': 'fragment1', 'fingerprint': 'md51', 'is_prefix': False}
    yield obj
    obj = {'name': 'fragment2', 'fingerprint': 'md52', 'is_prefix': False}
    yield obj
  else:
    obj = {'name': 'folder1', 'fingerprint': 'fmd51', 'is_prefix': True}
    yield obj

read_bytes function

  1. The read_bytes function runs once for every data fragment that needs to be processed from the list_objects function (e.g. read_bytes will run 10 times if list_objects yielded 10 data fragments).
  2. The read_bytes function will not be run multiple times for the same fragment. Additionally, if read_bytes throws an uncaught exception, the entire read connector will error out.
  3. It's acceptable to yield data points in a format other than CSV, as long as the Parser is configured properly.
# The read_bytes function generates the rows and columns for one individual data fragment.

def read_bytes(context: dict, metadata: dict):
    yield ",".join([metadata['name'], metadata['fingerprint']])

Below is a screenshot where the three Python functions are implemented as code in the Ascend editor.

Ascend also supports installing any pip packages via the PIP PACKAGES TO INSTALL area shown below. Please contact Ascend at [email protected] if you need to connect to private packages and libraries.

Test Locally

Testing the code locally can be extremely helpful and can expedite the connector development process. Please follow the instructions to download the wrapper in order to test the code locally.
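Even without the wrapper, the three functions can be exercised with a minimal hand-rolled driver. The sketch below assumes the runtime chains them as context → list_objects (descending into prefixes) → read_bytes; `run_locally` is a hypothetical helper, and the connector functions from above are repeated so the snippet runs standalone:

```python
import json

# The three connector functions from earlier in this page, copied so the sketch is self-contained.
def context(credentials: str):
    try:
        return json.loads(credentials)
    except Exception:
        return {}

def list_objects(context: dict, metadata: dict):
    if metadata:
        yield {'name': 'fragment1', 'fingerprint': 'md51', 'is_prefix': False}
        yield {'name': 'fragment2', 'fingerprint': 'md52', 'is_prefix': False}
    else:
        yield {'name': 'folder1', 'fingerprint': 'fmd51', 'is_prefix': True}

def read_bytes(context: dict, metadata: dict):
    yield ",".join([metadata['name'], metadata['fingerprint']])

# Hypothetical driver mimicking the call order: context once, then list_objects,
# descending into prefixes, then read_bytes once per leaf fragment.
def run_locally(credentials: str):
    ctx = context(credentials)
    rows = []
    for obj in list_objects(ctx, {}):
        if obj.get('is_prefix'):
            # Descend into the prefix to list the fragments beneath it.
            for frag in list_objects(ctx, obj):
                rows.extend(read_bytes(ctx, frag))
        else:
            rows.extend(read_bytes(ctx, obj))
    return rows

print(run_locally('{"user": "demo"}'))
# prints ['fragment1,md51', 'fragment2,md52']
```

The real Ascend wrapper does much more (fingerprint diffing, parsing, error handling); this only checks that your three functions compose without raising.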

Parsers & Schema

Data formats currently available are Avro, JSON, Parquet, and XSV. However, you can create your own parser function to process other file formats.

For custom read connectors, you will need to create the schema yourself, specifying column names and data types as well as any default values.

# Custom parser function example:
import json

def parser_function(reader, onNext):
  # Use `reader` to read in data, and pass a Python dict to the onNext function as the parsing result.
  # `reader` is a non-seekable, file-like Python object.
  # `onNext` is a callback function expecting a dict object for each row value.
  for line in reader:
    # Strip whitespace and skip empty lines.
    line = line.strip()
    if line:
      # json.loads converts the line into a Python dict.
      onNext(json.loads('{ "col1": "' + line + '" }'))
  reader.close()
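To sanity-check a parser function locally, you can feed it an in-memory file-like object and collect the rows it emits. In this sketch, `io.StringIO` stands in for the non-seekable reader, and the example parser from above is repeated so the snippet runs standalone:

```python
import io
import json

# The example parser from above: wraps each non-empty line into a single-column JSON row.
def parser_function(reader, onNext):
    for line in reader:
        line = line.strip()
        if line:
            onNext(json.loads('{ "col1": "' + line + '" }'))
    reader.close()

# Collect the parsed rows by passing list.append as the onNext callback.
rows = []
parser_function(io.StringIO("alpha\nbeta\n\ngamma\n"), rows.append)
print(rows)
# prints [{'col1': 'alpha'}, {'col1': 'beta'}, {'col1': 'gamma'}]
```

Note that `io.StringIO` is seekable, unlike the reader Ascend passes in; as long as your parser only iterates forward, the behavior is the same.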
