How to Solve Non-Serializable Errors When Instantiating Objects In Spark UDFs

By Paul Brenner, Data Scientist, PlaceIQ

 

Here at the Place [1] we have beautiful intelligent engineers for days. DAYS [2]. A lot of what they do is locked away in Java and left as a mystery to the common man. Sometimes they write in Pig, because that is just life and sometimes in life you have to write jobs in Pig. There is plenty of Scalding too and that is cool because Scalding is pretty great. But, get this, some of these brilliant engineers direct a fraction of their super-human intellects to learning Spark and then, wow, free for all, guess who is now getting all of my hardest questions.

 

So, as you can imagine, when I started seeing the dreaded non-serializable errors in my Spark code, I turned to the famous Vlad [3]. Today, just this one time, you too can benefit from Vlad’s help.

 

The problem: You love Spark dataframes because obviously they are wonderful and you don’t hate progress, but you want to do something to your data that goes beyond the built-in operations. So, right, you are going to use a UDF, that is fine. But, this time, you need to import a package and then instantiate an object for use in that UDF. I’m going to list foolish things that you shouldn’t do [4], but first I’ll explain the problem I’m using in these examples:

 

The problem

 

Recently at the Place [5], we have been crafting up some fancy enhancements to our visit attribution methodology. It is all top secret and hush-hush right now, but we are testing out some fancy distributions from Plasma Physics which are just going to let us attribute the hell out of visits. The visits won’t even see it coming. It will just be raw attribution carnage. Wait, what?
Anyway, as a simple bit of testing, I want to look at a bunch of visits and label the ones with a centroid inside a specific polygon. Here are two foolish ways to do that:

 

Foolish Way 1: Instantiate Object Inside UDF

 

import org.apache.spark.sql.functions.udf
import com.vividsolutions.jts.io.WKTReader
import com.vividsolutions.jts.geom.Coordinate
import com.vividsolutions.jts.geom.GeometryFactory

// Works, but builds a new WKTReader and GeometryFactory on every single call.
val polyContainsCentroidFilterFail1 = udf((geometry: String, longitude: Double, latitude: Double) => {
    val wkt = new WKTReader()
    val geometryFactory = new GeometryFactory()
    val centroid = geometryFactory.createPoint(new Coordinate(longitude, latitude))
    // Parse the WKT polygon and test whether it contains the centroid.
    wkt.read(geometry).contains(centroid)
})

 

This works! It is a UDF! You can use it!
But, it isn’t exactly the smart way to go about solving this problem. You can see that I’m instantiating objects like WKTReader and GeometryFactory inside the UDF, which means that every time this UDF is called (basically for every row of data), time and memory are spent instantiating these objects.

 

In Computer Science we call this behavior “not cool.” So let’s try something else!

 

Foolish Way 2: Instantiate Object Outside UDF

// Built once on the driver, then captured by the UDF's closure.
val wkt = new WKTReader()
val geometryFactory = new GeometryFactory()
val polyContainsCentroidFilterFail2 = udf((geometry: String, longitude: Double, latitude: Double) => {
    val centroid = geometryFactory.createPoint(new Coordinate(longitude, latitude))
    wkt.read(geometry).contains(centroid)
})

 

Look! The objects are instantiated outside of the UDF! How wonderful, right? Wrong, friend. This is a one-way ticket to non-serializable errors, which look like THIS:

 

org.apache.spark.SparkException: Task not serializable.

 

Those instantiated objects just aren’t going to be happy about getting serialized to be sent out to your worker nodes. Looks like we are going to need Vlad to solve this.
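
If you want to see this happen without spinning up a cluster, here is a minimal sketch in plain Scala (no Spark). It assumes Java serialization, which is what Spark uses for closures, and it uses a made-up FakeReader class as a stand-in for a non-serializable helper like WKTReader. FakeReader, ClosureCaptureDemo, serializationError and capturingFunction are hypothetical names for illustration, not part of Spark or JTS.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a helper that is not Serializable (like WKTReader).
class FakeReader {
  def read(text: String): Int = text.length
}

object ClosureCaptureDemo {
  // Attempts Java serialization; on failure returns the exception message,
  // which names the class that could not be serialized.
  def serializationError(value: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }

  // Mirrors Foolish Way 2: the helper is created outside the function,
  // so the function captures it and has to carry it along when serialized.
  def capturingFunction(): String => Int = {
    val reader = new FakeReader()
    (text: String) => reader.read(text)
  }

  def main(args: Array[String]): Unit = {
    println(serializationError(capturingFunction()))
  }
}
```

The function itself is fine. It is the captured helper riding along inside it that makes serialization blow up.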

 

Vlad’s Super Excellent Solution: Create a New Object and Reference It From the UDF

 

object centroidIntersectService extends Serializable {
    // Built lazily on first use and never serialized along with the object.
    @transient lazy val wkt = new WKTReader()
    @transient lazy val geometryFactory = new GeometryFactory()

    def testIntersect(geometry: String, longitude: Double, latitude: Double) = {
        val centroid = geometryFactory.createPoint(new Coordinate(longitude, latitude))
        wkt.read(geometry).contains(centroid)
    }
}

// The UDF only refers to the object; it does not capture the reader or the factory.
val polyContainsCentroidFilter2 = udf((geometry: String, longitude: Double, latitude: Double) => {
    centroidIntersectService.testIntersect(geometry, longitude, latitude)
})

 

The only difference is that now we are wrapping the non-serializable bits in an object. Which… just works!!! To understand why, I turned to another engineer: Eloquent Vikas [6]. The previous implementation didn’t work because wkt and geometryFactory were ordinary values that the UDF’s closure captured, so Spark had to serialize them along with the function before sending it out to workers, and at least one of them isn’t serializable. A Scala object is a singleton that the function refers to statically instead of capturing, so the UDF no longer drags those instances along with it. Each worker JVM sets up its own copy of the object, and the lazy val means the WKTReader and GeometryFactory get built once there, on first use, and are then reused for every row. Adding @transient in front makes us extra double sure: those fields are skipped if the object itself ever does get serialized. Fantastic!
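
Here is the same idea as a small sketch in plain Scala (no Spark, Java serialization assumed). StubReader, ReaderHolder, HolderDemo, isSerializable and holderFunction are hypothetical names for illustration; StubReader stands in for WKTReader and ReaderHolder plays the role of centroidIntersectService. The function refers to the holder object statically, so nothing non-serializable is captured, and the lazy val is built once no matter how many times the function runs.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable helper like WKTReader.
class StubReader {
  def read(text: String): Int = text.length
}

// Holder object in the style of centroidIntersectService.
object ReaderHolder extends Serializable {
  // Counts how often the helper is built in this JVM (demo only).
  var timesBuilt = 0

  @transient lazy val reader: StubReader = {
    timesBuilt += 1
    new StubReader()
  }
}

object HolderDemo {
  // Returns true if Java serialization of the value succeeds.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  // References ReaderHolder statically; no StubReader instance is captured.
  def holderFunction(): String => Int =
    (text: String) => ReaderHolder.reader.read(text)

  def main(args: Array[String]): Unit = {
    val f = holderFunction()
    println(isSerializable(f))
    Seq("POINT (1 2)", "POINT (3 4)", "POINT (5 6)").foreach(f)
    println(ReaderHolder.timesBuilt)
  }
}
```

On a real cluster the counter would be per executor JVM rather than global, which is exactly the point: one reader per JVM instead of one per row.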

 

Want more? Has your thirst for serializable errors not been quenched? Perhaps check out this discussion on Stack Overflow or this excellent blog post.

 

Pro-tip before we go: if you are playing around with functions like these in a Spark shell, it is pretty easy for Spark to get confused and try to use an old non-serializable function in place of your beautiful and very serializable function. So if you are working in a Spark shell and trying to troubleshoot your functions, make sure to use a different function name for each attempt or to restart your shell (better yet, your Zeppelin interpreter) between tries. Voilà!

 


  1. We definitely don’t call it that. No one would ever call it that.
  2. Not literally… what would that even mean literally? What I’m trying to say is that I think our engineers are pretty great.
  3. Actually, that is a lie. First I turned to Stack Overflow and was really annoyed that I couldn’t find a good answer to my problem, and THEN I tried by myself for entirely too long and used some hacky workarounds so that I could get the job done on time, and I probably played some ping pong in there somewhere… but when I wanted the problem solved I definitely turned to Vlad.
  4. Which I absolutely did.
  5. Seriously, we 100% don’t call our company this. Don’t ever call PlaceIQ “the Place”. Please.
  6. We have a strong showing of engineers with names that start with V. Does your name start with V? Think you have what it takes to join our engineering V crew? Apply for a job, V friend!