Adding Dependencies in PySpark Transforms

How to Add and Get Files to Spark Jobs in PySpark Transforms

When running Spark in distributed cluster mode (rather than local mode), adding files and/or dependencies for a Spark job to the executors can be confusing.

The following two functions are commonly used:

  1. Adding a file (or directory) using pyspark.SparkContext.addFile, and/or
  2. Getting the path to a file (on an executor) using pyspark.SparkFiles.get
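
As a quick orientation before the details, here is a minimal sketch that combines the two calls: addFile runs on the driver, while SparkFiles.get runs inside a function shipped to the executors. The file name local_config.txt is only an illustrative placeholder; the full examples later in this page show the same pattern in a transform.

from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Driver side: create a small local file and stage it for distribution to executors.
with open('local_config.txt', 'w') as out:
    out.write('hello world')
spark.sparkContext.addFile('local_config.txt')

def read_staged_file(rows):
    # Executor side: SparkFiles.get resolves this executor's copy of the staged file.
    with open(SparkFiles.get('local_config.txt')) as handle:
        content = handle.read()
    for _ in rows:
        yield {'value': content}

# mapPartitions runs read_staged_file on the executors.
spark.sparkContext.parallelize([1]).mapPartitions(read_staged_file).toDF(schema='value string').show()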

🚧

Do not use pyspark.SparkFiles.get in code that runs on the Spark driver.

On the driver, pyspark.SparkFiles.get returns a path inside the driver's temporary directory. That directory exists only on the driver; it is the staging location from which Spark distributes the files previously added with addFile to the executors.

Because this staging directory does not exist on the executors, any executor code that tries to access files through that driver path will fail with an error.
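
To illustrate the pitfall (a sketch of the anti-pattern, not something to copy), the snippet below calls SparkFiles.get on the driver. The path it returns points into the driver's staging directory, so the executor code that opens it fails on a distributed cluster. The file name settings.txt is a hypothetical placeholder.

from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

with open('settings.txt', 'w') as out:
    out.write('hello world')
spark.sparkContext.addFile('settings.txt')

# Anti-pattern: resolved on the driver, so this points into the driver's staging directory.
driver_path = SparkFiles.get('settings.txt')

def read_driver_path(rows):
    # Runs on an executor, where driver_path does not exist in cluster mode,
    # so open() raises FileNotFoundError there.
    with open(driver_path) as handle:
        content = handle.read()
    for _ in rows:
        yield {'value': content}

# Fails on a distributed cluster; it only appears to work in local mode.
spark.sparkContext.parallelize([1]).mapPartitions(read_driver_path).collect()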

👍

Use SparkFiles.get to retrieve the path to a file only from code that runs on an executor. Called there, it returns the executor-local path of the distributed copy.

Below we have provided two examples demonstrating how to use pyspark.SparkContext.addFile and pyspark.SparkFiles.get.

Example 1: Adding a file on the driver and passing the relative path to the executors

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark import SparkFiles
import uuid


def infer_schema(spark_session: SparkSession, inputs: List[DataFrame], credentials=None) -> T.StructType:
    return T.StructType([
        T.StructField('value', T.StringType())
    ])


def transform(spark_session: SparkSession, inputs: List[DataFrame], credentials=None) -> DataFrame:
    df0 = inputs[0]

    # Use a unique filename so repeated runs are not ignored by addFile
    # (see "Note on SparkContext.addFile" below).
    filename = f'test_{uuid.uuid4().hex}.csv'

    # Write the file locally on the driver.
    with open(filename, 'w') as out:
        out.write('hello world')

    # Stage the file so Spark distributes it to the executors.
    spark_session.sparkContext.addFile(filename)

    # Pass the relative path (the bare filename), not the driver path
    # returned by SparkFiles.get.
    df1 = spark_session.read.text(filename)
    return df1

If SparkFiles.get were called on the driver here, it would return a path inside the driver's temporary directory, where the file is staged before being pushed to the executors. That temporary directory exists only on the driver and is not the same location that Spark pushes files to on the executors, which is why this example passes the plain filename instead.
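
If you want to see the staging location for yourself, the diagnostic sketch below (not needed in a real transform) prints the driver-side paths that SparkFiles resolves to after addFile:

from pyspark import SparkFiles
from pyspark.sql import SparkSession
import uuid

spark = SparkSession.builder.getOrCreate()

filename = f'test_{uuid.uuid4().hex}.csv'
with open(filename, 'w') as out:
    out.write('hello world')
spark.sparkContext.addFile(filename)

# Both calls run on the driver, so they describe the driver-only staging
# directory; each executor keeps its own copy elsewhere.
print(SparkFiles.getRootDirectory())
print(SparkFiles.get(filename))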

Example 2: Adding a file on the driver and using pyspark.SparkFiles.get in code running on the executors

In code that runs on an executor, you can safely use pyspark.SparkFiles.get(), because there it returns the path to the copy of the file that Spark pushed from the driver onto that executor.

In the example below, mapPartitions forces the code to run on the executors; note that the pyspark.SparkFiles.get call sits inside the function that is shipped to them. In that context, Spark resolves the correct absolute path to the file on each executor.

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark import SparkFiles
import uuid


def infer_schema(spark_session: SparkSession, inputs: List[DataFrame], credentials=None) -> T.StructType:
    return T.StructType([
        T.StructField('value', T.StringType())
    ])


def transform(spark_session: SparkSession, inputs: List[DataFrame], credentials=None) -> DataFrame:
    df0 = inputs[0]

    # Use a unique filename so repeated runs are not ignored by addFile
    # (see "Note on SparkContext.addFile" below).
    filename = f'test_{uuid.uuid4().hex}.csv'

    # Write the file locally on the driver.
    with open(filename, 'w') as out:
        out.write('hello world')

    # Stage the file so Spark distributes it to the executors.
    spark_session.sparkContext.addFile(filename)

    def func(iterator):
        # Runs on an executor: SparkFiles.get resolves the executor-local copy.
        for i in iterator:
            with open(SparkFiles.get(filename)) as test_file:
                file_content = test_file.read()
                yield {'value': file_content}

    # mapPartitions pushes func out to the executors.
    return spark_session.sparkContext.parallelize([1]).mapPartitions(func).toDF(schema='value string')

Note on SparkContext.addFile

The Spark documentation calls out the following: "A path can be added only once. Subsequent additions of the same path are ignored."

If you want the file to be added on every run, give it a unique name; otherwise Spark ignores the repeated addition of a path it has already added. This is why the examples above append a random UUID to the filename.
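
A minimal sketch of that unique-name pattern, assuming the file is regenerated on each run (the test_ prefix and .csv extension are placeholders carried over from the examples above):

import uuid

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Generate a fresh name for each run so addFile never silently skips a path
# it has already added.
filename = f'test_{uuid.uuid4().hex}.csv'

with open(filename, 'w') as out:
    out.write('hello world')

spark.sparkContext.addFile(filename)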

👍

Consider using a distributed storage system, such as blob storage, for files you need to access from multiple nodes in a distributed Spark cluster. While pyspark.SparkContext.addFile works, storing files in a filesystem that is itself distributed and can be read in parallel is often a more robust and easier-to-reason-about solution.
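
For instance, a file kept in object storage can be read directly by every executor with no staging step. The sketch below is only illustrative: the s3a bucket path is a hypothetical placeholder, and it assumes the cluster is already configured with the appropriate storage connector and credentials.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical object-storage location: every executor reads it in parallel,
# so no addFile / SparkFiles.get staging is required.
df = spark.read.text('s3a://example-bucket/config/hello.txt')
df.show()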