Managing dependencies and artifacts in PySpark

Posted by Aleksey Zhukov

At Grubhub, we use a number of technologies to manage the substantial amounts of data generated by our systems. One of them is Spark, and some of us use PySpark, which works well. However, problems can arise when submitting artifacts and their dependencies to the Spark cluster for execution.

 

In this blog entry, we’ll examine how to solve these problems by following a good practice: using ‘setup.py’ as the dependency management and build mechanism. We’ll show how a custom ‘setup.py’ command can “automagically” create the dependency and artifact ZIP files for a complex application, and how to pass those distribution zip files to the ‘spark-submit’ command. We’ll also demonstrate how to run different Spark jobs in a generic way. We’ll conclude with an example of a fully functional and unit-tested application. You’re welcome to use that sample as a base for your own project.

 

Preconditions and setup

Before we dive in, we must install ‘pip’ (https://pip.pypa.io/en/stable/) and Spark (https://spark.apache.org/). (The official documentation should be your first resource, but of course there are plenty of other examples out there.) We’ll also need to set the ‘SPARK_HOME’ environment variable.

 

Next, let’s examine how we configure our source repository. While approaches vary, we use ‘setuptools’ (https://setuptools.readthedocs.io/en/latest/) and its driver script, ‘setup.py’, which works very well. We suggest always using it: it provides a lot of functionality out of the box, gives us a way to build our artifact(s), and lets us add custom commands. We’ll use that last ability to create a ‘bdist_spark’ command, which builds the application artifact and its dependencies.

 

Another important file is ‘requirements.txt’, which lists the dependencies to be installed with our application. We prefer to keep runtime dependencies in this file and put dependencies needed only for tests in a separate file, which we’ll name ‘test_requirements.txt’.

 

We’ll use the following commands to install our dependencies:

 

$ pip install -r requirements.txt
$ pip install -r test_requirements.txt

 

Ideally, we’ll install these dependencies in a Python virtual environment (https://virtualenv.pypa.io/en/stable/), because it isolates our app’s Python environment and helps us avoid potential package conflicts.
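
For example, we can create and activate an environment before running the ‘pip install’ commands above (the environment name ‘venv’ here is arbitrary):

$ virtualenv venv
$ source venv/bin/activate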

 

Setup.py provides a way to list dependencies as well, but defining the dependencies in two places violates the “Don’t repeat yourself” (DRY) principle. To avoid that, we’ll include the following in our ‘setup.py’ file:

 

from setuptools import setup
# 'parse_requirements' is pip's internal requirements parser.
from pip.req import parse_requirements

reqs = parse_requirements('requirements.txt', session=False)
requirements = [str(ir.req) for ir in reqs]

setup(
    # ... name, version, packages, and so on ...
    install_requires=requirements,
)
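
One caveat: ‘pip.req’ is an internal pip module and was removed in pip 10. If your pip version no longer exposes it, a minimal sketch that parses the file directly works just as well (assuming one requirement per line, which is all our example needs):

# Read requirements.txt ourselves instead of relying on pip internals.
# Assumes one requirement per line; blank lines and '#' comments are skipped.
with open('requirements.txt') as req_file:
    requirements = [line.strip() for line in req_file
                    if line.strip() and not line.strip().startswith('#')]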

 

Now, let’s take another look at our example ‘requirements.txt’ file:

 

$ cat requirements.txt
pulp

 

‘pulp’ itself depends on ‘pyparsing’, and pip installs both packages (see above). In other words, our application explicitly depends on one package and transitively on a second, and both need to be included in the distribution zip file passed to the spark-submit ‘--py-files’ option. This is a simple example, but how do we know which transitive dependencies need to go into our distribution files in more complex situations? Pip will help us with that.

 

If we run:

 

$ pip wheel -r requirements.txt -w dist

 

it will create ‘wheel’ files for all explicit and transitive dependencies from our ‘requirements.txt’ file and put them into the ‘dist’ directory.

 

Those ‘wheel’ files are simply zip files:

 

$ file dist/PuLP-1.6.1-py2-none-any.whl
dist/PuLP-1.6.1-py2-none-any.whl: Zip archive data, at least v2.0 to extract

 

To build our application artifact, we’ll use:

 

$ python setup.py bdist_wheel

 

This will create another ‘wheel’ (a zip file underneath) in the ‘dist’ directory containing our application files. Now that we have all the components of our distribution files, we need to rename the ‘wheel’ files to ‘.zip’ files and provide them to the ‘--py-files’ option.

 

All those steps can and should be automated. To handle the automation, we wrote a custom ‘setup.py’ command, ‘bdist_spark’. Executing it gives us two files in the ‘spark_dist’ directory: the application artifact file and all of the dependencies combined in another file.
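
The implementation of ‘bdist_spark’ lives in our sample repository and isn’t reproduced here; the sketch below is only an illustrative assumption of what such a setuptools command could look like. The class name ‘BdistSpark’, the temporary ‘.wheels’ directory, and the use of ‘pip wheel’ and ‘bdist_wheel’ subprocess calls are all assumptions, chosen to mirror the output shown next.

import glob
import os
import shutil
import subprocess
import zipfile

from setuptools import Command


class BdistSpark(Command):
    description = 'build the application artifact and a combined dependencies zip'
    user_options = []

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def run(self):
        dist_dir = 'spark_dist'
        wheel_dir = os.path.join(dist_dir, '.wheels')
        if os.path.exists(dist_dir):
            shutil.rmtree(dist_dir)
        os.makedirs(wheel_dir)

        # Build wheels for every (transitive) dependency and for the app itself.
        subprocess.check_call(['pip', 'wheel', '-r', 'requirements.txt', '-w', wheel_dir])
        subprocess.check_call(['python', 'setup.py', 'bdist_wheel', '-d', wheel_dir])

        name = self.distribution.get_name()
        version = self.distribution.get_version()
        app_prefix = name.replace('-', '_')

        # Copy the application wheel as a plain .zip and merge all dependency
        # wheels into a single '-deps.zip' for spark-submit's --py-files option.
        deps_path = os.path.join(dist_dir, '%s-%s-deps.zip' % (name, version))
        with zipfile.ZipFile(deps_path, 'w') as deps_zip:
            for whl in glob.glob(os.path.join(wheel_dir, '*.whl')):
                if os.path.basename(whl).startswith(app_prefix):
                    shutil.copy(whl, os.path.join(dist_dir, '%s-%s.zip' % (name, version)))
                    continue
                with zipfile.ZipFile(whl) as wheel:
                    for entry in wheel.namelist():
                        deps_zip.writestr(entry, wheel.read(entry))
        shutil.rmtree(wheel_dir)

Such a command would be registered by passing cmdclass={'bdist_spark': BdistSpark} to the ‘setup()’ call.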

 

$ python setup.py bdist_spark
running bdist_spark


$ ls spark_dist/* 
spark_dist/test_spark_submit-0.1-deps.zip 
spark_dist/test_spark_submit-0.1.zip

 

Now that we have the application artifact and dependency files, we can execute a ‘spark-submit’ command. To do so, we need to provide an entry point (i.e., a ‘py’ file) into our application. That is enough if we have just one entry point, but in our case we have several Spark jobs that share the same functionality and are bundled together in one repository. We therefore want a common ‘py’ file that can call our different jobs in a generic way.

 

We can address that issue with this small ‘driver.py’ script:

 

import importlib
import sys

# Expect the module to run as the first argument; pass the rest to its main().
if len(sys.argv) == 1:
    raise SyntaxError("Please provide a module to load.")
module = importlib.import_module(sys.argv[1])
sys.exit(module.main(sys.argv[2:]))

 

At run time, this script imports the module named by its first argument and calls that module’s ‘main’ function with the remaining parameters, much as you would pass a module name to ‘python -m’.

 

For instance:

 

$ spark-submit --py-files spark_dist/test_spark_submit-0.1-deps.zip,spark_dist/test_spark_submit-0.1.zip \
driver.py test_spark_submit.subfolder.spark_script parameter_to_pass_to_module
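
The module passed to ‘driver.py’ only needs to expose a ‘main’ function. For illustration, a hypothetical sketch of ‘test_spark_submit/subfolder/spark_script.py’ could look like the following; the function bodies are assumptions that are merely consistent with the tests shown later (with ‘process2’ relying on the ‘spark-csv’ package discussed below):

from pyspark import SparkContext
from pyspark.sql import SQLContext


def process(sc, input_list):
    # Trivial job: round-trip a Python list through an RDD.
    return sc.parallelize(input_list).collect()


def process2(sc, csv_path):
    # Load a CSV file into a DataFrame via the spark-csv package.
    return SQLContext(sc).read.format('com.databricks.spark.csv').load(csv_path)


def main(args):
    # Entry point invoked by driver.py with the remaining command-line arguments.
    sc = SparkContext(appName='test_spark_submit')
    print(process(sc, args))
    sc.stop()
    return 0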

 

Finally, none of this would be complete if we didn’t test our code. For testing, we typically use the ‘nose’ testing framework, but any of the other testing frameworks available for Python would work as well. We’ll also need two other packages, ‘py4j’ and ‘findspark’:

 

$ cat test_requirements.txt
nose
py4j
findspark 

 

The test is very simple — it shows how to wire everything together:

 

$ cat test_spark_submit/test/spark_script_test.py

from test_spark_submit.subfolder.spark_script import process, process2
from test_spark_submit.test.utils import SparkBaseTestCase
from pyspark.sql.types import Row

class HelloWorldTest(SparkBaseTestCase):
    def __init__(self, *args, **kwargs):
        super(HelloWorldTest, self).__init__(*args, **kwargs)

    def test_basic(self):
        input_list = ["hello world"]
        result = process(self.sc, input_list)
        assert result == input_list

    def test_csv(self):
        expected_result = [Row(C0='hello', C1='world')]
        df = process2(self.sc, 'test_spark_submit/test/resources/test_file.csv')
        assert expected_result == df.collect()

    def test_sql(self):
        expected_result = [Row(C0='hello', C1='world')]
        df = process2(self.sc, 'test_spark_submit/test/resources/test_file.csv')
        self.sql_context.registerDataFrameAsTable(df, 'test_table')
        result = self.sql_context.sql('select * from test_table').collect()

        assert expected_result == result

 

We use a locally created SparkContext, instantiated in the ‘SparkBaseTestCase’ base class.
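
The base class itself isn’t reproduced in this post; a minimal sketch of what it might look like, assuming ‘findspark’ is used to locate the local Spark installation, is:

import unittest

import findspark

# Make the local Spark installation importable before pulling in pyspark.
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SQLContext


class SparkBaseTestCase(unittest.TestCase):
    def setUp(self):
        super(SparkBaseTestCase, self).setUp()
        # A small local SparkContext/SQLContext shared by the tests.
        self.sc = SparkContext(master='local[4]')
        self.sql_context = SQLContext(self.sc)

    def tearDown(self):
        self.sc.stop()
        super(SparkBaseTestCase, self).tearDown()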

 

The tests do not require the dependencies to be packed into the zip files created above, because they run locally. However, sometimes the code might depend on other packages (jars), for example ‘spark-csv’. At runtime, we’d provide such a package to ‘spark-submit’ with the ‘--packages’ option like so:

 

spark-submit --packages com.databricks:spark-csv_2.10:1.0.4

 

The challenge now is figuring out how to provide such dependencies to our tests. One solution is to modify ‘spark-defaults.conf’ and add the following line:

 

spark.driver.extraClassPath /path/to/your/jars/dir/*

 

But that’s ugly: we’re modifying our global environment, which has nothing to do with our tests. We’d also need to manually resolve the transitive dependencies and download all of them into that directory.


Luckily, there is a better solution. In our tests, before we instantiate the SparkContext, we’ll set the environment variable like so:

 

# The extra packages string must end with 'pyspark-shell' for PySpark to pick it up.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'
self.sc = SparkContext(master='local[4]')

To run the tests, we’ll write:

$ python setup.py nosetests
running nosetests
[…]
16/10/17 20:44:05 INFO SparkContext: Running Spark version 1.6.2
16/10/17 20:44:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/17 20:44:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
[…]
16/10/17 20:44:06 INFO Utils: Successfully started service 'sparkDriver' on port 56208.
[…]
16/10/17 20:44:08 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
[…]
16/10/17 20:44:08 INFO SparkContext: Successfully stopped SparkContext
16/10/17 20:44:08 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/10/17 20:44:08 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/10/17 20:44:08 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
.
----------------------------------------------------------------------
Ran 3 tests in 5.539s

OK

Now, our tests will succeed.

In this article, we showed how to automate the creation of an artifact and its dependencies for use with the ‘spark-submit’ command, how to run different Spark jobs in a generic way, and how to write and execute tests for the application.