Airflow + PySpark over Livy. Sessions and batches.


Recently at work we've been trying to find a way to run Spark jobs from Airflow that best suits our needs. Turns out, there's more than one way to skin a cat.

To sum up our findings, you can choose one of the following options:

1. Invoking EMR Steps API (EMR only)

 As you can probably tell, this list is going to go in order of my personal preference, starting with the least preferred.

 EMR is Amazon's take on a big data platform, running on vanilla EC2 instances and including all the major tools you would expect to see in a big data cruncher: Hadoop, Hive, and Spark, among others.
If you're dead set on using Amazon infra for life and never expect to migrate to other ways of running a Spark cluster - then this option is for you.

How to run jobs:
 The 'Providers' section of Airflow has been sporting a collection of EMR Steps operators and sensors for quite a while now. A sketch along the lines of the one below will get you started.
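Here's a minimal sketch, assuming an existing EMR cluster and the stock Amazon provider operators (exact import paths vary a bit between provider versions); the cluster id, connection id and jar location are placeholders:

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# One EMR step that runs spark-submit through command-runner.jar
SPARK_STEP = [{
    "Name": "example_spark_step",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "--class", "com.example.Main",
                 "s3://my-bucket/my-job.jar"],
    },
}]

with DAG("emr_steps_example", start_date=datetime(2020, 1, 1),
         schedule_interval=None) as dag:
    add_step = EmrAddStepsOperator(
        task_id="add_step",
        job_flow_id="j-EXAMPLECLUSTERID",  # hypothetical cluster id
        steps=SPARK_STEP,
        aws_conn_id="aws_default",
    )
    # EmrAddStepsOperator pushes the list of new step ids to XCom; the sensor polls the first one
    watch_step = EmrStepSensor(
        task_id="watch_step",
        job_flow_id="j-EXAMPLECLUSTERID",
        step_id="{{ task_instance.xcom_pull(task_ids='add_step')[0] }}",
        aws_conn_id="aws_default",
    )
    add_step >> watch_step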

Pros:
  • You can not only execute jobs, but even manipulate EMR clusters. Ability to bring up and terminate clusters on-demand is a great trick to have up your sleeve.
Cons (things I consider deal-breakers are in italic):
  • Ties you to a specific cloud-based solution that can be discontinued any time. I don't think it's likely, but it's always better to have options.
  • As far as I know, you can only submit a JAR file to be executed as a step. Of course, if Java is your cup of coffee (😜 I'll show myself out) then it's not an issue at all.
  • It's currently not possible to run Steps concurrently.

2. Setting up Spark Master context on the same node as Airflow Scheduler

 This mode of operation expects you to have the spark-submit binary and YARN client config set up on the same machine where Airflow runs. The operator cooks up a correct spark-submit command line with all the required arguments and executes it. Since the job is submitted in client mode, you can stream stdout and stderr logs directly to Airflow.

 In a nutshell, if your Spark cluster runs on EMR, this requires heaps of failure-prone operations: copying a folder with configuration files over from the Spark master node to the Airflow node, installing Spark on the Airflow node, then whitelisting certain ports so that the Airflow and Spark master nodes can communicate.

 I did not investigate this option in detail as it did not seem to offer much advantage over the others. One note: when I was trying this method out, I noticed that restarting the EMR cluster invalidates the context and you have to copy it over again, because the context depends on the IP addresses of the Spark nodes.

How to run jobs:
There are two options to choose from:
  1. SparkSubmitOperator in the Airflow providers section (see the sketch after this list). Additionally, you can read some guidelines on using it from this guy on Medium.
  2. This custom plugin - it doesn't appear to be actively maintained though: at the time of writing the last commit in that repo took place 2 years ago.
Also, here's an article if EMR is where you're planning to set this up.
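A minimal sketch of the SparkSubmitOperator route, assuming a 'spark_default' connection pointing at your YARN master and spark-submit available on the Airflow node; the application path and arguments are placeholders:

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("spark_submit_example", start_date=datetime(2020, 1, 1),
         schedule_interval=None) as dag:
    submit_job = SparkSubmitOperator(
        task_id="submit_job",
        conn_id="spark_default",                  # Spark connection defined in Airflow
        application="/opt/jobs/my_job.py",        # hypothetical PySpark file
        application_args=["--date", "{{ ds }}"],  # arguments are Jinja-templatable
        conf={"spark.yarn.queue": "default"},
    )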

Pros:
  • Streams spark-submit logs directly to Airflow.
  • Full choice of every language Spark has to offer: Scala, Java, Python, R.
Cons (things I consider deal-breakers are in italic):
  • Each IP address change (e.g. redeployment of EMR cluster) necessitates regeneration of context on Airflow node that executes spark-submit. This is cumbersome and failure-prone.
  • Generated spark-submit command is a really long string and therefore is hard to read.

3. SSHOperator

 With this option, we're connecting to the Spark master node via SSH, then invoking spark-submit on the remote server to run a pre-compiled fat jar/Python file/R file (not sure about that last one) from HDFS, S3 or the local filesystem.

How to run jobs:
I'd only managed to find source code for SSHOperator in the 'Providers' section of Airflow. It should be relatively simple to use, since it's just a buffed-up BashOperator. A sketch:
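Here's a minimal sketch, assuming an 'ssh_spark_master' connection to the Spark master node; the jar location and class name are placeholders:

from datetime import datetime
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator

with DAG("ssh_spark_submit_example", start_date=datetime(2020, 1, 1),
         schedule_interval=None) as dag:
    submit_job = SSHOperator(
        task_id="submit_job",
        ssh_conn_id="ssh_spark_master",
        # the whole spark-submit invocation is one long templated string
        command=(
            "spark-submit --master yarn --deploy-mode cluster "
            "--class com.example.Main s3://my-bucket/my-job.jar {{ ds }}"
        ),
    )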

Pros:
  • Streams Spark job logs directly to Airflow console.
  • Airflow and Spark don't have to coexist on the same node.
  • You don't need to copy any files over from Spark master node as was required in the previous option.
  • Write code in all Spark-native languages: Scala, Java, Python, and probably R.
Cons (things I consider deal-breakers are in italic):
  • Generated spark-submit command is a really long string and therefore is hard to read.
  • Not clear what happens if SSH connection is lost mid-run. Technically, that should abort whatever was running in that session (and that means the job in our case).

4. Apache Livy

 Livy is a REST service for Spark clusters. This is a perfect choice if you want to decouple your code from deployment configuration: the Livy server runs on the master node, listens for incoming REST calls and manages job execution.

Docs: Livy REST API

 Apache Livy is actually not just one, but two distinct options, as it provides two modes of submitting jobs to Spark: sessions and batches.

Since submitting jobs over Livy is the option I've explored the most, I've come up with two solutions to ease and speed up discovery and development: a set of Airflow operators for Livy and a Docker Compose Spark cluster for local experiments (both referenced below).

a) Sessions

You create a session via the REST API, then submit lines of code (aka statements) - also through the REST API. One statement is one or more lines of code, and a single session can execute any number of statements.

It's important to keep in mind that a Livy session ≠ a Spark session: for instance, it's not possible to share DataFrames through global temporary views in Livy sessions, even though it's called a session!

How to run jobs: REST client
You can use my Docker Compose Spark cluster to quickly run these light jobs, just to get a feel for Spark without having to bring up a full-fledged cluster that costs money 😉.

Creating a session (specifying "kind": "spark|pyspark|sparkR|sql" is not mandatory anymore, see the REST API docs) - a request like the one sketched below will return the session id.
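A minimal sketch with plain Python requests, assuming Livy is listening on its default port 8998:

import requests

LIVY_URL = "http://localhost:8998"

# Create a PySpark session ("kind" is optional in recent Livy versions)
resp = requests.post(f"{LIVY_URL}/sessions", json={"kind": "pyspark"})
resp.raise_for_status()
session_id = resp.json()["id"]
print(f"Created session {session_id}, state: {resp.json()['state']}")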

Now we can submit code through statements - in any of the available languages! And finally, checking the state of all statements:
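Continuing the sketch above (the session id is whatever the previous call returned):

import requests

LIVY_URL = "http://localhost:8998"
session_id = 0  # id returned when the session was created

# Submit a statement: one or more lines of code
code = "rdd = sc.parallelize(range(100)); print(rdd.sum())"
resp = requests.post(f"{LIVY_URL}/sessions/{session_id}/statements", json={"code": code})
resp.raise_for_status()

# Check the state of all statements in the session
statements = requests.get(f"{LIVY_URL}/sessions/{session_id}/statements").json()["statements"]
for st in statements:
    print(st["id"], st["state"], st.get("output"))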


How to run jobs: Airflow
 To apply this to Airflow, we'll make the DAG perform the same kinds of POST/GET requests using SimpleHttpOperator and HttpSensor, as sketched below. You'll find more sample DAGs in the git repository.
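A rough sketch of the idea, assuming an Airflow HTTP connection named 'livy' that points at the Livy server; this creates the session and waits for it to become idle, and submitting statements follows the same pattern:

import json
from datetime import datetime
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

with DAG("livy_session_example", start_date=datetime(2020, 1, 1),
         schedule_interval=None) as dag:
    create_session = SimpleHttpOperator(
        task_id="create_session",
        http_conn_id="livy",
        endpoint="/sessions",
        method="POST",
        data=json.dumps({"kind": "pyspark"}),
        headers={"Content-Type": "application/json"},
        response_filter=lambda response: response.json()["id"],  # push session id to XCom
    )
    wait_for_idle = HttpSensor(
        task_id="wait_for_idle",
        http_conn_id="livy",
        endpoint="/sessions/{{ ti.xcom_pull(task_ids='create_session') }}/state",
        response_check=lambda response: response.json()["state"] == "idle",
        poke_interval=10,
    )
    create_session >> wait_for_idle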
Pros:
  • Job language: Scala, Python, R and SQL. This is rich compared to the EMR Steps option, where you can only submit a JAR.
  • Very detailed information about each statement, including exceptions, of course - a statement shows as failed if an exception was thrown.
  • Easy to recreate the exact job submitted by Airflow - all you need is any REST client.
  • Rendered statement (code) templates are visible in the web UI.

Cons (things I consider deal-breakers are in italic):
  • Have to escape everything twice.
  • Have to be mindful of quotes in the session file (e.g. passing a " character into a string literal enclosed in " " shows up as """ in the rendered template, whereas the same character inside a literal enclosed in ' ' renders cleanly).
  • Can't run session code separately to debug it.
  • The params field is not Jinja-templatable, so all of the "magic" is pushed down into the Spark job code. This looks OK for small jobs and downright ugly once you get past 50 lines of code.
  • Directly following from the previous point: your Spark jobs cannot be executed outside Airflow, because they include Jinja templates:

    Yes, this is valid Jinja-templated code, with double-escaping and whatnot.

  • Livy Sessions are NOT Spark sessions, even though it's implied from the name.

b) Batches

A batch is a way to execute long-running Spark code. With batches, you submit a path to the Spark code via the REST API; this file is picked up by Spark and executed. One batch = one Spark job.

How to run jobs: REST client

http://gethue.com/how-to-use-the-livy-spark-rest-job-server-api-for-submitting-batch-jar-python-and-streaming-spark-jobs/
https://stackoverflow.com/questions/51566029/airflow-http-callback-sensor

You build your job as a fat jar (Java or Scala; no need to build anything for Python or R), save it somewhere your cluster can access it (HDFS, S3, local filesystem) and submit it as sketched below. The request returns the ID of the newly submitted batch.
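A minimal sketch, again assuming Livy on its default port and a hypothetical jar location:

import requests

LIVY_URL = "http://localhost:8998"

payload = {
    "file": "s3://my-bucket/my-job.jar",  # hypothetical jar location
    "className": "com.example.Main",
    "args": ["2020-03-01"],
}
resp = requests.post(f"{LIVY_URL}/batches", json=payload)
resp.raise_for_status()
batch_id = resp.json()["id"]
print(f"Submitted batch {batch_id}")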

You can then poll its state through another endpoint, or fetch the full logs rather than just the status - see the sketch below.
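A sketch of both calls, reusing the batch id from the previous request:

import requests

LIVY_URL = "http://localhost:8998"
batch_id = 0  # id returned by POST /batches

# Just the status
state = requests.get(f"{LIVY_URL}/batches/{batch_id}/state").json()["state"]
print(f"Batch state: {state}")

# The last 100 lines of the driver log
log_lines = requests.get(f"{LIVY_URL}/batches/{batch_id}/log", params={"size": 100}).json()["log"]
print("\n".join(log_lines))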




Internally, Livy turns the batch into an ordinary spark-submit invocation on the cluster.

How to run jobs: Airflow
Same approach as we applied with sessions: codify the REST API calls as if they were being executed manually - a rough sketch follows.
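One way to do this with the stock HTTP operator and sensor, assuming the same 'livy' connection as before (the LivyBatchOperator from the git repository mentioned above wraps this same pattern):

import json
from datetime import datetime
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

with DAG("livy_batch_example", start_date=datetime(2020, 1, 1),
         schedule_interval=None) as dag:
    submit_batch = SimpleHttpOperator(
        task_id="submit_batch",
        http_conn_id="livy",
        endpoint="/batches",
        method="POST",
        data=json.dumps({"file": "s3://my-bucket/my-job.jar",
                         "className": "com.example.Main"}),
        headers={"Content-Type": "application/json"},
        response_filter=lambda response: response.json()["id"],  # push batch id to XCom
    )
    wait_for_batch = HttpSensor(
        task_id="wait_for_batch",
        http_conn_id="livy",
        endpoint="/batches/{{ ti.xcom_pull(task_ids='submit_batch') }}/state",
        # succeeds once the batch reaches "success"; a "dead" batch makes the sensor time out
        response_check=lambda response: response.json()["state"] == "success",
        poke_interval=30,
    )
    submit_batch >> wait_for_batch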
Pros:
  • Same choice of languages as above + Java.
Cons (things I consider deal-breakers are in italic):
  • When submitting Spark jobs in cluster mode, they always show up as finished even if an exception is thrown during execution. I noticed this on Amazon EMR 5.26.0. Alas, I could not find a way to propagate exceptions to Livy - the only way to tell that something went wrong is to parse the logs for the word "Exception", and that is unreliable.

c) Batches + Spark/YARN REST API

We were not satisfied with the two approaches above: Livy batches (when executed in Spark's cluster mode) always show up as "complete" even if they actually failed, and Livy sessions result in heavily modified Spark jobs that you can't execute without feeding them to the Jinja engine first.

Then I learned about a cool feature in Spark that made this perfect hybrid approach possible: the Spark REST API. It was buried in the middle of a page with the nondescript name "Monitoring and Instrumentation", so it took me a while to find it. That gave me an idea, and eventually I found one more thing: the YARN Resource Manager also exposes a REST API.

It was only a matter of time to try both out, see that each API always shows the actual status of a Spark job, and add this verification to the batch operator.

How to run jobs: REST client
Follow the same steps as for the batches. Take note of the appId field when doing GET /batches/${BATCH_ID}; it may look something like "application_1583256958670_0001". Now access the app status via the Spark REST API, as sketched below. This endpoint returns a list of constituent jobs for our app; if all of them show "status": "SUCCEEDED", the job completed successfully.
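A minimal sketch, assuming the Spark history server is reachable (host and port depend on your setup):

import requests

SPARK_API = "http://spark-master:18080/api/v1"  # hypothetical history server address
app_id = "application_1583256958670_0001"       # appId taken from the Livy batch

jobs = requests.get(f"{SPARK_API}/applications/{app_id}/jobs").json()
if all(job["status"] == "SUCCEEDED" for job in jobs):
    print("Application completed successfully")
else:
    print("At least one job did not succeed:", [job["status"] for job in jobs])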


Same for the YARN REST API (we're looking at the $.app.finalStatus field - anything but "SUCCEEDED" is bad):
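A sketch of the same check against the YARN ResourceManager, assuming its web UI on the default port 8088:

import requests

YARN_API = "http://yarn-resource-manager:8088/ws/v1/cluster"  # hypothetical RM address
app_id = "application_1583256958670_0001"

app = requests.get(f"{YARN_API}/apps/{app_id}").json()["app"]
if app["state"] == "FINISHED" and app["finalStatus"] == "SUCCEEDED":
    print("Application completed successfully")
else:
    print(f"Problem: state={app['state']}, finalStatus={app['finalStatus']}")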



How to run jobs: Airflow
Same as LivyBatchOperator above, but you pass an additional parameter:
verify_in="spark"
#or
verify_in="yarn"

Pros:
  • Combines the best of both worlds: the arguments you pass to the Spark code are templatable, which means you keep jobs you can execute outside of Airflow, and you can see exactly what went wrong with the job if it failed.
Cons:
  • None that I know of!

Comments

  1. Hi, thanks for the detailed explanation on using REST API to submit spark jobs from airflow (which is present in a different server) to Hadoop cluster. Appreciate your efforts in sharing your valuable experience to others.

    We are also trying to implement airflow and evaluating the options.

    For now, we are considering the same option that you have mentioned in point 4 (with airflow and hadoop in different clusters and submit spark jobs using livy). However, I have a question regarding this architecture. Most of our dags need to check if input data is present in HDFS. For this, we need to have HDFSSensor. Could you please let me know if we can have this sensor triggered from airflow? Do we need any Hadoop config files be copied to airflow to do this?

    Thanks.

    Replies
    1. Hello!

      I don't have any experience with your concrete situation, but both HDFS sensor and my Livy operators are simply operators, as such they can be mixed and matched in any DAG. For example, you could make existence of certain two input files a prerequisite for running the Livy job like this (pseudocode):

      sensor_f1 = HdfsSensor(url='hdfs://file1.txt')
      sensor_f2 = HdfsSensor(url='hdfs://file2.txt')
      combine = LivyBatchOperator(arguments=['hdfs://file1.txt','hdfs://file2.txt'])
      sensor_f1 >> combine << sensor_f2

      Hope that helps.

  2. Hi Panov, thanks a lot for your reply. Do we need to have any Hadoop config files (like core-site.xml or yarn-site.xml) to be copied to the other server where airflow is deployed for checking the files? Have this question because airflow will be deployed in a different server and not in the Hadoop cluster. Thanks

  3. Hi Panov,

    Sorry for too many questions. I was just trying to understand the architecture that you have used. Are Airflow, spark and livy deployed in the same server Or is Airflow in one server and the remaining in a different cluster. Based on your post, I have used a docker image (spark + hadoop + livy) and another docker for Airflow. And trying to submit spark program from the airflow docker to another docker using REST API. But it doesn't work. Not sure how the Airflow docker knows how to submit spark program in the other docker.

    Thanks

    Replies
    1. > Airflow in one server and the remaining in a different cluster.
      This should be the case.

      > docker image(spark + hadoop + livy) and another docker for Airflow. And trying to submit spark program from the airflow docker to another docker using REST API.
      This should work! Try digging deeper.

      > Not sure how the Airflow docker knows how to submit spark program in the other docker.
      You'd answered this inside your own question - Airflow instance only knows the URL of Livy instance, and sends HTTP requests (REST API) to this URL. This is how jobs are submitted, and this is why Airflow doesn't have to be on the same host as the rest of the cluster.

