Recently at work we've been trying to find a way to run Spark jobs from Airflow that best suits our needs. Turns out, there's more than one way to skin a cat.
To sum up our findings, you can choose one of the following options:
1. Invoking EMR Steps API (EMR only)
As you can probably tell, this list goes in order of my personal preference, starting with the least preferred.

EMR is Amazon's take on a managed big data platform: it runs on vanilla EC2 instances and includes all the major tools you would expect in a big data cruncher - Hadoop, Hive, and Spark, among others.
If you're dead set on using Amazon infra for life and never expect to migrate to other ways of running a Spark cluster, then this option is for you.
How to run jobs:
The 'Providers' section of Airflow has been sporting a collection of EMR Steps operators and sensors for quite a while now. This example will get you started.
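A minimal sketch of what that can look like (not a complete DAG: the cluster ID, JAR location and step definition are placeholders, and the exact import paths differ between Airflow/provider versions):

```python
# Sketch only: job_flow_id, the JAR path and the step definition are placeholders.
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

SPARK_STEP = [
    {
        "Name": "spark_step_example",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--class", "Main", "s3://my-bucket/my-job.jar"],
        },
    }
]

add_step = EmrAddStepsOperator(
    task_id="add_step",
    job_flow_id="j-XXXXXXXXXXXXX",  # ID of an existing EMR cluster
    steps=SPARK_STEP,
    aws_conn_id="aws_default",
    dag=dag,
)

# Wait until the step (and therefore the Spark job) finishes or fails.
watch_step = EmrStepSensor(
    task_id="watch_step",
    job_flow_id="j-XXXXXXXXXXXXX",
    step_id="{{ task_instance.xcom_pull(task_ids='add_step', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

add_step >> watch_step
```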
Pros:
- You can not only execute jobs, but also manipulate EMR clusters. The ability to bring up and terminate clusters on demand is a great trick to have up your sleeve.
Cons:
- Ties you to a specific cloud-based solution that could be discontinued at any time. I don't think that's likely, but it's always better to have options.
- As far as I know, you can only submit a JAR file to be executed as a step. Of course, if Java is your cup of coffee (😜 I'll show myself out) then it's not an issue at all.
- It's currently not possible to run Steps concurrently.
2. Setting up Spark Master context on the same node as Airflow Scheduler
This mode of operation expects you to have a spark-submit binary and YARN client config set up on the same machine where Airflow runs. Airflow then cooks up a correct spark-submit command line with all required arguments and executes it. Since the job is submitted in client mode, you can stream stdout and stderr logs directly to Airflow.

In a nutshell, if your Spark cluster runs on EMR, this requires heaps of failure-prone operations: copying a folder with configuration files over from the Spark master node to the Airflow node, installing Spark on the Airflow node, then whitelisting certain ports so that the Airflow and Spark master nodes can communicate.
I did not investigate this option in detail, as it did not seem to offer much advantage over the others. One note: when I was trying this method out, I noticed that restarting the EMR cluster invalidates the context and you have to copy it over again, because the context depends on the IP addresses of the Spark nodes.
How to run jobs:
There are two options to choose from:
- SparkSubmitOperator in the Airflow providers section. Additionally, you can read some guidelines on using it from this guy on Medium. A minimal sketch follows this list.
- This custom plugin - it doesn't appear to be actively maintained, though: at the time of writing, the last commit in that repo was 2 years old.
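For the first of the two, a minimal sketch (assuming the Apache Spark provider package is installed, spark-submit and the YARN client config are present on the Airflow node, and a spark_default connection points at the cluster; the application path is a placeholder):

```python
# Sketch only: conn_id, application path and class name are placeholders.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_job = SparkSubmitOperator(
    task_id="spark_submit_example",
    conn_id="spark_default",            # connection describing the YARN/Spark master
    application="/path/to/my-job.jar",  # a .jar or .py file reachable from this node
    java_class="Main",
    application_args=["--date", "{{ ds }}"],
    name="spark_submit_example_{{ run_id }}",
    dag=dag,
)
```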
Pros:
- Streams spark-submit logs directly to Airflow.
- Full choice of every language Spark has to offer: Scala, Java, Python, R.
Cons:
- Each IP address change (e.g. a redeployment of the EMR cluster) necessitates regenerating the context on the Airflow node that executes spark-submit. This is cumbersome and failure-prone.
- Generated spark-submit command is a really long string and therefore is hard to read.
3. SSHOperator
With this option, we're connecting to the Spark master node via SSH, then invoking spark-submit on that remote server to run a pre-compiled fat jar/Python file/R file (not sure about that one) from HDFS, S3 or the local filesystem.

How to run jobs:
I only managed to find the source code for SSHOperator in the 'Providers' section of Airflow. It should be relatively simple to use, since it's just a buffed-up BashOperator.
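A minimal sketch (assuming the SSH provider is installed and an SSH connection to the Spark master node is configured in Airflow as spark_master_ssh; the jar location and class name are placeholders):

```python
# Sketch only: ssh_conn_id, the jar location and the class name are placeholders.
from airflow.providers.ssh.operators.ssh import SSHOperator

submit_via_ssh = SSHOperator(
    task_id="spark_submit_via_ssh",
    ssh_conn_id="spark_master_ssh",
    command=(
        "spark-submit --master yarn --deploy-mode cluster "
        "--class Main s3://my-bucket/my-job.jar --date {{ ds }}"
    ),
    dag=dag,
)
```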
Pros:
- Streams Spark job logs directly to Airflow console.
- Airflow and Spark don't have to coexist on the same node.
- You don't need to copy any files over from Spark master node as was required in the previous option.
- Write code in all Spark-native languages: Scala, Java, Python, and probably R.
Cons:
- Generated spark-submit command is a really long string and is therefore hard to read.
- It's not clear what happens if the SSH connection is lost mid-run. Technically, that should abort whatever was running in that session (and that means the job, in our case).
4. Apache Livy
Livy is a REST interface for Spark clusters. This is a perfect choice if you want to decouple your code from deployment configuration: the Livy server runs on the master node, listens for incoming REST calls and manages job execution.

Docs: Livy REST API
Apache Livy is actually not one but two distinct options, as it provides two modes of submitting jobs to Spark: sessions and batches.
Since submitting jobs over Livy is the option I've explored the most, I've come up with two solutions to ease and speed up discovery and development:
- Spark cluster with Livy and Zeppelin that you can deploy locally via Docker Compose.
- Airflow operators to run Spark code in Livy, published on PyPI (GitHub).
a) Sessions
You create a session via the REST API, then submit lines of code (aka statements) - also through the REST API. One statement is one or more lines of code, and a single session can execute any number of statements. It's important to keep in mind that a Livy session ≠ a Spark session: for instance, it's not possible to share DataFrames through global temporary views in Livy sessions, even though it's called a session!
How to run jobs: REST client
You can use my Docker Compose Spark cluster to quickly run those light jobs and get a feel for Spark, without having to bring up a full-fledged cluster that costs money 😉.
Creating a session (specifying "kind": "spark|pyspark|sparkR|sql" is not mandatory anymore, see REST API docs):
```bash
curl --request POST \
  --url http://localhost:8998/sessions \
  --header 'content-type: application/json' \
  --data '{"name": "sesh"}'
```
Now we can submit code through statements - in any of the available languages!
```bash
curl --request POST \
  --url http://localhost:8998/sessions/0/statements \
  --header 'content-type: application/json' \
  --data '{"kind": "spark", "code": "spark.range(1000 * 1000 * 1000).count()"}'

curl --request POST \
  --url http://localhost:8998/sessions/0/statements \
  --header 'content-type: application/json' \
  --data '{"kind": "pyspark", "code": "spark.range(1000 * 1000 * 1000).count()"}'

curl --request POST \
  --url http://localhost:8998/sessions/0/statements \
  --header 'content-type: application/json' \
  --data '{"kind": "sparkr", "code": "df <- as.DataFrame(\nlist(\"One\", \"Two\", \"Three\", \"Four\"),\n\"Smth else\")\nhead(df)"}'

curl --request POST \
  --url http://localhost:8998/sessions/0/statements \
  --header 'content-type: application/json' \
  --data '{"kind": "sql", "code": "SELECT 1"}'
```
Poll the statements endpoint to see the status and output of each statement:

```bash
curl --request GET \
  --url http://localhost:8998/sessions/0/statements | python3 -m json.tool
```
```json
{
  "total_statements": 4,
  "statements": [
    {
      "id": 0,
      "code": "spark.range(1000 * 1000 * 1000).count()",
      "state": "available",
      "output": {
        "status": "ok",
        "execution_count": 0,
        "data": {
          "text/plain": "res0: Long = 1000000000\n"
        }
      },
      "progress": 1.0,
      "started": 1583257281831,
      "completed": 1583257291096
    },
    {
      "id": 1,
      "code": "spark.range(1000 * 1000 * 1000).count()",
      "state": "available",
      "output": {
        "status": "ok",
        "execution_count": 1,
        "data": {
          "text/plain": "1000000000"
        }
      },
      "progress": 1.0,
      "started": 1583257300303,
      "completed": 1583257300865
    },
    {
      "id": 2,
      "code": "df <- as.DataFrame(\nlist(\"One\", \"Two\", \"Three\", \"Four\"),\n\"Smth else\")\nhead(df)",
      "state": "available",
      "output": {
        "status": "ok",
        "execution_count": 2,
        "data": {
          "text/plain": " Smth else\n1 One\n2 Two\n3 Three\n4 Four"
        }
      },
      "progress": 1.0,
      "started": 1583257463189,
      "completed": 1583257468959
    },
    {
      "id": 3,
      "code": "SELECT 1",
      "state": "available",
      "output": {
        "status": "ok",
        "execution_count": 3,
        "data": {
          "application/json": {
            "schema": {
              "type": "struct",
              "fields": [
                {
                  "name": "1",
                  "type": "integer",
                  "nullable": false,
                  "metadata": {}
                }
              ]
            },
            "data": [
              [
                1
              ]
            ]
          }
        }
      },
      "progress": 1.0,
      "started": 1583257591038,
      "completed": 1583257591483
    }
  ]
}
```
How to run jobs: Airflow
To apply this to Airflow, we'll make the DAG perform the same kinds of POST/GET requests using HttpOperator and HttpSensor. You'll find more sample DAGs in the Git repository.
```python
op = LivySessionOperator(
    name="livy_session_example_{{ run_id }}",
    statements=[
        LivySessionOperator.Statement(code=scala_code, kind="spark"),
        LivySessionOperator.Statement(code=pyspark_code, kind="pyspark"),
        LivySessionOperator.Statement(code=sparkr_code, kind="sparkr"),
        LivySessionOperator.Statement(code=sql_code, kind="sql"),
    ],
    params={"your_number": 5, "your_string": "Hello world"},
    task_id="livy_session_example",
    dag=dag,
)
```
Pros:
- Job language: Scala, Python, R and SQL. This is rich compared to option 1, where you can only submit a JAR.
- Very detailed information about each statement, including exceptions, of course - a statement shows as failed if an exception was thrown.
- Easy to recreate the exact job submitted by Airflow - all you need is any REST client.
- Rendered statement (code) templates are visible in the Airflow web UI.
Cons:
- Have to escape everything twice.
- Have to be mindful of quotes in the session file: e.g. a double quote (") inside a statement's string literal that is itself enclosed in double quotes renders as a confusing """ in the template, whereas the same character inside a single-quoted literal stays readable.
- Can’t run session code separately to debug it
- params field is not Jinja-templatable, so all of the "magic" is pushed down into the Spark job code. This looks OK for small jobs and downright ugly once you get past 50 lines of code.
- Directly follows from the previous point: your Spark jobs cannot be executed outside Airflow, because they include Jinja templates (the rendered statements are valid Jinja-templated code, with double-escaping and whatnot - see the sketch after this list).
- Livy Sessions are NOT Spark sessions, even though the name implies it.
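To illustrate the templating cons: here's a hypothetical statement (not the author's exact code) rendered the way Airflow renders it before handing it to Livy - until that rendering happens, the code can't run anywhere else:

```python
# Hypothetical illustration: the statement stored in the DAG contains Jinja
# placeholders, so it only becomes valid Spark code after rendering.
from jinja2 import Template

scala_statement = (
    'println("{{ params.your_string }}: " '
    '+ spark.range({{ params.your_number }}).count())'
)

# Roughly what Airflow does with the operator's `params` before submission.
rendered = Template(scala_statement).render(
    params={"your_number": 5, "your_string": "Hello world"}
)
print(rendered)
# println("Hello world: " + spark.range(5).count())
```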
b) Batches.
A batch is a way to execute long-running Spark code. With batches, you submit a path to your Spark code via the REST API. That file is picked up by Spark and executed. One batch = one Spark job.

How to run jobs: REST client
http://gethue.com/how-to-use-the-livy-spark-rest-job-server-api-for-submitting-batch-jar-python-and-streaming-spark-jobs/
https://stackoverflow.com/questions/51566029/airflow-http-callback-sensor
You build your job as a fat jar (Java or Scala, no need to build anything for Python or R), save it somewhere your cluster can access it (HDFS, S3, local filesystem) and submit it like this:
```bash
curl --request POST \
  --url http://localhost:8998/batches \
  --header 'content-type: application/json' \
  --data '{
    "className": "Main",
    "conf": {
      "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
    },
    "args": []
  }'

# Alternatively, here's something you can run with just a local cluster
# and this file https://github.com/panovvv/airflow-livy-operators/blob/master/batches/join_2_files.py
# copied to bigdata-docker-compose/data/batches/join_2_files.py
curl --request POST \
  --url http://localhost:8998/batches \
  --header 'content-type: application/json' \
  --data '{
    "pyFiles": [],
    "args": [
      "-file1_sep=,",
      "-file1_header=true",
      "-file1_schema=`Last name` STRING, `First name` STRING, SSN STRING, Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING",
      "-file1_join_column=SSN",
      "-file2_header=false",
      "-file2_schema=`Last name` STRING, `First name` STRING, SSN STRING, Address1 STRING, Address2 STRING",
      "-file2_join_column=SSN",
      "-output_header=true",
      "-output_columns=file1.`Last name`, file1.`First name`, file1.SSN, file2.Address1, file2.Address2"
    ]
  }'
```
You can poll it through another endpoint:
```bash
curl localhost:8998/batches/0 | python3 -m json.tool
```
```json
{
  "id": 0,
  "name": null,
  "owner": null,
  "proxyUser": null,
  "state": "success",
  "appId": "application_1583256958670_0003",
  "appInfo": {
    "driverLogUrl": null
  },
  "log": [
    "2020-03-03 18:25:35,575 INFO storage.BlockManagerMaster: BlockManagerMaster stopped",
    "2020-03-03 18:25:35,591 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!",
    "2020-03-03 18:25:35,601 INFO spark.SparkContext: Successfully stopped SparkContext",
    "2020-03-03 18:25:36,103 INFO util.ShutdownHookManager: Shutdown hook called",
    "2020-03-03 18:25:36,105 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-43c6db3f-f546-4347-b488-ca82055b0af9/pyspark-9feac8d9-d738-4f72-9bd0-f473656bcf62",
    "2020-03-03 18:25:36,108 INFO util.ShutdownHookManager: Deleting directory /tmp/localPyFiles-eee974a0-c53c-4a06-b238-d680254763d4",
    "2020-03-03 18:25:36,111 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f073c90a-eb2d-42a2-be8f-7c5aa47b68f1",
    "2020-03-03 18:25:36,116 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-43c6db3f-f546-4347-b488-ca82055b0af9",
    "\nstderr: ",
    "\nYARN Diagnostics: "
  ]
}
```
or
```bash
curl localhost:8998/batches/0/log | python3 -m json.tool
```
```json
{
  "id": 0,
  "from": 103,
  "total": 203,
  "log": [
    "2020-03-03 18:25:26,207 INFO datasources.FileSourceStrategy: Post-Scan Filters: isnotnull(SSN#2)",
    "2020-03-03 18:25:26,217 INFO datasources.FileSourceStrategy: Output Data Schema: struct<last name: string, First name: string, SSN: string ... 1 more fields>",
    "2020-03-03 18:25:26,240 INFO execution.FileSourceScanExec: Pushed Filters: IsNotNull(SSN)",
    "2020-03-03 18:25:26,258 INFO datasources.FileSourceStrategy: Pruning directories with: ",
    "2020-03-03 18:25:26,262 INFO datasources.FileSourceStrategy: Post-Scan Filters: isnotnull(SSN#20)",
    "2020-03-03 18:25:26,273 INFO datasources.FileSourceStrategy: Output Data Schema: struct<SSN: string, Address1: string, Address2: string ... 1 more fields>",
    "2020-03-03 18:25:26,273 INFO execution.FileSourceScanExec: Pushed Filters: IsNotNull(SSN)",
    "2020-03-03 18:25:27,388 INFO codegen.CodeGenerator: Code generated in 498.6125 ms",
    "2020-03-03 18:25:27,579 INFO codegen.CodeGenerator: Code generated in 35.3556 ms",
    "2020-03-03 18:25:27,705 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 389.5 KB, free 365.9 MB)",
    "2020-03-03 18:25:27,792 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 42.6 KB, free 365.9 MB)",
    "2020-03-03 18:25:27,796 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on livy:7004 (size: 42.6 KB, free: 366.3 MB)",
    "2020-03-03 18:25:27,800 INFO spark.SparkContext: Created broadcast 0 from run at ThreadPoolExecutor.java:1149",
    "2020-03-03 18:25:27,863 INFO execution.FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.",
    "2020-03-03 18:25:28,220 INFO spark.SparkContext: Starting job: run at ThreadPoolExecutor.java:1149",
    "2020-03-03 18:25:28,254 INFO scheduler.DAGScheduler: Got job 0 (run at ThreadPoolExecutor.java:1149) with 1 output partitions",
    "2020-03-03 18:25:28,255 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (run at ThreadPoolExecutor.java:1149)",
    "2020-03-03 18:25:28,256 INFO scheduler.DAGScheduler: Parents of final stage: List()",
    "2020-03-03 18:25:28,259 INFO scheduler.DAGScheduler: Missing parents: List()",
    "2020-03-03 18:25:28,270 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at run at ThreadPoolExecutor.java:1149), which has no missing parents",
    "2020-03-03 18:25:28,313 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 11.4 KB, free 365.9 MB)",
    "2020-03-03 18:25:28,320 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.9 KB, free 365.9 MB)",
    "2020-03-03 18:25:28,321 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on livy:7004 (size: 5.9 KB, free: 366.3 MB)",
    "2020-03-03 18:25:28,322 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1161",
    "2020-03-03 18:25:28,348 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at run at ThreadPoolExecutor.java:1149) (first 15 tasks are for partitions Vector(0))",
    "2020-03-03 18:25:28,350 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks",
    "2020-03-03 18:25:28,406 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, worker1, executor 2, partition 0, PROCESS_LOCAL, 8255 bytes)",
    "2020-03-03 18:25:28,951 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on worker1:7003 (size: 5.9 KB, free: 366.3 MB)",
    "2020-03-03 18:25:31,140 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on worker1:7003 (size: 42.6 KB, free: 366.3 MB)",
    "2020-03-03 18:25:33,340 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4946 ms on worker1 (executor 2) (1/1)",
    "2020-03-03 18:25:33,354 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool ",
    "2020-03-03 18:25:33,385 INFO scheduler.DAGScheduler: ResultStage 0 (run at ThreadPoolExecutor.java:1149) finished in 5.084 s",
    "2020-03-03 18:25:33,427 INFO scheduler.DAGScheduler: Job 0 finished: run at ThreadPoolExecutor.java:1149, took 5.205493 s",
    "2020-03-03 18:25:33,549 INFO codegen.CodeGenerator: Code generated in 31.8357 ms",
    "2020-03-03 18:25:33,577 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.0 MB, free 361.9 MB)",
    "2020-03-03 18:25:33,592 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 747.0 B, free 361.9 MB)",
    "2020-03-03 18:25:33,595 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on livy:7004 (size: 747.0 B, free: 366.3 MB)",
    "2020-03-03 18:25:33,596 INFO spark.SparkContext: Created broadcast 2 from run at ThreadPoolExecutor.java:1149",
    "2020-03-03 18:25:34,141 INFO codegen.CodeGenerator: Code generated in 186.7491 ms",
    "2020-03-03 18:25:34,191 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 389.3 KB, free 361.5 MB)",
    "2020-03-03 18:25:34,367 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 42.7 KB, free 361.4 MB)",
    "2020-03-03 18:25:34,368 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on livy:7004 (size: 42.7 KB, free: 366.2 MB)",
    "2020-03-03 18:25:34,374 INFO spark.SparkContext: Created broadcast 3 from showString at NativeMethodAccessorImpl.java:0",
    "2020-03-03 18:25:34,381 INFO execution.FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.",
    "2020-03-03 18:25:34,475 INFO spark.SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0",
    "2020-03-03 18:25:34,480 INFO scheduler.DAGScheduler: Got job 1 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions",
    "2020-03-03 18:25:34,480 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (showString at NativeMethodAccessorImpl.java:0)",
    "2020-03-03 18:25:34,480 INFO scheduler.DAGScheduler: Parents of final stage: List()",
    "2020-03-03 18:25:34,481 INFO scheduler.DAGScheduler: Missing parents: List()",
    "2020-03-03 18:25:34,489 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents",
    "2020-03-03 18:25:34,515 INFO memory.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 13.8 KB, free 361.4 MB)",
    "2020-03-03 18:25:34,519 INFO memory.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 6.8 KB, free 361.4 MB)",
    "2020-03-03 18:25:34,521 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on livy:7004 (size: 6.8 KB, free: 366.2 MB)",
    "2020-03-03 18:25:34,523 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1161",
    "2020-03-03 18:25:34,526 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))",
    "2020-03-03 18:25:34,527 INFO cluster.YarnScheduler: Adding task set 1.0 with 1 tasks",
    "2020-03-03 18:25:34,531 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, worker1, executor 2, partition 0, PROCESS_LOCAL, 8250 bytes)",
    "2020-03-03 18:25:34,655 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on worker1:7003 (size: 6.8 KB, free: 366.2 MB)",
    "2020-03-03 18:25:34,857 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on worker1:7003 (size: 747.0 B, free: 366.2 MB)",
    "2020-03-03 18:25:35,018 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on worker1:7003 (size: 42.7 KB, free: 366.2 MB)",
    "2020-03-03 18:25:35,193 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 662 ms on worker1 (executor 2) (1/1)",
    "2020-03-03 18:25:35,193 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool ",
    "2020-03-03 18:25:35,195 INFO scheduler.DAGScheduler: ResultStage 1 (showString at NativeMethodAccessorImpl.java:0) finished in 0.703 s",
    "2020-03-03 18:25:35,195 INFO scheduler.DAGScheduler: Job 1 finished: showString at NativeMethodAccessorImpl.java:0, took 0.719762 s",
    "+---------+----------+-----------+------------------+--------------------+",
    "|Last name|First name| SSN| Address1| Address2|",
    "+---------+----------+-----------+------------------+--------------------+",
    "| Alfalfa| Aloysius|123-45-6789| 7098 East Road| Hopkins, MN 55343|",
    "| Alfred|University|123-12-1234|98 Wellington Ave.| Lowell, MA 01851|",
    "| Gerty| Gramma|567-89-0123| null| null|",
    "| Rubble| Betty|234-56-7890| 9715 Penn St.| Royal Oak, MI 48067|",
    "| Dandy| Jim|087-75-4321| 4 Ann St.|Hackensack, NJ 07601|",
    "| Elephant| Ima|456-71-9012| null| null|",
    "| George| Boy|345-67-3901| 13 Foxrun Ave.| Annandale, VA 22003|",
    "|Heffalump| Harvey|632-79-9439| 5 Beech Street|Canyon Country, C...|",
    "+---------+----------+-----------+------------------+--------------------+",
    "",
    "2020-03-03 18:25:35,284 INFO server.AbstractConnector: Stopped Spark@10becd81{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}",
    "2020-03-03 18:25:35,310 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread",
    "2020-03-03 18:25:35,390 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors",
    "2020-03-03 18:25:35,396 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down",
    "2020-03-03 18:25:35,406 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices",
    "(serviceOption=None,",
    " services=List(),",
    " started=false)",
    "2020-03-03 18:25:35,412 INFO cluster.YarnClientSchedulerBackend: Stopped",
    "2020-03-03 18:25:35,548 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!",
    "2020-03-03 18:25:35,572 INFO memory.MemoryStore: MemoryStore cleared",
    "2020-03-03 18:25:35,573 INFO storage.BlockManager: BlockManager stopped",
    "2020-03-03 18:25:35,575 INFO storage.BlockManagerMaster: BlockManagerMaster stopped",
    "2020-03-03 18:25:35,591 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!",
    "2020-03-03 18:25:35,601 INFO spark.SparkContext: Successfully stopped SparkContext",
    "2020-03-03 18:25:36,103 INFO util.ShutdownHookManager: Shutdown hook called",
    "2020-03-03 18:25:36,105 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-43c6db3f-f546-4347-b488-ca82055b0af9/pyspark-9feac8d9-d738-4f72-9bd0-f473656bcf62",
    "2020-03-03 18:25:36,108 INFO util.ShutdownHookManager: Deleting directory /tmp/localPyFiles-eee974a0-c53c-4a06-b238-d680254763d4",
    "2020-03-03 18:25:36,111 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f073c90a-eb2d-42a2-be8f-7c5aa47b68f1",
    "2020-03-03 18:25:36,116 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-43c6db3f-f546-4347-b488-ca82055b0af9",
    "\nstderr: ",
    "\nYARN Diagnostics: "
  ]
}
```
Here's an example of what the Livy batch does internally:
```bash
spark-submit --master yarn --deploy-mode cluster --class Main \
```
How to run jobs: Airflow
Same approach as with sessions: codify the REST API calls that you would otherwise execute manually.
```python
op = LivyBatchOperator(
    name="batch_example_{{ run_id }}",
    # Required arguments are positional, meaning you don't have to specify their name.
    # In this case, they are file1_path and file2_path.
    arguments=[
        "-file1_sep=,",
        "-file1_header=true",
        "-file1_schema=`Last name` STRING, `First name` STRING, SSN STRING, "
        "Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING",
        "-file1_join_column=SSN",
        "-file2_header=false",
        "-file2_schema=`Last name` STRING, `First name` STRING, SSN STRING, "
        "Address1 STRING, Address2 STRING",
        "-file2_join_column=SSN",
        # uncomment
        # "-output_path=file:///data/output/livy_batch_example/"
        # "{{ run_id|replace(':', '-') }}",
        # to save result to a file
        "-output_header=true",
        "-output_columns=file1.`Last name`, file1.`First name`, file1.SSN, "
        "file2.Address1, file2.Address2",
    ],
    task_id="livy_batch_example",
    dag=dag,
)
```
Pros:
- Same choice of languages as above, plus Java.
Cons:
- When submitting Spark jobs in cluster mode, they always show up as finished even if an exception was thrown during execution. I noticed this on Amazon EMR v5.26.0. Alas, I could not find a way to propagate exceptions to Livy - the only way to tell that something went wrong is to parse the logs for the word "Exception", and that is unreliable.
c) Batches + Spark/YARN REST API
We were not satisfied with the two approaches above: Livy Batches (when executed in Spark's cluster mode) always show up as "complete" even if they actually failed, and Livy Sessions result in heavily modified Spark jobs that you can't execute without feeding them to the Jinja engine first.

Then I learned about a cool feature in Spark that made this perfect hybrid approach possible: the Spark REST API. It was buried in the middle of a page with the nondescript name "Monitoring and instrumentation", so it took me a while to find it. That gave me an idea, and eventually I found one more thing: the YARN Resource Manager also runs a REST API.
It was only a matter of time before I tried those out, saw that both APIs always show the actual status of a Spark job, and added that verification to the Batch Operator.
How to run jobs: REST client
Follow the same steps as for the batches.
Take note of the appId field when doing GET /batches/${BATCH_ID}
It may look something like this: "application_1583256958670_0001"
Now, access the app status via Spark REST API:
```bash
curl --request GET \
  --url http://localhost:18080/api/v1/applications/application_1583256958670_0003/jobs | python3 -m json.tool
```
This endpoint returns a list of constituent jobs for our app. If all of them have "status": "SUCCEEDED" in them, that means the job completed successfully.
```json
[
  {
    "jobId": 1,
    "name": "showString at NativeMethodAccessorImpl.java:0",
    "submissionTime": "2020-03-03T18:25:34.481GMT",
    "completionTime": "2020-03-03T18:25:35.195GMT",
    "stageIds": [1],
    "status": "SUCCEEDED",
    "numTasks": 1,
    "numActiveTasks": 0,
    "numCompletedTasks": 1,
    "numSkippedTasks": 0,
    "numFailedTasks": 0,
    "numKilledTasks": 0,
    "numCompletedIndices": 1,
    "numActiveStages": 0,
    "numCompletedStages": 1,
    "numSkippedStages": 0,
    "numFailedStages": 0,
    "killedTasksSummary": {}
  },
  {
    "jobId": 0,
    "name": "run at ThreadPoolExecutor.java:1149",
    "submissionTime": "2020-03-03T18:25:28.259GMT",
    "completionTime": "2020-03-03T18:25:33.423GMT",
    "stageIds": [0],
    "status": "SUCCEEDED",
    "numTasks": 1,
    "numActiveTasks": 0,
    "numCompletedTasks": 1,
    "numSkippedTasks": 0,
    "numFailedTasks": 0,
    "numKilledTasks": 0,
    "numCompletedIndices": 1,
    "numActiveStages": 0,
    "numCompletedStages": 1,
    "numSkippedStages": 0,
    "numFailedStages": 0,
    "killedTasksSummary": {}
  }
]
```
Same for the YARN REST API (we're looking at the $.app.finalStatus field - anything but "SUCCEEDED" is bad):
```bash
curl --request GET \
  --url http://localhost:8088/ws/v1/cluster/apps/application_1583256958670_0003 | python3 -m json.tool
```
```json
{
  "app": {
    "id": "application_1583256958670_0003",
    "user": "root",
    "name": "join_2_files.py",
    "queue": "root.root",
    "state": "FINISHED",
    "finalStatus": "SUCCEEDED",
    "progress": 100.0,
    "trackingUI": "History",
    "diagnostics": "",
    "clusterId": 1583256958670,
    "applicationType": "SPARK",
    "applicationTags": "livy-batch-1-6tob8xbr",
    "priority": 0,
    "startedTime": 1583259906437,
    "launchTime": 1583259907256,
    "finishedTime": 1583259935642,
    "elapsedTime": 29205,
    "amContainerLogs": "http://worker2:8042/node/containerlogs/container_1583256958670_0003_01_000001/root",
    "amHostHttpAddress": "worker2:8042",
    "masterNodeId": "worker2:37883",
    "allocatedMB": -1,
    "allocatedVCores": -1,
    "reservedMB": -1,
    "reservedVCores": -1,
    "runningContainers": -1,
    "memorySeconds": 125836,
    "vcoreSeconds": 74,
    "queueUsagePercentage": 0.0,
    "clusterUsagePercentage": 0.0,
    "resourceSecondsMap": {
      "entry": {
        "key": "vcores",
        "value": "74"
      }
    },
    "preemptedResourceMB": 0,
    "preemptedResourceVCores": 0,
    "numNonAMContainerPreempted": 0,
    "numAMContainerPreempted": 0,
    "preemptedMemorySeconds": 0,
    "preemptedVcoreSeconds": 0,
    "preemptedResourceSecondsMap": {},
    "logAggregationStatus": "DISABLED",
    "unmanagedApplication": false,
    "amNodeLabelExpression": "",
    "timeouts": {
      "timeout": [
        {
          "type": "LIFETIME",
          "expiryTime": "UNLIMITED",
          "remainingTimeInSeconds": -1
        }
      ]
    }
  }
}
```
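Inside the operator, the verification boils down to a couple of extra GET requests. A rough sketch of the idea (not the operator's actual code; host names and ports match the Docker Compose setup above and are otherwise placeholders, and cluster-mode apps on YARN may need an attempt ID in the Spark API path):

```python
# Sketch only: verify that the application behind a Livy batch really succeeded
# by asking the Spark and YARN REST APIs directly.
import requests

SPARK_API = "http://localhost:18080/api/v1"       # Spark history server
YARN_API = "http://localhost:8088/ws/v1/cluster"  # YARN ResourceManager


def batch_really_succeeded(app_id: str) -> bool:
    # Spark REST API: every job of the application must have SUCCEEDED.
    jobs = requests.get(f"{SPARK_API}/applications/{app_id}/jobs").json()
    if any(job["status"] != "SUCCEEDED" for job in jobs):
        return False
    # YARN REST API: the application's finalStatus must be SUCCEEDED as well.
    app = requests.get(f"{YARN_API}/apps/{app_id}").json()["app"]
    return app["finalStatus"] == "SUCCEEDED"


print(batch_really_succeeded("application_1583256958670_0003"))
```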
How to run jobs: Airflow
Same as the LivyBatchOperator above, but you pass an additional parameter: verify_in="spark" (or verify_in="yarn").
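For instance, taking the batch example from earlier (the name and task_id here are arbitrary, and batch_arguments stands for the same argument list as before):

```python
op = LivyBatchOperator(
    name="batch_example_verify_{{ run_id }}",
    arguments=batch_arguments,  # same list as in the previous example
    verify_in="spark",          # or "yarn"
    task_id="livy_batch_example_verify_in_spark",
    dag=dag,
)
```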
Pros:
- Combines the best of both worlds: the arguments that you pass to your Spark code are templatable, yet you still have jobs that you can execute outside of Airflow, and you can see exactly what went wrong with a job if it failed.
Cons:
- None that I know of!
Comments
Hi, thanks for the detailed explanation on using REST API to submit spark jobs from airflow (which is present in a different server) to Hadoop cluster. Appreciate your efforts in sharing your valuable experience to others.
We are also trying to implement airflow and evaluating the options.
For now, we are considering the same option that you have mentioned in point 4 (with airflow and hadoop in different clusters and submit spark jobs using livy). However, I have a question regarding this architecture. Most of our dags need to check if input data is present in HDFS. For this, we need to have HDFSSensor. Could you please let me know if we can have this sensor triggered from airflow? Do we need any Hadoop config files to be copied to airflow to do this?
Thanks.
Hello!
I don't have any experience with your concrete situation, but both the HDFS sensor and my Livy operators are simply operators; as such, they can be mixed and matched in any DAG. For example, you could make the existence of two particular input files a prerequisite for running the Livy job like this (pseudocode):
```python
sensor_f1 = HdfsSensor(url='hdfs://file1.txt')
sensor_f2 = HdfsSensor(url='hdfs://file2.txt')
combine = LivyBatchOperator(arguments=['hdfs://file1.txt', 'hdfs://file2.txt'])
sensor_f1 >> combine << sensor_f2
```
Hope that helps.
Hi Panov, thanks a lot for your reply. Do we need to have any Hadoop config files (like core-site.xml or yarn-site.xml) to be copied to the other server where airflow is deployed for checking the files? I have this question because airflow will be deployed on a different server and not in the Hadoop cluster. Thanks
Hi Panov,
Sorry for too many questions. I was just trying to understand the architecture that you have used. Are Airflow, spark and livy deployed in the same server Or is Airflow in one server and the remaining in a different cluster. Based on your post, I have used a docker image(spark + hadoop + livy) and another docker for Airflow. And trying to submit spark program from the airflow docker to another docker using REST API. But it doesn't work. Not sure how the Airflow docker knows how to submit spark program in the other docker.
Thanks
> Airflow in one server and the remaining in a different cluster.
This should be the case.
> docker image(spark + hadoop + livy) and another docker for Airflow. And trying to submit spark program from the airflow docker to another docker using REST API.
This should work! Try digging deeper.
> Not sure how the Airflow docker knows how to submit spark program in the other docker.
You answered this inside your own question - the Airflow instance only knows the URL of the Livy instance, and sends HTTP requests (REST API) to that URL. This is how jobs are submitted, and this is why Airflow doesn't have to be on the same host as the rest of the cluster.