Application-Aware Network Scheduler
3x faster communication.
Using a novel coflow scheduling algorithm, Varys minimizes communication times of data-parallel applications without starving anyone.
2x more often on time.
Varys uses admission control to support soft deadlines for timely completion of most coflows.
Informed cluster sharing.
Using Varys, coexisting applications, frameworks, or users avoid collateral damage when optimizing their respective communication performance.
No changes to user jobs.
Varys transfers data from sources to destinations without requiring jobs to tweak the number of connections or the number of bytes in flight on each connection.
Varys is an open source network manager/scheduler that aims to improve communication performance of Big Data applications. Its target applications/jobs include those written in Spark, Hadoop, YARN, BSP, and similar data-parallel frameworks.
Varys provides a simple API that allows data-parallel frameworks to express their communication requirements as coflows with minimal changes to the framework. Using coflows as the basic abstraction of network scheduling, Varys implements novel schedulers either to make applications faster or to make time-restricted applications complete within deadlines.
Enabling Varys in all the applications written in a framework can be as simple as changing the send and recv calls (or their equivalents) to their analogous put and get methods in the coflow API, which are exposed through the VarysClient class.
Existing user-written jobs can take advantage of coflows without any modifications.
| Method Signatures in Scala | Calling Process |
|---|---|
| `def registerCoflow(coflowDesc: CoflowDescription): String` | Driver |
| `def putObject[T](objectId: String, obj: T, coflowId: String, size: Long)` | Sender |
| `def getObject[T](objectId: String, coflowId: String): T` | Receiver |
| `def unregisterCoflow(coflowId: String)` | Driver |
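A typical lifecycle under this API is: the driver registers a coflow, senders put objects, receivers get them, and the driver unregisters the coflow when the stage completes. The sketch below illustrates that flow; note that the `VarysClient` constructor arguments and the `CoflowDescription` fields shown here are assumptions for illustration, not the exact shipped signatures.

```scala
// Illustrative use of the coflow API from the table above.
// ASSUMPTION: the VarysClient constructor and CoflowDescription fields
// are placeholders; consult the Varys sources for the exact signatures.

// Driver: register a coflow for an upcoming shuffle.
val client = new VarysClient(/* client name, master URL, listener */)
val desc = new CoflowDescription(/* name, coflow type, expected size */)
val coflowId: String = client.registerCoflow(desc)

// Sender (e.g., a mapper): replace `send` with `putObject`.
client.putObject[Array[Byte]]("shuffle-part-0", partitionBytes,
                              coflowId, partitionBytes.length)

// Receiver (e.g., a reducer): replace `recv` with `getObject`.
val data = client.getObject[Array[Byte]]("shuffle-part-0", coflowId)

// Driver: tear down once the communication stage completes.
client.unregisterCoflow(coflowId)
```

Because only the driver registers and unregisters, per-task code changes are limited to the put/get substitution.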
Varys is most effective when applied to a number of concurrently running coflows from different jobs. We evaluated Varys by deploying it on a 100-node Amazon EC2 cluster, replaying traces collected from a 3000-node production Hive/Hadoop cluster at Facebook, and comparing its performance with that of TCP fair sharing, which does not use the coflow abstraction.
More details on different aspects of Varys's performance—comparisons with newer per-flow prioritization schemes or behavior with increasing load—can be found in accompanying research papers.
Despite the differences in end goals and/or programmability among data-parallel frameworks, their communication is structured, takes place between groups of machines in successive computation stages, and often involves a collection of parallel flows. More importantly, communication in these applications exhibits the all-or-nothing property—a communication stage cannot finish until all its flows have completed.
The coflow abstraction represents such collections of parallel flows to convey job-specific communication requirements – for example, minimizing completion time or meeting a deadline – to the network and enables application-aware network scheduling. Currently, we assume that the amount of data each flow needs to transfer is known before it starts. The flows of a coflow are independent in that the input of a flow does not depend on the output of another in the same coflow, and the endpoints of these flows can be in one or more machines. Effectively, one can consider a coflow to be a data-parallel network job.
Examples of coflows include the shuffle between the mappers and the reducers in MapReduce and the communication stage in the bulk-synchronous parallel (BSP) model. In fact, coflows can express most communication patterns between successive computation stages of data-parallel applications.
Finally, traditional point-to-point communication is also a coflow with a single flow.
| Communication Pattern | Example Frameworks | Coflow Structure |
|---|---|---|
| Shuffle in dataflow pipelines | Spark, Hadoop | Many-to-Many |
| Global communication barriers | BSP | All-to-All |
| Parallel read/write from/to distributed storage | HDFS | Many One-to-One |
Varys schedules coflows using heuristics developed to solve the recently formulated concurrent open shop scheduling with coupled resources problem. As with most scheduling problems, optimal coflow scheduling is computationally intractable.
From 10,000 feet, the algorithm proceeds in two steps. First, Varys maintains an ordered list of coflows, preempting running flows whenever necessary. Second, it allocates the minimum bandwidth required by each flow of the scheduled coflow so that all of its flows complete together.
In essence, Varys minimizes interleaving across coflows to minimize coflow completion times (CCT), while sacrificing flow-level fairness or flow completion times (FCT) that are not correlated to a job's completion time.
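The "complete all flows together" step can be sketched as follows. This is a simplified model, not Varys's actual implementation: assume each flow's size is known and each flow sees some available bandwidth on its path. The coflow cannot finish before its bottleneck flow's duration T, so every other flow only needs rate size/T to finish at exactly that instant, freeing the rest of its capacity for other coflows.

```scala
// Sketch of "minimum bandwidth so all flows finish together".
// ASSUMPTION: flow sizes and per-path available bandwidths are known;
// this is a simplified model, not the production allocation code.
case class Flow(size: Double, availBw: Double)

def minimumRates(flows: Seq[Flow]): Seq[Double] = {
  // The slowest flow at full available bandwidth fixes the
  // coflow's earliest possible completion time T.
  val t = flows.map(f => f.size / f.availBw).max
  // Every flow then needs only size / T to end at time T.
  flows.map(f => f.size / t)
}

// Three flows of sizes 4, 2, 1 on unit-capacity paths: T = 4,
// so the rates are 1.0, 0.5, and 0.25 respectively.
val rates = minimumRates(Seq(Flow(4, 1), Flow(2, 1), Flow(1, 1)))
```

Allocating less than these rates would delay the coflow; allocating more to a non-bottleneck flow would finish that flow early without making the coflow complete any sooner.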
Consider two coflows C1 and C2 sharing the same links; C1 has three flows transferring 1, 2, and 4 units of data, while C2 has two flows transferring 2 data units each. A per-flow scheme divides bandwidth fairly among all five flows, delaying both coflows; scheduling C2's flows ahead of C1's shorter flows lets C2 finish much earlier without delaying C1, whose completion time is determined by its 4-unit bottleneck flow anyway.
Per-flow schemes focus on fair sharing or on decreasing FCT without considering when an application's communication as a whole completes. Varys does the opposite.
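The C1/C2 intuition can be checked numerically under a toy model. This sketch assumes all five flows share one unit-capacity link (an assumption for illustration, not the cluster topology Varys targets) and compares per-flow fair sharing with running whole coflows in smallest-first order.

```scala
// Toy single-link model: all flows share one unit-capacity link.
// ASSUMPTION: this simplification is for illustration only; it is not
// Varys's scheduler, but it shows why coflow-level ordering lowers
// coflow completion times (CCT).

// Per-flow fair sharing: every active flow gets an equal share of the
// link; advance time until the smallest remaining flow finishes.
def fairShareCCT(coflows: Map[String, List[Double]]): Map[String, Double] = {
  var remaining: List[(String, Double)] =
    coflows.toList.flatMap { case (c, sizes) => sizes.map(s => (c, s)) }
  var t = 0.0
  var done = Map.empty[String, Double]
  while (remaining.nonEmpty) {
    val rate = 1.0 / remaining.size            // equal share of the link
    val dt = remaining.map(_._2).min / rate    // until the next flow ends
    t += dt
    val updated = remaining.map { case (c, s) => (c, s - rate * dt) }
    val (fin, rest) = updated.partition(_._2 <= 1e-9)
    fin.foreach { case (c, _) => done = done + (c -> t) } // last finisher wins
    remaining = rest
  }
  done
}

// Coflow ordering: run coflows one at a time, smallest total size first.
// On a single unit-capacity link, a coflow's duration is its total bytes.
def orderedCCT(coflows: Map[String, List[Double]]): Map[String, Double] = {
  var t = 0.0
  coflows.toList.sortBy(_._2.sum).map { case (c, sizes) =>
    t += sizes.sum
    (c, t)
  }.toMap
}

val cs = Map("C1" -> List(1.0, 2.0, 4.0), "C2" -> List(2.0, 2.0))
// Fair sharing finishes C1 at t = 11 and C2 at t = 9 (average CCT 10);
// coflow ordering finishes C2 at t = 4 and C1 still at t = 11
// (average CCT 7.5): C2 speeds up 2.25x while C1 is not delayed at all.
```

This is exactly the trade Varys makes: individual flows of C1 finish later than under fair sharing, but no coflow—and hence no job—finishes later.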
More details on Varys, its performance, the coflow abstraction, and Varys's predecessor system (Orchestra) can be found in the following papers.
Varys also exposes APIs to find the machines with the most and the least network usage – getBestRxMachines and getBestTxMachines – that can be used to perform, for example, network-aware replica placement in HDFS as outlined in the following paper.
Is Varys a new networking protocol? Does it depend on a new one?
No. Varys runs in the application layer on top of existing transport protocols (e.g., TCP). No changes to your hardware or OS are required. Except for minor changes to the frameworks themselves, user jobs can transparently take advantage of Varys as well.
Varys seems to have put and get methods. Is Varys a storage system? Does it implement one underneath?
No, Varys neither has nor needs its own storage system. It sends and receives data through the user process (if the data is in memory) or from disk (if the data is already stored there).
Will it always improve performance?
Varys will improve performance most of the time, especially when you have a multi-user, multi-framework cluster. Currently, overheads due to Varys's centralized design sometimes make it less useful for sub-second jobs and cause deadline misses on rare occasions. We are working in both directions: making the design more efficient and engineering the current system better.
Which frameworks support Varys?
We are working on ensuring Varys support in Spark and plan to enable Hadoop MapReduce 2.0 as well. We would appreciate your help in any of these endeavors.
Which languages does Varys support?
Currently, Varys provides an interface only to JVM-based languages. It can support non-JVM languages as well, but we need your help to implement the language-specific interfaces to Varys.
What license is Varys under?
Varys is under the Apache 2.0 license.
How can I contribute?
If you would like to report a bug, please create a new issue.
If you have already resolved one, do send us a pull request on GitHub. We want to hear about your experience using Varys.
Finally, if you want to discuss a new idea, talk to us at the Varys Developers mailing list.
Where can I get more help?
Please post on the Varys Users mailing list. We will be glad to help!
This research is supported in part by NSF CISE Expeditions Award CCF-1139158, LBNL Award 7076018, and DARPA XData Award FA8750-12-2-0331, and gifts from Amazon Web Services, Google, SAP, The Thomas and Stacey Siebel Foundation, Apple, Inc., C3Energy, Cisco, Cloudera, EMC, Ericsson, Facebook, GameOnTalis, Guavus, HP, Huawei, Intel, Microsoft, NetApp, Pivotal, Splunk, Virdata, VMware, WANdisco and Yahoo!.