Spark and Rust - How to Build Fast, Distributed and Flexible Analytics Pipelines with Side Effects

Andrea Venuta, Senior Software Engineer - October 7, 2021

Apache Spark is a powerful piece of software that has enabled Phylum to build and run complex analytics and models over a big data lake comprised of data from popular programming language ecosystems.

Spark handles the nitty-gritty details of a distributed computation system behind an abstraction that allows our team to focus on the actual unit of computation.

Rust is another particularly important tool in our engineering toolbox. It enables us to build reliable, safe, and fast software, all from the comfort of a modern and well-thought-out type system that makes expressing complex ideas a breeze.

Rust is also naturally well suited for concurrency, with its excellent threading support and a very efficient asynchronous abstraction over native OS APIs such as epoll, kqueue, or IOCP. This makes it adept at working through many I/O operations quickly and efficiently, as measured by wall-clock time.

After working with both Spark and Rust for a while, it dawned on me that Rust is well suited to solving the challenges posed by distributed computation. The Apache foundation seems to concur: Apache Ballista is a distributed computing platform written in Rust and based on the Apache Arrow ecosystem. While still in its infancy, it covers many of the same use cases as Spark, which makes it a promising project for the analytics world!

We don't really need to wait for a wider adoption of Ballista to reap the benefits of distributed computation in Rust. If we are willing to dig a little beneath Spark's abstraction layer, and get our hands dirty, there is a way to make Spark and Rust get along.

Why should I break out of Spark land?

Spark's execution model is excellent for running distributed computations on well-structured data, but it makes it tricky to run custom code on the executors. User-Defined Functions (UDFs) are helpful but do not really cover the use case of running code with side effects, as they are intended for scalar computations and are tied into the machinery of the Catalyst optimizer. Executing impure code in them results, at best, in an impractical performance assessment experience.

Suppose we have a list of URLs and we need to perform some network requests on them. For example, we may try to scrape tarballs from React's GitHub releases page in order to perform some static analysis on each of them and discard them after the fact. While the operation is easily parallelizable by having one task for each tarball, the individual task has a lot of potential failure cases:

  1. We first need to fetch the tarball's bytes from the network via an HTTP request.
  2. We then need to interpret those bytes as a gzipped stream, hoping that it is not corrupted.
  3. Next, we need to extract the tar archive contained therein and navigate its directory tree.
  4. Afterwards, we need to identify the files we want to look at and validate their properties before finally applying our business logic.

While this would be feasible in Scala, we cannot deny that Rust's I/O concurrency model feels like a match made in heaven for a use case like this.
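
To make the failure cases above concrete, here is a minimal sketch of how they could be modeled as a single error type, with the `?` operator stopping a task at the first stage that fails. All names (`TaskError`, `check_gzip_magic`, `run_task`) are hypothetical, and the network and archive stages are passed in as closures so the sketch needs no dependencies; it is an illustration, not the code from our pipeline.

```rust
/// One variant per stage of the task that can fail (hypothetical names).
#[derive(Debug, PartialEq)]
enum TaskError {
    Fetch(String),      // step 1: the HTTP request failed
    NotGzip,            // step 2: the bytes are not a gzip stream
    Archive(String),    // step 3: the tar archive could not be read
    Validation(String), // step 4: a file did not have the expected properties
}

/// Gzip streams start with the magic bytes 0x1f 0x8b.
fn check_gzip_magic(bytes: &[u8]) -> Result<(), TaskError> {
    if bytes.starts_with(&[0x1f, 0x8b]) {
        Ok(())
    } else {
        Err(TaskError::NotGzip)
    }
}

/// Runs the stages in order for one URL.
/// `fetch` stands in for step 1 and `analyze` for steps 3 and 4.
fn run_task<F, A>(url: &str, fetch: F, analyze: A) -> Result<usize, TaskError>
where
    F: Fn(&str) -> Result<Vec<u8>, TaskError>,
    A: Fn(&[u8]) -> Result<usize, TaskError>,
{
    let bytes = fetch(url)?;   // returns early on a network error
    check_gzip_magic(&bytes)?; // returns early on corrupted input
    analyze(&bytes)            // archive and validation errors surface here
}
```

Because every stage returns the same `Result` type, the caller can decide per tarball whether to log, retry, or skip, without one bad download taking down the rest of the batch.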

Where is the escape hatch?

Spark does not have strong native support for interoperating with other languages yet, but it does have the means to access one of the oldest, most robust and most persistent abstractions in software: standard I/O.

A layer below DataFrames, we can find resilient distributed datasets, or RDDs for short. As per Spark's documentation, RDDs are "fault-tolerant collection(s) of elements that can be operated on in parallel". Their focus is on providing an API for manipulating data the functional way while hiding how these operations are scheduled and distributed to worker nodes.

RDDs have a relatively little-known method, pipe(). It isn't documented in depth, but it is deceptively simple: it writes the RDD's elements, one per line, to the stdin of a provided command and reads the lines of its stdout back as an RDD of strings. The documentation mentions it would be suited for "Perl or bash script(s)", but nobody is stopping us from doing different things as long as we respect that protocol!
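
On the receiving end, that protocol amounts to a line-oriented filter. The following is a minimal sketch using only the Rust standard library; `pipe_lines` and the `handle` closure are hypothetical names chosen for illustration, and the function is generic over reader and writer so it can be exercised without a real stdin.

```rust
use std::io::{self, BufRead, Write};

/// Reads records line by line from `input`, applies `handle` to each one,
/// and writes exactly one result line to `output`, until EOF is reached.
fn pipe_lines<R, W, F>(input: R, mut output: W, handle: F) -> io::Result<()>
where
    R: BufRead,
    W: Write,
    F: Fn(&str) -> String,
{
    for line in input.lines() {
        let line = line?; // propagate read errors instead of panicking
        writeln!(output, "{}", handle(&line))?;
    }
    // Make sure buffered output reaches the parent process before exiting.
    output.flush()
}
```

In a real binary, `main` would call something like `pipe_lines(io::stdin().lock(), io::stdout().lock(), ...)`. Since results are read back line by line, each result should be serialized onto a single line.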

A small example

Let us suppose we have a dataset of financial tickers, and we want to query a financial API for their quotes. This is not a good real-world example, since APIs have rate limits we are bound to run into when processing anything at the scale Spark is meant for, but it is good enough to showcase what it looks like to introduce side effects in the processing pipeline.

In a spark-shell session, let us create a DataFrame with our list of tickers.

scala> val df = Seq("FB", "AAPL", "AMZN", "NFLX", "GOOG").toDF("ticker")
df: org.apache.spark.sql.DataFrame = [ticker: string]

scala> df.show
+------+
|ticker|
+------+
|    FB|
|  AAPL|
|  AMZN|
|  NFLX|
|  GOOG|
+------+

Let us then convert each of these rows to a JSON entry:

scala> df.toJSON.show
+-----------------+
|            value|
+-----------------+
|  {"ticker":"FB"}|
|{"ticker":"AAPL"}|
|{"ticker":"AMZN"}|
|{"ticker":"NFLX"}|
|{"ticker":"GOOG"}|
+-----------------+

These objects are going to end up on the standard input to our Rust program. We don't necessarily need JSON here. With a single field, we could just as well pass each ticker on a line as-is, but in most real-world situations we will likely have more complex records to handle, hence some serialization scheme would be necessary. JSON, while not the fastest, is manageable and well-supported, and its serialization/deserialization time is vastly dominated by any I/O operations anyway.

To query our API for quotes, we're going to need an async executor, an HTTP request library, and a way of serializing and deserializing JSON. Let's rely on tokio, reqwest and serde, respectively.


[dependencies]
tokio = { version = "1.10.0", features = ["full"] }
reqwest = { version = "0.11.4", features = ["json"] }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.67"

Our Rust program will have a very simple structure. It is enough to stitch together calls to our dependencies in a loop that reads a line from stdin, deserializes it, queries the API, serializes the result, and prints it to stdout. The loop ends once we reach EOF on the standard input.


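use std::io::BufRead; // brings `lines()` into scope for the locked stdin

// One input record, as produced by `df.toJSON` on the Spark side.
#[derive(serde::Deserialize)]
struct Input {
    ticker: String,
}

// One output record. `Deserialize` is needed to decode the API response,
// `Serialize` to print the record back to Spark.
#[derive(serde::Serialize, serde::Deserialize)]
struct Output {
    ticker: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: u64,
}

// Parses one JSON line, queries the API, and returns the decoded quote.
async fn query_api(line: String) -> Output {
    let input = serde_json::from_str::<Input>(&line).unwrap();
    reqwest::get(format!("API URI here! {}", input.ticker))
      .await
      .unwrap()
      .json::<Output>()
      .await
      .unwrap()
}

#[tokio::main]
async fn main() {
    // Read until EOF on stdin, printing one JSON line per input line.
    for line in std::io::stdin().lock().lines() {
        if let Ok(line) = line {
            println!(
                "{}",
                serde_json::to_string(&query_api(line).await).unwrap()
            );
        }
    }
}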
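
Note that the loop above awaits each request before reading the next line, so requests are issued one at a time. As a rough sketch of how a batch of lines could be processed concurrently while keeping the output in input order, here is a dependency-free variant that uses scoped threads from the standard library instead of tokio tasks. `process_batch` and the `work` closure are hypothetical stand-ins for `query_api`, and the sketch assumes Rust 1.63 or later for `std::thread::scope`.

```rust
use std::thread;

/// Applies `work` to every line of a batch concurrently, one scoped thread
/// per line, and returns the results in the same order as the input.
/// Assumption: the caller keeps batches small enough that one thread per
/// line is acceptable.
fn process_batch<F>(lines: &[String], work: F) -> Vec<String>
where
    F: Fn(&str) -> String + Sync,
{
    let work = &work; // share the closure by reference across threads
    thread::scope(|s| {
        // Start all workers first so that they run in parallel...
        let handles: Vec<_> = lines
            .iter()
            .map(|line| s.spawn(move || work(line.as_str())))
            .collect();
        // ...then join them in input order to keep the output deterministic.
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .collect()
    })
}
```

The same shape carries over to the async version: start many requests, then collect and print their results, rather than awaiting each one inside the read loop.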

We are finally ready to let the magic happen! Let's turn our DataFrame into a JSON Dataset, pull the RDD from it, pipe() it to our executable, and finally receive the output Dataset of strings.


cargo build --release

scala> val ds = df.toJSON.rdd.pipe("target/release/spark-rust").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

scala> ds.collect.foreach { println }
{"ticker":"FB","open":380.0,"high":380.0199890136719,"low":379.106689453125,"close":379.2200012207031,"volume":581126}
{"ticker":"AAPL","open":153.0,"high":153.02999877929688,"low":152.91000366210938,"close":152.97999572753906,"volume":3498152}
{"ticker":"AMZN","open":3492.81494140625,"high":3492.81494140625,"low":3492.81494140625,"close":3492.81494140625,"volume":153770}
{"ticker":"NFLX","open":572.0,"high":572.0,"low":571.7249755859375,"close":571.7249755859375,"volume":113434}
{"ticker":"GOOG","open":2913.0,"high":2923.5400390625,"low":2912.2900390625,"close":2922.3798828125,"volume":51321}

At this point, we can get back to the DataFrame API in Spark by parsing these JSON records so that we can continue further work on our pipeline on this new data.

scala> spark.read.json(ds).show
+------------------+------------------+------------------+----------------+------+-------+
|             close|              high|               low|            open|ticker| volume|
+------------------+------------------+------------------+----------------+------+-------+
| 379.2200012207031| 380.0199890136719|  379.106689453125|           380.0|    FB| 581126|
|152.97999572753906|153.02999877929688|152.91000366210938|           153.0|  AAPL|3498152|
|  3492.81494140625|  3492.81494140625|  3492.81494140625|3492.81494140625|  AMZN| 153770|
| 571.7249755859375|             572.0| 571.7249755859375|           572.0|  NFLX| 113434|
|   2922.3798828125|   2923.5400390625|   2912.2900390625|          2913.0|  GOOG|  51321|
+------------------+------------------+------------------+----------------+------+-------+

What happens under the hood

A Spark application is comprised of one driver node and one or more executor nodes on a cluster. When running on a single machine, the load is simply distributed across threads. When running on an actual cluster, every worker node that will play the role of an executor needs to have a copy of the code to run. On the JVM side, the mechanism by which this happens is transparent to users: the driver simply delivers the code for a given Spark job to each executor that will have to run it.

How will the nodes get access to our Rust binary, though? The SparkContext object, available on the driver, has an addFile() method which will have the executors download the specified file alongside the code they're supposed to run. The file can be stored in any place Spark knows how to access, like HDFS or S3.

 scala> sc.addFile("s3a://my-spark-executables/spark-rust", false) 

The file will then be available under the path returned by the SparkFiles.getRootDirectory function call, which is usually an automatically created temporary directory that will live as long as the Spark job itself.

scala> org.apache.spark.SparkFiles.getRootDirectory  

res0: String = C:\Users\username\AppData\Local\Temp\spark-...\userFiles-...

So, when deploying, our call to RDD.pipe would change to the following, using SparkFiles.get to resolve the file name against that root directory:

scala> val ds = df.toJSON.rdd.pipe(org.apache.spark.SparkFiles.get("spark-rust")).toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

A caveat: the Rust binaries need to be compiled by targeting the executor's platform! Compiling a Windows .exe will not work if we are going to run our code on a Linux cluster. More importantly, the location where the binaries are stored should be properly secured or we face the risk of an attacker running arbitrary code on our cluster.

Conclusions

In this post, we have seen a way to have Rust contribute to a Spark pipeline. While neither tremendously elegant nor hyper-optimized, the fundamentals of this technique are solid. Standard I/O has stood the test of time so far! Leveraging Rust in Phylum's analytics pipeline has allowed us to run complex static analysis at scale in a distributed fashion without needing to worry too much about workload scheduling.

Resources

  1. Rust: https://www.rust-lang.org/
  2. Spark: https://spark.apache.org/
  3. Ballista: https://ballistacompute.org/
  4. Arrow: https://arrow.apache.org/
  5. UDF: https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html
  6. RDD: https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds
