November 12, 2020

Using Docker Images on EMR

Navin Vembar

CTO, Camber


AWS EMR is a service for launching workloads onto large clusters for processing. It supports Spark and PySpark workloads. Launching PySpark scripts that are self-contained and generally don’t need a lot of dependent libraries is… well, it’s not easy, but it’s well-documented. Complex jobs within large pipelines, though, often require specific dependencies and specific settings that need to be managed. If we can simplify that management through containers and match it up to how we test locally, that’s a big plus for us.

Let’s expand the Docker container we built in the previous post for local use and get code running on EMR using that container.

We’re going to assume that you have the permissions to launch EMR containers from the console and are able to give that cluster permissions to read from the AWS Elastic Container Repository (ECR). Setting up the correct permissions and network infrastructure for a cluster is a whole separate topic that warrants its own post. We’re also going to assume you know how to use ECR itself.

Building the Docker image

We’ll be using EMR 6.1.0 here: 6.0 is a bit more difficult to set up to use Docker images, so 6.1.0 is preferred. The Dockerfile we wrote in that previous post was for Spark 2.4.7, but EMR 6.1 uses Spark 3.0.0 and Hadoop 3.2.1 - which also gives us the advantage of running on JDK 11 instead of JDK 8.

FROM python:3.7 as builder

# These two settings are changed from the previous implementation
ARG SPARK_VERSION=3.0.0
ARG HADOOP_VERSION=3.2.1
ARG SPARK_PACKAGE=spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
ARG SPARK_HOME=/usr/spark-${SPARK_VERSION}
RUN curl -sL --retry 3 \
  "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_PACKAGE}.tgz" \
  | tar xz -C /tmp/ \
 && mv /tmp/$SPARK_PACKAGE $SPARK_HOME \
 && chown -R root:root $SPARK_HOME

RUN pip install -U pip
RUN pip install poetry
RUN mkdir /home/spark
WORKDIR /home/spark

# Get the dependencies for the project locally
COPY pyproject.toml poetry.lock ./
RUN poetry config virtualenvs.in-project true && poetry install --no-root

# Use JDK 11 instead of JDK 8
FROM openjdk:11

ARG SPARK_VERSION=3.0.0
ARG HADOOP_VERSION=3.2.1
ARG SPARK_PACKAGE=spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
ARG SPARK_HOME=/usr/spark-${SPARK_VERSION}

ENV PYTHON_DEPENDENCY=python3.7
RUN apt-get update && \
    apt-get install -y --no-install-recommends ${PYTHON_DEPENDENCY} && \
    apt-get clean

ENV SPARK_HOME=/usr/spark-${SPARK_VERSION}
ENV PYTHONPATH=/home/spark/.venv

COPY --from=builder /usr/spark-${SPARK_VERSION}/ /usr/spark-${SPARK_VERSION}

RUN adduser --home /home/spark --disabled-password spark

USER spark
WORKDIR /home/spark

COPY --from=builder --chown=spark:spark /home/spark/.venv /home/spark/.venv

# This deals with the fact that `apt-get install` up there installs
# python to /usr/bin/python3.7, which is different than the location
# of python3 in python:3.7-buster
RUN rm /home/spark/.venv/bin/python
RUN ln -s /usr/bin/python3.7 /home/spark/.venv/bin/python

COPY adding.py test_add.py ./
ENV PATH=${PYTHONPATH}/bin:${SPARK_HOME}/bin:${PATH}
CMD ["pytest"]

Go back to the other post to see all the details here. Follow the directions under Push commands in the ECR console. Note that downloading the Spark and Hadoop packages here is what makes our local testing work. Later we’ll see how to trim down our containers.
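
As a quick check before pushing anything, you can build the image and run the test suite locally. This is just a sketch - the local tag camber/example-emr is an arbitrary choice:

docker build -t camber/example-emr .   # build the image from the Dockerfile above
docker run --rm camber/example-emr     # the Dockerfile's CMD runs pytest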

Updating Our Script

Now, let’s make this interesting and add a few dependencies. In the same directory as the Dockerfile (and pyproject.toml file) above, run poetry add numpy pandas. This gives us dependencies that aren’t always available to the code, which we can use to make the example more realistic. Build the container and push it to ECR following the instructions on the AWS ECR Console for your repository or the documentation in the user guide. For the rest of this post, we’ll be using the image in the repository camber/example-emr.
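
For reference, the push usually boils down to something like the following (a sketch - substitute your own account ID and region; the exact commands are listed under Push commands in the ECR console):

aws ecr get-login-password --region <region> \
    | docker login --username AWS --password-stdin <account>.dkr.ecr.<region>.amazonaws.com   # authenticate Docker to ECR
docker tag camber/example-emr:latest <account>.dkr.ecr.<region>.amazonaws.com/camber/example-emr:latest
docker push <account>.dkr.ecr.<region>.amazonaws.com/camber/example-emr:latest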

Now let’s create a short script which uses the new dependencies we’ve added.

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

@F.pandas_udf(returnType=T.DoubleType())
def distance(x: pd.Series, y: pd.Series) -> pd.Series:
    """
    A pandas UDF which returns the Euclidean distance between the x and y values
    """
    return np.sqrt((x * x) + (y * y))


x = np.random.uniform(low=0, high=90, size=10000)
y = np.random.uniform(low=0, high=90, size=10000)

rows = [
    # Convert numpy scalars to plain Python floats so Spark can infer the schema
    Row(x=float(x[i]), y=float(y[i])) for i in range(10000)
]
df = spark.createDataFrame(data=rows)


df = df.withColumn("dist", distance(F.col("x"), F.col("y")))
df.write.parquet("s3://camber-example/outputs")

Call this distance.py and copy it to s3://camber-example/code/distance.py.
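
If you’re working from a terminal, that’s a one-liner (assuming your credentials can write to the bucket):

aws s3 cp distance.py s3://camber-example/code/distance.py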

Launching the cluster

There’s a natural instinct to think that putting distance.py into your Docker container will make it available for execution. But remember that PySpark scripts run on the (unfortunately named) master node and issue commands to the distributed workers (e.g., df.write.parquet is a command which is sent to all the workers, each of which writes the data it has access to as Parquet files at the given location). In fact, at the end of this post, we’ll talk a little more about what the EMR environment is actually doing to run these containers.

Let’s launch our cluster! For production loads, you would want to do more network configuration to put these machines into private subnets and to set up IAM carefully. This prevents the jobs from being able to access resources in your AWS account that they shouldn’t. But here, we’re going to rely on the defaults.

We’re going to need to install Docker on our EMR machines. We’ll do that through bootstrap actions: commands that run before the cluster starts running any steps.

  1. Create a file called install-docker.sh and copy the following into it:
#!/bin/sh

sudo yum install -y docker    # Install docker
sudo systemctl start docker   # Start the docker service
  2. Copy this to an S3 bucket in your account. For the remainder of this example, let’s say it’s in s3://camber-example/bootstrap/install-docker.sh

  3. Log onto the AWS Console and, in the “Services” dropdown in the top-left corner, select EMR. Click the button at the top that says “Create Cluster”. (All of these instructions are accurate as of the writing of this post.) Select Go to advanced options at the top of the screen, since we’re going to have to add the bootstrap actions.

  4. Make sure to select release emr-6.1.0 - our configuration will rely on specific features of that release.

  5. Click Next to go to the Hardware screen - the defaults for the hardware are reasonable enough for our example, so go ahead and click Next again. (Note that this relies on the existence of a default VPC and subnet in your account.)

  6. In the Configurations text box, we’re going to add a setting that will allow the cluster to trust the Docker image from ECR.

  7. Copy the following, making sure you use the registry URI from ECR that corresponds to your repository. Don’t include the name of the repository itself - just the registry hostname. We’re also assuming here that the cluster’s EC2 instance role has access to these registries and can pull images.

    [
        {
            "classification": "container-executor",
            "configurations": [
                {
                    "classification": "docker",
                    "properties": {
                        "docker.trusted.registries": "local,centos,<account>.dkr.ecs.<region>.amazonaws.com"
                    }
                }
            ]
        }
    ]
    
  8. Name your cluster on the General Options screen. I used emr-docker-example. Open up the section named Bootstrap Actions on this screen and add a Custom action. Click the Configure and add button. Name the action Install Docker and in the Script location field, add in the URL of your install-docker.sh script.

  9. Click Next, keep the Security defaults and then click Create Cluster. (If you’d rather use the CLI, a rough equivalent is sketched right after this list.) And now we wait….
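
If you’d rather not click through the console, a roughly equivalent AWS CLI invocation looks something like the following. This is only a sketch: the instance type and count stand in for the console defaults, configurations.json holds the JSON from step 7, and the default EMR roles are assumed to already exist.

aws s3 cp install-docker.sh s3://camber-example/bootstrap/install-docker.sh   # upload the bootstrap script

aws emr create-cluster \
    --name emr-docker-example \
    --release-label emr-6.1.0 \
    --applications Name=Spark \
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --bootstrap-actions Name="Install Docker",Path=s3://camber-example/bootstrap/install-docker.sh \
    --configurations file://configurations.json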

Running the script

Once the cluster is up and running, we can add a step and watch it run. The parameters here are going to be numerous, but we’ll go through each so they make sense.

  1. Select the Cluster on the EMR page of the AWS Console. Select the Steps tab and hit the Add Step button.
  2. In the dialog, name your step (e.g., emr-docker-step), select Custom JAR and type in command-runner.jar for the JAR location.
  3. Then, let’s put in the actual PySpark job definition. In the Arguments text box, copy in the following:
spark-submit --deploy-mode cluster \
             --conf spark.executorEnv.JAVA_HOME=/usr \
             --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
             --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=<DOCKER_IMAGE> \
             --conf spark.executorEnv.PYSPARK_PYTHON=python3 \
             --conf spark.executorEnv.PYSPARK_DRIVER_PYTHON=python3 \
             --conf spark.executor.defaultJavaOptions="-XshowSettings" \
             --conf spark.yarn.appMasterEnv.JAVA_HOME=/usr \
             --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
             --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=<DOCKER_IMAGE> \
             --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=python3 \
             --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3 \
             --driver-java-options="-XshowSettings" \
             s3://camber-example/code/distance.py

I put most of the PySpark parameters into the command itself rather than in the Configurations step above so we could go through them here. In production environments some of these parameters could be set globally there.

All of the spark.executorEnv.* and spark.yarn.appMasterEnv.* parameters are setting environment variables which change how the job gets run.

  • YARN_CONTAINER_RUNTIME_TYPE=docker means that this job will use Docker containers for execution.
  • YARN_CONTAINER_RUNTIME_DOCKER_IMAGE is the fully qualified URI to the Docker image in ECR, meaning that both the master environment and the executors will use that image for their execution context.
  • PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON tell the environment which Python executable in the image to use for running the individual tasks and the main program, respectively.
  • JAVA_HOME points to /usr because the OpenJDK 11 image uses /usr/ for its home, not /usr/local, which is the default.
  • spark.executor.defaultJavaOptions is set to -XshowSettings because AWS EMR has defaults that use the Oracle Java implementation, not the OpenJDK executable, so the job fails unless we set it to something else. Note that this is where you could tune garbage collection and other Java performance settings. This is the same reason that --driver-java-options is set to -XshowSettings.
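
The console isn’t the only way to submit the step, either. Something like this aws emr add-steps call should be equivalent (a sketch - the cluster ID is a placeholder, and the Args list is the full spark-submit command above with each argument comma-separated):

aws emr add-steps \
    --cluster-id j-XXXXXXXXXXXXX \
    --steps 'Type=CUSTOM_JAR,Name=emr-docker-step,ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=[spark-submit,--deploy-mode,cluster,...,s3://camber-example/code/distance.py]'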

How This Works and Future Improvements

The EMR documentation on configuring a Docker image only says that, for PySpark to work, all you need is a Java container. How does all of this complex infrastructure work if that’s the only requirement?

AWS EMR is eliding a lot of details about how this actually works. When EMR creates its cluster without using Docker, it configures the machines’ Hadoop and Spark settings to tell each executor and driver where to go to request resources from YARN, mounts the S3 buckets into the Hadoop filesystem, and handles a number of other settings. When running in Docker, the Docker containers somehow need to get (1) that same configuration and (2) all the expected Java libraries to run correctly.

In fact, what’s happening is that AWS EMR is mounting a directory containing the Hadoop and Spark files (matching the expected version for EMR 6.1.x). In that directory is a dynamically-created set of configuration files which sets the appropriate server names. There’s a script that runs inside the container when jobs get run, which sets a number of environment variables, including SPARK_HOME and HADOOP_HOME, to the mounted directory.
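
You can see this for yourself: while a step is running, SSH to one of the cluster nodes and look at the running containers (a sketch - the container ID is whatever docker ps reports, and docker typically needs sudo on the EMR hosts):

sudo docker ps                                                            # find the YARN-launched container
sudo docker exec <container-id> env | grep -E 'SPARK_HOME|HADOOP_HOME'    # both point at the mounted directory
sudo docker inspect --format '{{json .Mounts}}' <container-id>            # shows the host directories mounted in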

This means that setting those environment variables - which you need to do to run unit tests - will break the ability to run them in production EMR. In fact, we included Hadoop and Spark in our Docker image so that unit tests pass, but they aren’t needed for execution. You may be able to get a much smaller image for production release, as long as it has Java in it.

Lastly, we definitely don’t want to have to bootstrap install Docker each time we launch a cluster. Creating a simple custom AMI with Docker installed, built off the default Amazon Linux image, would absolutely speed up start-up times.
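
A minimal sketch of that, assuming an Amazon Linux 2 base (which is what EMR 6.x expects for custom AMIs) and a placeholder instance ID:

# On a stock Amazon Linux 2 instance that will become the AMI
sudo yum install -y docker
sudo systemctl enable docker

# From your workstation, snapshot the instance into an AMI
aws ec2 create-image --instance-id i-0123456789abcdef0 --name emr-docker-base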