Spark is the user-friendly face of Big Data: a distributing programming framework which lets you write collection oriented algorithms in Scala that (theoretically) execute seamlessly across many machines. Spark has an elegant API (parallel collections with methods like map/reduce/groupByKey) that feels like programming locally. Unlike MapReduce, Spark can cache partial results across the memory of its distributed workers, allowing for significantly faster/lower-latency computations. If Spark worked as promised it would be a huge productivity boost over writing MapReduce pipelines.
Unfortunately, as I've learned over the past month, Spark will happily generate programs that mysteriously grind to a halt-- and tracking down the source of these problems can be a numbingly opaque process. There are at least two distinct problems: Spark's lazy evaluation makes it hard to know which parts of your program are the bottleneck and, even if you can identify a particularly slow expression, it's not always obvious why it's slow or how to make it faster.
More frustrating, however, is the inevitability with which Spark's many (initially invisible) default parameters will be wrong for your application. Buffers and heap sizes will turn out to be too small. Your reductions will use too few (or too many) workers. Too much memory gets used for data caching, or maybe it's too little. There's no rigorous or systematic way to set these parameters: you wait until things fail and supplicate at the feet of Spark's heuristic knob pantheon.
Unfortunately, as I've learned over the past month, Spark will happily generate programs that mysteriously grind to a halt-- and tracking down the source of these problems can be a numbingly opaque process. There are at least two distinct problems: Spark's lazy evaluation makes it hard to know which parts of your program are the bottleneck and, even if you can identify a particularly slow expression, it's not always obvious why it's slow or how to make it faster.
Lazy Evaluation
Spark's API lets you to express your intentions clearly via many fine-grained calls to transformations such as map, filter, flatMap, distinct, &c. If you ran a long chain of transformations one at a time, you'd incur a large communication overhead and clog each worker's local cache with useless partial results. Spark reconciles its functional style with performance via delayed execution: transformations get bundled together and only run on demand. The unruly down-side to Spark's execution model is that big swaths of your program will run as a monolithic glob of computation. And if that computation runs slowly...well, good luck figuring out which of its constituent parts is the culprit. Spark could ameliorate some of this confusion with a non-lazy debug mode or some built-in tooling assistance (i.e. a distributed profiler). For now, however, you're stuck (1) trying to reason your way through the lazy dependency graph ("Oh! This one is a shuffle dependency!") (2) forcing computations and checking how long they take.Too Many Parameters, Too Many Ways to Fail
Though Spark looks like you're programming on your laptop, it has many performance gotchas you must guard vigilantly against. Make sure your objects aren't using Java serialization (Kryo is faster), pull that object out of the closure, broadcast this array to keep it from being copied repeatedly. Though annoying for beginners, these rules of thumb at least have some consistency which can be learned.More frustrating, however, is the inevitability with which Spark's many (initially invisible) default parameters will be wrong for your application. Buffers and heap sizes will turn out to be too small. Your reductions will use too few (or too many) workers. Too much memory gets used for data caching, or maybe it's too little. There's no rigorous or systematic way to set these parameters: you wait until things fail and supplicate at the feet of Spark's heuristic knob pantheon.
My Experience Thus Far
Nothing I do can save my cluster from spending hours thrashing its way through a modest input size (before dying under a flood of obscure exceptions). I have recently tried all of the following (and more) to keep Spark from its stubborn predilection toward dying:- increasing JVM heap size of Spark's executors
- tweaking the "memory fraction" used for caching partial results
- writing custom Kryo serializers for objects stored in parallel collections
- co-partitioning collections before joining them
- changing the Akka frame size (receive buffer of the underlying actor library)
- turning speculative execution on/off
- turning shuffle file consolidation on/off
- changing defaultParallelism (a confusingly named parameter which sometimes affects the number of workers assigned to a parallel reduction)
- using stripped down structures (with fewer fields) to represent collection elements
- manually interning strings (by transforming them into integers) in hopes of speeding up groupByKey
- broadcasting shared objects (in case serialized closures were getting too heavy)
- changing the HDFS block size of the input data (to increase initial number of data partitions)
- writing a custom partitioner to try uniform binning of keys (in case RangePartitioner was slow)
- writing a custom RDD (parallel collection) for 3-way join
- ...&c &c.
After a month of "stab in the dark" debugging, I've learned a lot about the internals of Spark but still don't have a working application. In some weird twist of Stockholm syndrome, I've come to like expressing my algorithms in Spark's abstractions: if only they would actually run!
Has anyone else had a similar experience with Spark? Alternatively, has anyone had a positive experience with a non-trivial Spark codebase (bigger than tutorial wordcount examples). If so, what were your tricks to avoid the death-by-a-thousand-shuffles I've been experiencing? How about Spark's close cousins: Scalding, Scoobi, and Scrunch, are they substantially better (or worse)?
I'm sorry to hear about your troubles, as I'm a fan of Spark. However, I don't have enough experience with it to know how to solve your issues, etc. I've used Scalding a lot, which sits on Cascading, which (currently) sits on MapReduce. The API is similar to Spark's, so you might try porting a "troubled" part of your app to Scalding to see if it us noticeably better. It will have MR's slow aspects, but maybe it won't get bogged down.
ReplyDeleteHey Alex, it's too bad to hear that you had a bad experience, but definitely useful feedback. For tuning applications, did you take a look at the performance talk at Spark Summit 2013 (https://spark-summit.org/talk/wendell-understanding-the-performance-of-spark-applications/) and the app UI at localhost:4040? They offer a good place to start. The other thing I'd recommend is posting on the user mailing lists (http://spark.apache.org/community.html). There may be other people there who can help with specific issues.
ReplyDeleteMore generally, the community has worked quite actively over the past few releases to reduce configuration needed, and is continuing to do so. In the past two releases we've removed many of the cases that required sending data through Akka (so there's no frame size limit), added external spilling to most operations, and added quite a few more monitoring features. More generally, efforts like Spark SQL (https://databricks.com/blog/2014/03/26/spark-sql-manipulating-structured-data-using-spark-2.html) will let Spark automatically use an efficient storage format data where you describe the schema, eliminating the need to fine-tune Java object storage. Unfortunately there are just some known challenges in programming high-memory applications in Java, but the language features and API leave room for significant automatic management.
Hey Alex,
ReplyDeleteI’m a committer of Stratosphere (http://stratosphere.eu), where we are developing a system that has similar APIs to Spark, but with a different runtime. The runtime is storing objects always in a serialized form in byte arrays. The operators of the system operate on the serialized data, rather than on the Java objects. This leads to very efficient memory usage. Additionally, all operators (except the in-memory index for the iterations) support graceful degradation to disk (spilling). I would love to see whether Stratosphere solves some of your issues, especially the ones pertaining to memory configuration, and data serialization.
Robert
This comment has been removed by a blog administrator.
ReplyDeleteThe performance issues I can sympathize with; as you say, there's a bunch of random heuristics that you need to learn.
ReplyDeleteI haven't hand the random failures you've experienced though. My spark jobs sometimes take much longer than they should, but they never just fail.
Stupid comment box erased my comment after authentication.
ReplyDeleteHere's the short version: follow the advice on https://my.vertica.com/docs/4.1/HTML/Master/12962.htm - I had to set vm.max_map_count to 12800000 and them everything started working for me.
(I've had a very similar experience, "Spark is awesome, if only it worked!". Now it seems to have started working :) ).
This made me laugh, it's quite true, Spark's API and Theory is amazing and should make Big Data as easy as writing regular Scala code. I've been building full sized complicated production quality applications in Spark and found 90% of debugging time is spent on the simplest 10% of Sparks functionality. Once the basics and heuristics are eventually solved, all the complicated stuff is easy. For example controlling the number of partitions isn't easy (they provide `coalesce` for decreasing, but there is no shuffle-free way for increasing).
ReplyDeleteI've also pretty much done everything on your list, and will add to your list that writting an sbt build file to work with Intellij and java -cp is a nightmare.
Nevertheless I continue to use and love Spark. Before Spark I used Scalding for about a year, and although pretty cool, I'd say Spark is actually much easier to use, and orders faster.
For Spark to solve it's problems I feel it needs to focus it's efforts better as follows:
MORE:
- Documentation on building and running like any other application (i.e. with java -cp NOT some silly script), I'm fed up of the 10 billion reasons why building can go wrong, they should provide template SBT files for every release that actually work.
- File input and output formats
- a `repartitionWithoutShuffle` function
- default auto-magic optimised serialisation
- Cache, shuffle, and heap memory fraction usage monitoring
LESS:
- wasting effort on the Python API, it's time these dynamically typed hacky scripters get their act together and learn safe production worthy languages.
Thanks for your comment Fredri,
ReplyDeleteThere are some sensible developments in the 1.2 branch of Spark. Folks in my lab are still having a miserable time getting it running at scale but it seems like there's at least an increased emphasis on monitoring, debuggability, and memory/spilling. Maybe in a year or two Spark's scalability will finally match the elegance of its API?
Hi Alex, thanks for the article - I found your views on lazy evaluation and debugging quite interesting. However I thought that with Spark, the web interface on port 4040 shows a breakdown of the 'stages' of an application. So then if you are trying to find where your Spark program is slow, you can just look at how long it's taking at each stage in the web UI. Does that not solve the problem? Or is the concern that it's difficult to know which transformation is in each stage?
ReplyDeleteSome great helpful insights ....thx.
ReplyDeletethanks for inputs.. i was looking for a set of parameters to improve spark performance..
ReplyDeleteI've suffered the same issues as you! I love the Spark API. I hate how hard it is to get a Spark application working effectively! Ugh!!!!
ReplyDeleteBali is a Countryside nature-friendly and cultural retreat, Known for its scenic beauty, liberation, awakening perfection and contentment. Land in Hyderabad
ReplyDeleteIf you still have issues with Spark, may be you should consider DataTorrent RTS. I can fix you with a license.
ReplyDeleteI totally agree with your article title. I am having too many problems with Spark too. I guess the problem is there are too little QAs in open source. I am using Spark with about 20TB of data and 400 machines and some errors I see I cannot even find in Google. I have to fix Spark source to make a workaround is some cases.
ReplyDeleteAny updates on your opinions of Spark after a year of improvements/experience gains?
ReplyDeleteHey Jason,
DeleteI haven't touched Spark since I wrote this post (it was really meant as a public rage-quit). I've watched some of my lab members sink a large part of the past year into navigating around Spark's architectural deficiencies and am not particularly enthusiastic about joining their efforts. I do have to admit that it seems like every release slowly brings Spark closer to sanity and that I have seen some non-trivial tasks run successfully. However, the disconnect between the Spark team's triumphant PR and the actual miserable experience of trying to do non-trivial things to larger-than-memory datasets makes me wary of the whole project.
Could not agree more. I have followed tuning parameters to the T in terms of executors, memory, driver, masters, core and still get tasks killed, disassociated, and just general suckocity. All big data manipulation in Spark typically ends in log vomit, so you're just like, man, I am going to stick with Oozie, Hive, and Pig. Proven and reliable.
DeleteTry using Spark as a service. Databricks provides one and Qubole is another.
ReplyDeleteKryo serialization actually makes objects' memory footprints significantly bigger, thereby reducing available memory. This may make an application take significantly longer to run. I profiled my data processing app at 2.5 hours up from 2 after enabling Kryo.
ReplyDeleteI have been trying to work with Spark for about 3 weeks now, in EMR environment. It has been a pain, especially trying to understand the performance bottlenecks. I still don't understand why writing a set of 100MB files to S3 (ORC) should take 7-8 minutes.
ReplyDeleteI have been trying to work with Spark for about 3 weeks now, in EMR environment. It has been a pain, especially trying to understand the performance bottlenecks. I still don't understand why writing a set of 100MB files to S3 (ORC) should take 7-8 minutes.
ReplyDeleteIt's not just Spark - anything YARN, or of similar paradigm has the same issues. Machine learning software that does not use machine learning (not sure about TensorFlow as I know nothing about it). Time to eat own dog food and analyze the program, sample the data, project and build an execution plan that optimizes the given resources (a cluster of X machines, with Y cores, Z RAM etc). That is why SQL engines rule(d) - they abstract the execution internals.
ReplyDeleteI can't get Spark to perform anywhere near MR for large shuffles. It spews miles of indecipherable logs then fails. For the same work, MR on the other hand gives me a very readable description of how many records were read, shuffled and written and where execution took place.
ReplyDeleteInitialization and launch of Spark tasks is now much faster due to JVM optimizations. MapReduce runs a new JVM for each task, with all the ensuing consequences (loading all JAR files, JIT compilation, etc.), while Spark on each node keeps the running JVM running while running tasks through RPC calls.
ReplyDeleteSpark operates with RDD abstractions (Resilient Distributed Dataset), which are more versatile than MapReduce. Although for the sake of justice I must say that there is Cascading. This is a wrapper over MR, designed to add flexibility. For more info, visit www.activewizards.com
In addition, there is one more, very important fact - Spark allows you to develop applications not only for batch processing (batch processing), but also for working with streams of data (stream processing). While providing a single approach, and a single API (though with slight differences).