Creating Readable Spark Jobs

Apache Spark is now a common sight at companies that need to process huge amounts of data, and things aren’t any different here at SoundCloud: as one can imagine, we have lots of data to process all the time.

While Spark does offer a streaming API, as do other tools like Kafka Streams and Apache Flink, we mainly (but not exclusively) use batch processing with Spark to solve a variety of problems.

While Spark’s Scala API could be more idiomatic and type safe, it does offer a nice DSL that allows one to create expressive pipelines. However, it’s also not uncommon to see these jobs getting a little bit out of control in terms of readability, with many nested operations. This can lead to situations where one needs to spend some time reasoning about what’s going on and what types are being passed through all of those intermediate operators.

In this article, we’ll see how to break down the aforementioned types of jobs into smaller chunks and how to compose them using functional programming techniques. In order to illustrate this approach, let’s go through a simplified use case of a real system our team is responsible for. Even though the example illustrated here is implemented using Spark, these techniques could be employed with other technologies as well, like Scrunch.

Practical Example

One of the things the Content Team here at SoundCloud is responsible for is receiving content from labels and making it available to users. As part of this process, our systems need to decide which SoundCloud profile each track should be uploaded to.

The way this decision process works is based on rules that are matched against specific parts of the metadata of each track. For our simplified use case, we have two rule types:

  • One matching both artist and label names
  • One based only on the artist name

We’ll assume there are no conflicting rules, i.e. there will be no two rules matching the same label/artist or artist, and the rules matching both artist and label names have higher priority than the ones matching only the artist name. Of course, things are more complex in the real world, but this should be good enough for our example. Before getting into the job itself, let’s take a look at the model:

case class Label(id: Long, name: String)

case class Artist(id: Long, name: String)

case class Track(id: Long, title: String, artistId: Long, labelId: Long)

case class RuleType(name: String)

object RuleType {
  val Artist = RuleType("artist")
  val ArtistAndLabel = RuleType("artist-and-label")
}

trait MappingRule {
  def id: Long
  def ruleType: RuleType
  def userId: Long
}

case class MappingRuleArtistAndLabel(id: Long, artist: String, label: String, userId: Long) extends MappingRule {
  override def ruleType: RuleType = RuleType.ArtistAndLabel
}

case class MappingRuleArtist(id: Long, artist: String, userId: Long) extends MappingRule {
  override def ruleType: RuleType = RuleType.Artist
}

case class AppliedRule(id: Long, ruleType: RuleType, userId: Long)

case class TrackWithAppliedRule(id: Long, title: String, artist: Artist, label: Label, maybeAppliedRule: Option[AppliedRule])

A few things could be better in this model:

  • We could use value classes to wrap the primitive types like IDs and names.
  • The RuleType could be defined as a sealed trait with case objects implementing it.
  • The TrackWithAppliedRule could have a field of type MappingRule instead, thereby removing the need for the AppliedRule class.
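As a rough illustration of the first two ideas, here is a minimal sketch in plain Scala, without any Spark involved. The names ArtistId, ArtistName, and priority are hypothetical and only serve the example; they are not part of the actual job.

```scala
object ModelSketch {
  // Value classes: wrap a primitive so that an artist ID cannot be
  // confused with, say, a label ID at compile time.
  final case class ArtistId(value: Long) extends AnyVal
  final case class ArtistName(value: String) extends AnyVal

  final case class Artist(id: ArtistId, name: ArtistName)

  // A sealed trait with case objects: the set of rule types is closed,
  // so the compiler can warn about non-exhaustive pattern matches.
  sealed trait RuleType { def name: String }
  object RuleType {
    case object Artist extends RuleType { val name = "artist" }
    case object ArtistAndLabel extends RuleType { val name = "artist-and-label" }
  }

  // Hypothetical helper: a lower number means a higher priority.
  def priority(ruleType: RuleType): Int = ruleType match {
    case RuleType.ArtistAndLabel => 0
    case RuleType.Artist         => 1
  }
}
```

Whether Spark can derive encoders for types like these depends on the Spark version and setup.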

Unfortunately, some of these ideas are rather difficult to implement with vanilla Spark. One approach to overcome these issues is by using Frameless, but that would be a topic for another blog post, so let’s leave the model as it is.

Moving forward, the input to the job will be five datasets:

  • One with the tracks
  • One with the artists
  • One with the labels
  • One with the artists’/labels’ rules
  • Another one with the artists’ rules

The job basically needs to:

  • Join tracks with artists and labels
  • Apply the artists’/labels’ rules
  • Apply the artists’ rules
  • Select the best rule and return a dataset of TrackWithAppliedRule
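Before involving Spark, the rule selection itself can be sketched on plain Scala collections. This is only an illustration under simplifying assumptions: Rule, TrackInfo, and selectRule are hypothetical names, and the in-memory maps stand in for the two rule datasets.

```scala
object RuleSelectionSketch {
  // Trimmed-down stand-ins for the article's model.
  case class Rule(id: Long, ruleType: String, userId: Long)
  case class TrackInfo(id: Long, artist: String, label: String)

  // Rules indexed by what they match on. The "no conflicting rules"
  // assumption is what makes a Map a reasonable stand-in here.
  val artistAndLabelRules: Map[(String, String), Rule] =
    Map(("Artist 1", "Label 1") -> Rule(1, "artist-and-label", 1))
  val artistRules: Map[String, Rule] =
    Map("Artist 1" -> Rule(2, "artist", 2), "Artist 2" -> Rule(3, "artist", 3))

  // Try the higher-priority artist/label rule first and fall back to the
  // artist-only rule; None means that no rule matched.
  def selectRule(track: TrackInfo): Option[Rule] =
    artistAndLabelRules
      .get((track.artist, track.label))
      .orElse(artistRules.get(track.artist))
}
```

The same orElse fallback shows up in the Spark job later on, where it is applied to the two optional rules produced by the left outer joins.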

Starting with the Tests

Below is a single test case that helps clarify the logic and will also support the development and refactoring of the job:

import org.apache.spark.sql.SparkSession
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

class RuleApplierJobSpec extends Specification {

  trait Context extends Scope {
    val spark = SparkSession
      .builder()
      .appName("tracks-rules-applier")
      .master("local")
      .config("spark.sql.shuffle.partitions", "1")
      .getOrCreate()
    val job = new RuleApplierJob(spark)

    val label1 = Label(id = 1, name = "Label 1")

    val artist1 = Artist(id = 1, name = "Artist 1")
    val artist2 = Artist(id = 2, name = "Artist 2")
    val artist3 = Artist(id = 3, name = "Artist 3")

    val ruleArtist1AndLabel1 = MappingRuleArtistAndLabel(id = 1, artist = "Artist 1", label = "Label 1", userId = 1)
    val ruleArtist1 = MappingRuleArtist(id = 2, artist = "Artist 1", userId = 2)
    val ruleArtist2 = MappingRuleArtist(id = 3, artist = "Artist 2", userId = 3)

    val track1 = Track(id = 1, title = "Title Track 1", artistId = 1, labelId = 1)
    val track2 = Track(id = 2, title = "Title Track 2", artistId = 2, labelId = 1)
    val track3 = Track(id = 3, title = "Title Track 3", artistId = 3, labelId = 1)
  }

  "it should match rules against artist and label of each track" in new Context {
    import spark.implicits._

    val labels = Seq(label1).toDS()
    val artists = Seq(artist1, artist2, artist3).toDS()
    val artistsAndLabelsRules = Seq(ruleArtist1AndLabel1).toDS()
    val artistsRules = Seq(ruleArtist1, ruleArtist2).toDS()
    val tracks = Seq(track1, track2, track3).toDS()

    val result = job.run(tracks, artists, labels, artistsAndLabelsRules, artistsRules).collect().toSeq

    result should containTheSameElementsAs(Seq(
      TrackWithAppliedRule(1, "Title Track 1", artist1, label1, Some(AppliedRule(1, RuleType.ArtistAndLabel, 1))),
      TrackWithAppliedRule(2, "Title Track 2", artist2, label1, Some(AppliedRule(3, RuleType.Artist, 3))),
      TrackWithAppliedRule(3, "Title Track 3", artist3, label1, None)
    ))
  }
}

Working and Iterating on the Job

The first version of the job works and gets the tests passing, but it’s not that readable — especially if you consider people joining a team and having to understand and maintain the job:

import org.apache.spark.sql.{Dataset, SparkSession}

class RuleApplierJobV1(spark: SparkSession) {

  import spark.implicits._

  def run(tracks: Dataset[Track],
          artists: Dataset[Artist],
          labels: Dataset[Label],
          artistsAndLabelsRules: Dataset[MappingRuleArtistAndLabel],
          artistRules: Dataset[MappingRuleArtist]): Dataset[TrackWithAppliedRule] =
    tracks
      .joinWith(artists, tracks("artistId") === artists("id"), "inner")
      .joinWith(labels, $"_1.labelId" === labels("id"), "inner")
      .map { case ((track, artist), label) => (track, artist, label) }
      .joinWith(artistsAndLabelsRules, $"_2.name" === artistsAndLabelsRules("artist") &&
        $"_3.name" === artistsAndLabelsRules("label"), "left_outer")
      .map { case ((track, artist, label), artistAndLabelRule) => (track, artist, label, Option(artistAndLabelRule)) }
      .joinWith(artistRules, $"_2.name" === artistRules("artist"), "left_outer")
      .map { case ((track, artist, label, maybeArtistAndLabelRule), ruleArtist) => (track, artist, label, maybeArtistAndLabelRule, Option(ruleArtist)) }
      .map { case (track, artist, label, maybeArtistAndLabelRule, maybeArtistRule) =>
        TrackWithAppliedRule(
          track.id,
          track.title,
          artist,
          label,
          maybeArtistAndLabelRule.orElse(maybeArtistRule).map(r => AppliedRule(r.id, r.ruleType, r.userId)))
      }
}

A first approach to make it better is a very basic technique in programming: breaking a big problem into smaller pieces. We can do that by creating smaller functions, each one doing one part of the work:

import org.apache.spark.sql.{Dataset, SparkSession}

class RuleApplierJob(spark: SparkSession) {

  import RuleApplierJob._

  def run(tracks: Dataset[Track],
          artists: Dataset[Artist],
          labels: Dataset[Label],
          artistsAndLabelsRules: Dataset[MappingRuleArtistAndLabel],
          artistRules: Dataset[MappingRuleArtist]): Dataset[TrackWithAppliedRule] = {
    val tracksWithArtists = joinTracksWithArtists(tracks, artists)
    val tracksWithArtistsAndLabels = joinTracksAndArtistsWithLabels(tracksWithArtists, labels)(spark)
    val withArtistsAndLabelsRules = applyArtistsAndLabelsRules(tracksWithArtistsAndLabels, artistsAndLabelsRules)(spark)
    val withArtistsRules = applyArtistRules(withArtistsAndLabelsRules, artistRules)(spark)
    convertToTrackWithAppliedRule(withArtistsRules)(spark)
  }
}

object RuleApplierJob {

  private def joinTracksWithArtists(tracks: Dataset[Track], artists: Dataset[Artist]) =
    tracks.joinWith(artists, tracks("artistId") === artists("id"), "inner")

  private def joinTracksAndArtistsWithLabels(tracksWithArtists: Dataset[(Track, Artist)], labels: Dataset[Label])(spark: SparkSession) = {
    import spark.implicits._
    tracksWithArtists
      .joinWith(labels, $"_1.labelId" === labels("id"), "inner")
      .map { case ((track, artist), label) => (track, artist, label) }
  }

  private def applyArtistsAndLabelsRules(tracksWithArtistsAndLabels: Dataset[(Track, Artist, Label)], artistAndLabelRules: Dataset[MappingRuleArtistAndLabel])(spark: SparkSession) = {
    import spark.implicits._
    tracksWithArtistsAndLabels
      .joinWith(artistAndLabelRules, tracksWithArtistsAndLabels("_2.name") === artistAndLabelRules("artist") &&
        tracksWithArtistsAndLabels("_3.name") === artistAndLabelRules("label"), "left_outer")
      .map { case ((track, artist, label), artistAndLabelRule) => (track, artist, label, Option(artistAndLabelRule), Option.empty[MappingRuleArtist]) }
  }

  private def applyArtistRules(tracksWithRules: Dataset[(Track, Artist, Label, Option[MappingRuleArtistAndLabel], Option[MappingRuleArtist])], artistRules: Dataset[MappingRuleArtist])(spark: SparkSession) = {
    import spark.implicits._
    tracksWithRules
      .joinWith(artistRules, tracksWithRules("_2.name") === artistRules("artist"), "left_outer")
      .map { case ((track, artist, label, maybeArtistAndLabelRule, _), ruleArtist) => (track, artist, label, maybeArtistAndLabelRule, Option(ruleArtist)) }
  }

  private def convertToTrackWithAppliedRule(tracksWithRules: Dataset[(Track, Artist, Label, Option[MappingRuleArtistAndLabel], Option[MappingRuleArtist])])(spark: SparkSession) = {
    import spark.implicits._
    tracksWithRules.map { case (track, artist, label, maybeArtistAndLabelRule, maybeArtistRule) =>
      TrackWithAppliedRule(
        track.id,
        track.title,
        artist,
        label,
        maybeArtistAndLabelRule.orElse(maybeArtistRule).map(r => AppliedRule(r.id, r.ruleType, r.userId)))
    }
  }
}

If you want to avoid passing around these gigantic dataset types with lots of tuples, there are basically two approaches to deal with these types of intermediate state:

  • Defining basic alias types
  • Wrapping them in case classes

In our case, let’s go with the first approach and define some alias types to pass around:

type TracksWithArtists = Dataset[(Track, Artist)]
type TracksWithArtistsAndLabels = Dataset[(Track, Artist, Label)]
type TracksWithRules = Dataset[(Track, Artist, Label, Option[MappingRuleArtistAndLabel], Option[MappingRuleArtist])]
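For comparison, the second approach (wrapping the intermediate state in case classes) could look roughly like the sketch below. The wrapper names and the two helper functions are hypothetical, and the example works on single values rather than datasets to keep it free of Spark dependencies.

```scala
object IntermediateStateSketch {
  // Trimmed copies of the article's model, so that the sketch compiles on its own.
  case class Track(id: Long, title: String, artistId: Long, labelId: Long)
  case class Artist(id: Long, name: String)
  case class Label(id: Long, name: String)
  case class MappingRuleArtistAndLabel(id: Long, artist: String, label: String, userId: Long)
  case class MappingRuleArtist(id: Long, artist: String, userId: Long)

  // Hypothetical wrappers (not part of the original job) replacing the tuples.
  case class TrackWithArtist(track: Track, artist: Artist)
  case class TrackWithArtistAndLabel(track: Track, artist: Artist, label: Label)
  case class TrackWithRules(
      track: Track,
      artist: Artist,
      label: Label,
      maybeArtistAndLabelRule: Option[MappingRuleArtistAndLabel] = None,
      maybeArtistRule: Option[MappingRuleArtist] = None)

  // Each step reads and writes named fields instead of tuple positions.
  def withLabel(t: TrackWithArtist, label: Label): TrackWithArtistAndLabel =
    TrackWithArtistAndLabel(t.track, t.artist, label)

  def withArtistRule(t: TrackWithArtistAndLabel, rule: Option[MappingRuleArtist]): TrackWithRules =
    TrackWithRules(t.track, t.artist, t.label, maybeArtistRule = rule)
}
```

The trade-off is more type definitions in exchange for named fields; with datasets of such wrappers, join conditions would presumably refer to nested columns by field name (for example artist.name) rather than by tuple position (_2.name).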

There are still some nuances in this code that are worth elaborating on.

  • All of these small functions need to be defined in the companion object instead of in the class itself. The reason is simple: If they are defined in the class, the closures passed to Spark reference the enclosing instance, so Spark will try to serialize the entire class. That leads to the infamous "Task not serializable" exception, since the job class, which holds a reference to the SparkSession, is not serializable.
  • Because Spark needs default encoders provided as implicit parameters by the SparkSession instance, the SparkSession needs to be passed to pretty much all of the internal functions.

Another point that works in our favor during the refactoring is that the types of the functions align perfectly (with the exception of the SparkSession), i.e. the output of one function is the input of the next one:

[Figure: the function pipeline of the job]

So we have a scenario where function composition is almost a perfect fit, except for the annoyance of having to pass the SparkSession to almost all the functions. This situation would allow the creation of a pipeline of functions if we could somehow inject the SparkSession dependency once, when we execute the pipeline, instead of providing it to each function call. This seems to be a good fit for the Reader Monad pattern:

  "The Reader monad represents a computation, which can read values from a
  shared environment, pass values from function to function, and execute
  sub-computations in a modified environment."

This is also a perfect fit for our scenario, as each function needs to read a value (SparkSession) from a shared environment in order to perform its computation and return its value. Both Scalaz and Cats provide implementations of the Reader Monad. In our case, we’ll be using Cats, as it’s already part of the project we were working on, but the implementation would be quite similar with Scalaz. Here’s how the final version would look:

import cats.data.Reader
import org.apache.spark.sql.{Dataset, SparkSession}

class RuleApplierJob(spark: SparkSession) {

  import RuleApplierJob._

  def run(tracks: Dataset[Track],
          artists: Dataset[Artist],
          labels: Dataset[Label],
          artistsAndLabelsRules: Dataset[MappingRuleArtistAndLabel],
          artistRules: Dataset[MappingRuleArtist]): Dataset[TrackWithAppliedRule] =
    joinTracksWithArtists(tracks, artists)
      .flatMap(joinTracksAndArtistsWithLabels(_, labels))
      .flatMap(applyArtistsAndLabelsRules(_, artistsAndLabelsRules))
      .flatMap(applyArtistRules(_, artistRules))
      .flatMap(convertToTrackWithAppliedRule)
      .run(spark)
}

object RuleApplierJob {

  type TracksWithArtists = Dataset[(Track, Artist)]
  type TracksWithArtistsAndLabels = Dataset[(Track, Artist, Label)]
  type TracksWithRules = Dataset[(Track, Artist, Label, Option[MappingRuleArtistAndLabel], Option[MappingRuleArtist])]

  private def joinTracksWithArtists(tracks: Dataset[Track], artists: Dataset[Artist]) = Reader[SparkSession, TracksWithArtists] { _ =>
    // same as previous listing
  }

  private def joinTracksAndArtistsWithLabels(tracksWithArtists: TracksWithArtists, labels: Dataset[Label]) = Reader[SparkSession, TracksWithArtistsAndLabels] { spark =>
    // same as previous listing
  }

  private def applyArtistsAndLabelsRules(tracksWithArtistsAndLabels: TracksWithArtistsAndLabels, artistAndLabelRules: Dataset[MappingRuleArtistAndLabel]) = Reader[SparkSession, TracksWithRules] { spark =>
    // same as previous listing
  }

  private def applyArtistRules(tracksWithRules: TracksWithRules, artistRules: Dataset[MappingRuleArtist]) = Reader[SparkSession, TracksWithRules] { spark =>
    // same as previous listing
  }

  private def convertToTrackWithAppliedRule(tracksWithRules: TracksWithRules) = Reader[SparkSession, Dataset[TrackWithAppliedRule]] { spark =>
    // same as previous listing
  }
}

As seen above, instead of each function returning only its desired type, it returns a Reader that produces a type given a SparkSession. Reader Monads are composable, so the functions can be aligned after each other, thereby creating a pipeline of functions, and only when this pipeline is triggered does the external dependency (SparkSession in this case) need to be provided. The code above is much cleaner and easier to reason about! Even a newcomer to this codebase would be able to easily understand what’s going on here.
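To make the mechanics more tangible, here is a minimal hand-rolled Reader in plain Scala. It is a simplified sketch and not the Cats implementation; Config and the three step functions are hypothetical stand-ins for the SparkSession and the job's functions.

```scala
object ReaderSketch {
  // A Reader is just a wrapper around a function from an environment to a value.
  final case class Reader[Env, A](run: Env => A) {
    def map[B](f: A => B): Reader[Env, B] =
      Reader[Env, B](env => f(run(env)))

    // flatMap feeds the same environment to this step and to the next one.
    def flatMap[B](f: A => Reader[Env, B]): Reader[Env, B] =
      Reader[Env, B](env => f(run(env)).run(env))
  }

  // Hypothetical shared environment, playing the role of the SparkSession.
  final case class Config(prefix: String, times: Int)

  def addPrefix(s: String): Reader[Config, String] =
    Reader[Config, String](cfg => cfg.prefix + s)

  def duplicate(s: String): Reader[Config, String] =
    Reader[Config, String](cfg => s * cfg.times)

  def length(s: String): Reader[Config, Int] =
    Reader[Config, Int](_ => s.length)

  // Building the pipeline runs nothing yet; it only describes the steps.
  val pipeline: Reader[Config, Int] =
    addPrefix("track").flatMap(duplicate).flatMap(length)

  // The environment is supplied once, at the very end.
  val result: Int = pipeline.run(Config(prefix = "my-", times = 2)) // 16
}
```

Here "my-track" has 8 characters and is repeated twice, hence the result of 16. None of the step functions receive the Config explicitly from their caller, which mirrors how the job's functions no longer take a SparkSession parameter.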

Another approach to compose the functions would be using a for comprehension instead of flatMap. Both approaches work and are quite similar, with the disadvantage that in this specific use case, one will need to give names to the intermediate values in the for comprehension solution.
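The difference between the two styles can be sketched with the same kind of minimal hand-rolled Reader (again a simplification, not the Cats type). The functions double and addEnv are hypothetical, and the environment is a plain Int to keep the example short.

```scala
object ForComprehensionSketch {
  final case class Reader[Env, A](run: Env => A) {
    def map[B](f: A => B): Reader[Env, B] =
      Reader[Env, B](env => f(run(env)))
    def flatMap[B](f: A => Reader[Env, B]): Reader[Env, B] =
      Reader[Env, B](env => f(run(env)).run(env))
  }

  def double(n: Int): Reader[Int, Int] = Reader[Int, Int](_ => n * 2)
  def addEnv(n: Int): Reader[Int, Int] = Reader[Int, Int](env => n + env)

  // flatMap style: intermediate values stay anonymous.
  val chained: Reader[Int, Int] =
    double(5).flatMap(addEnv).flatMap(double)

  // for comprehension style: every intermediate value needs a name.
  val comprehended: Reader[Int, Int] =
    for {
      doubled <- double(5)
      shifted <- addEnv(doubled)
      result  <- double(shifted)
    } yield result
}
```

Both values describe the same computation; running either one with the environment 3 yields (5 * 2 + 3) * 2 = 26.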

For completeness, it’s worth mentioning that implicit parameters could also be used to pass the SparkSession around, but I find that less readable and a little bit harder to reason about, as the dependencies are not very clear. Having said that, it might be a better approach in some scenarios, such as that of an external library, and it’s also a matter of preference/style.
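For illustration, the implicit-parameter alternative could look like the following sketch in plain Scala. Env, scale, describe, and run are hypothetical names, with Env standing in for the SparkSession.

```scala
object ImplicitEnvSketch {
  // Hypothetical environment, standing in for the SparkSession.
  final case class Env(multiplier: Int)

  // Each function declares the environment in an implicit parameter list.
  def scale(n: Int)(implicit env: Env): Int = n * env.multiplier
  def describe(n: Int)(implicit env: Env): String = s"$n (x${env.multiplier})"

  // Call sites look clean, but the dependency is only visible in the signatures.
  def run(n: Int)(implicit env: Env): String = describe(scale(n))

  val result: String = {
    implicit val env: Env = Env(multiplier = 3)
    run(4) // "12 (x3)"
  }
}
```

The environment is resolved by the compiler at each call site, which is convenient but hides where the value comes from when reading the code.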

For the sake of simplicity, I decided to keep all the functions inside the same companion object, but you could also create a dedicated object for each of them instead, and the solution would end up being pretty much the same.

Conclusion

As companies write more and more Spark code in Scala, it’s important that we strive to bring more functional aspects to this codebase. In addition to looking for opportunities to apply these concepts as shown in this article, frameworks like Frameless are also working toward creating more expressive DSLs to work with Spark, so it’s worth keeping an eye on those as well.