
I am trying to test the program below, which takes a checkpoint and should read from the checkpoint location if the application fails for any reason, such as resource unavailability. When I kill the job and trigger it again, execution restarts from the beginning. I am not sure what else is required to achieve this. Thanks!

Below is the code:

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object withCheckpoint {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.ERROR)

    //val conf = new SparkConf().setAppName("Without Checkpoint")
    val conf = new SparkConf().setAppName("With Checkpoint")
    val sc = new SparkContext(conf)


    val checkpointDirectory = "/tmp"

    sc.setCheckpointDir(checkpointDirectory)   // set checkpoint directory

    // getOrCreate() reuses the SparkContext created above, so keep the app name consistent
    val spark = SparkSession.builder.appName("With Checkpoint").getOrCreate()



    /************************************************************************************************************************************************/
    /*                                                Reading source data begins here                                                               */
    /************************************************************************************************************************************************/


    val readCtryDemoFile = spark.read.option("header", "true").csv("/tmp/Ctry_Demo.csv")



    val readCtryRefFile = spark.read.option("header","true").csv("/tmp/ref_ctry.csv")



    val readCtryCntntFile = spark.read.option("header","true").csv("/tmp/ctry_to_continent.csv")


    /************************************************************************************************************************************************/
    /*                                                Reading source data Completes                                                                 */
    /************************************************************************************************************************************************/


    /************************************************************************************************************************************************/
    /*                                                Transformation begins here                                                                    */
    /************************************************************************************************************************************************/


    /*********************************************************************************/
    /* Join above created dataframes to pull respective columns                      */
    /*********************************************************************************/


    val jnCtryDemoCtryref = readCtryDemoFile.join(readCtryRefFile,Seq("NUM_CTRY_CD"))


    val jnCtryCntnt = jnCtryDemoCtryref.join(readCtryCntntFile,Seq("Alpha_2_CTRY_CD"))





    /*********************************************************************************/
    /* Checkpointing the above created Dataframe to the checkpoint Directory         */
    /*********************************************************************************/

    val jnCtryCntntchkpt = jnCtryCntnt.checkpoint()
    jnCtryCntntchkpt.collect()

    /*********************************************************************************/
    /* Creating multiple outputs based on different aggregation keys                 */
    /*********************************************************************************/

    val aggCntnNm = jnCtryCntntchkpt.groupBy("CONTINENT_NM").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("CONTINENT_NM")
    aggCntnNm.show()


    val aggCtryNm = jnCtryCntntchkpt.groupBy("Ctry_NM").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("Ctry_NM")
    aggCtryNm.show()


    val aggCtryCd = jnCtryCntntchkpt.groupBy("NUM_CTRY_CD").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("NUM_CTRY_CD")
    aggCtryCd.show()

    /************************************************************************************************************************************************/
    /*                                                Transformation completes                                                                      */
    /************************************************************************************************************************************************/

  }
}
NRC
  • I would follow these links to understand checkpointing and persisting, and when to use one or the other: https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk and this one may be useful too: https://stackoverflow.com/questions/36632356/what-does-checkpointing-do-on-apache-spark – Chema Jun 05 '20 at 11:06
  • What I am unable to understand, based on my limited knowledge in this area, is what I need to add to my Spark application so that it knows to read the checkpoint directory in case of failure. Also, am I supposed to clean up the directory at the end of the application? I can see it keeps writing data to the directory location under different names. Thanks! – NRC Jun 08 '20 at 01:13
  • In the case of Spark Streaming it is mandatory to set a checkpoint directory, both for failure recovery and for computing some intermediate results, and it is read automatically; you don't have to do anything more. In the case of an RDD or DataFrame it could be better to persist, because that maintains the lineage to recover in case of failure and avoids recomputation. – Chema Jun 08 '20 at 07:52
  • Below is the program I am trying to run to test whether checkpointing works as expected. After the checkpoint was taken, I killed the job and triggered it again. But the execution didn't start from the checkpoint location; it restarted from scratch. I am not sure what exactly I am missing. The program is below for your reference, in case it helps. Any help is much appreciated. Thanks! – NRC Jun 15 '20 at 02:24
  • I wrote an answer. If it was helpful, please accept the answer and upvote, I really would appreciate it. – Chema Jun 15 '20 at 14:48

1 Answer


I hope I can clear up some of your doubts by explaining checkpointing and giving you an example of how to recover a Dataset from a checkpoint directory.

Checkpointing is mainly used in iterative algorithms and streaming processes.

In batch processing we are used to having fault tolerance (caching or persisting). This means that if a node crashes, the job doesn't lose its state and the lost tasks are rescheduled on other workers. Intermediate results are written to persistent storage (which has to be fault tolerant as well, like HDFS or cloud object storage).

Maintaining RDD lineage (caching or persisting) provides resilience but can also cause problems when the lineage gets very long, for example in iterative algorithms and streaming:
- Recovery can be very expensive
- Potential stack overflow

Checkpointing saves the data to HDFS:
- Provides fault-tolerant storage across nodes
- Lineage is not saved
- Must be checkpointed before any actions on the RDD
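To make the last point concrete, here is a minimal sketch of RDD checkpointing. It assumes a local master and a hypothetical /tmp/checkpoint-sketch directory; the object name RddCheckpointSketch and its run helper are made up for illustration. RDD.checkpoint() only marks the RDD, and the files are written when the first action runs, so isCheckpointed should be false before the action and true after it.

```scala
import org.apache.spark.sql.SparkSession

object RddCheckpointSketch {
  // Returns the checkpoint flag (before the first action, after the first action)
  def run(spark: SparkSession): (Boolean, Boolean) = {
    val sc = spark.sparkContext
    // Hypothetical local path; on a cluster use fault-tolerant storage such as HDFS
    sc.setCheckpointDir("/tmp/checkpoint-sketch")

    val rdd = sc.parallelize(1 to 100).map(_ * 2).filter(_ % 3 == 0)
    rdd.cache()       // optional: lets the checkpoint job reuse the computed partitions
    rdd.checkpoint()  // only marks the RDD; no files are written yet
    val before = rdd.isCheckpointed

    rdd.count()       // the first action triggers the actual write
    val after = rdd.isCheckpointed

    // After the write, the lineage should start at a ReliableCheckpointRDD
    println(rdd.toDebugString)
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RddCheckpointSketch")
      .master("local[*]")
      .getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```

Caching before checkpointing is optional, but without it the RDD is computed twice: once for the action and once for the checkpoint job.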

Dataset Checkpointing

Dataset checkpointing is a feature of Spark SQL that truncates a logical query plan, which can be especially useful for highly iterative data algorithms (e.g. Spark MLlib, which uses Spark SQL's Dataset API for data manipulation).

Checkpointing is actually a feature of Spark Core (which Spark SQL uses for distributed computations) that allows a driver to be restarted on failure with the previously computed state of a distributed computation described as an RDD. That has been used successfully in Spark Streaming, the now-obsolete Spark module for stream processing based on the RDD API. Checkpointing truncates the lineage of the RDD being checkpointed. That has been used successfully in Spark MLlib in iterative machine learning algorithms like ALS. Dataset checkpointing in Spark SQL uses checkpointing to truncate the lineage of the underlying RDD of the Dataset being checkpointed.

Using Dataset checkpointing requires that you specify the checkpoint directory. The directory stores the checkpoint files for the RDDs to be checkpointed. Use SparkContext.setCheckpointDir to set the path to a checkpoint directory. Checkpointing can be local or reliable, which defines how reliable the checkpoint storage is. Local checkpointing writes checkpoint files to executor storage and, because of the executor lifecycle, is considered unreliable. Reliable checkpointing uses reliable data storage like Hadoop HDFS.
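The two flavours can be sketched as follows. This is only an illustration under assumptions: the directory /tmp/checkpoint-sketch and the names DatasetCheckpointSketch and run are hypothetical. Dataset.checkpoint() is eager by default and returns a new Dataset, so the returned value is the one to keep using. The spark.cleaner.referenceTracking.cleanCheckpoints setting asks Spark to delete checkpoint files once the checkpointed data is no longer referenced on the driver; files from a job that was killed may still need to be removed by hand.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object DatasetCheckpointSketch {
  // Returns (reliably checkpointed DataFrame, locally checkpointed DataFrame)
  def run(spark: SparkSession): (DataFrame, DataFrame) = {
    // Hypothetical path; only the reliable checkpoint needs this directory
    spark.sparkContext.setCheckpointDir("/tmp/checkpoint-sketch")

    val df = spark.range(10).withColumn("doubled", col("id") * 2)

    // Reliable: written under the checkpoint directory, eager by default
    val reliable = df.checkpoint()
    // Local: kept in executor storage, faster but lost if executors are lost
    val local = df.localCheckpoint()
    (reliable, local)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DatasetCheckpointSketch")
      .master("local[*]")
      // Optional: clean up checkpoint files when they are no longer referenced
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      .getOrCreate()
    try {
      val (reliable, local) = run(spark)
      reliable.explain() // the plan should now start from an existing RDD scan
      local.show()
    } finally spark.stop()
  }
}
```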

Writing a checkpoint directory

package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


/**
  * Checkpointing
  *     - Maintaining RDD lineage provides resilience but can also cause problems when the lineage gets very long
  *         - For example: iterative algorithms, streaming
  *     - Recovery can be very expensive
  *     - Potential stack overflow
  *     - Checkpointing saves the data to HDFS
  *         - Provides fault-tolerant storage across nodes
  *         - Lineage is not saved
  *         - Must be checkpointed before any actions on the RDD
  */
object WriteCheckPoint {
  val spark = SparkSession
    .builder()
    .appName("WriteCheckPoint")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","WriteCheckPoint") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  // Remember to set the checkpoint directory
  spark.sparkContext.setCheckpointDir("hdfs://localhost/user/cloudera/checkpoint")

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)
    // Set the org.apache.spark.rdd.ReliableRDDCheckpointData logger to INFO
    // to see what happens while an RDD is checkpointed
    // (this uses the log4j API, hence the org.apache.log4j.{Level, Logger} import above)
    Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)

    try {
      val nums = spark.range(5).withColumn("random", rand()).filter("random > 0.5")
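      // Dataset.checkpoint() is eager by default and returns a new Dataset backed by
      // the checkpoint files, so keep the returned value and use it from here on
      val numsCheckpointed = nums.checkpoint()
      // Save the schema, as it is needed to reconstruct the Dataset from the recovered RDD
      val schema = numsCheckpointed.schema
      schema.printTreeString()

      numsCheckpointed.show()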

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

output

20/06/15 16:42:50 INFO ReliableRDDCheckpointData: Done checkpointing RDD 4 to hdfs://localhost/user/cloudera/checkpoint/607daeca-6ec2-471c-9033-9c4c236880a9/rdd-4, new parent is RDD 5
root
 |-- id: long (nullable = false)
 |-- random: double (nullable = false)

+---+------------------+
| id|            random|
+---+------------------+
|  2|0.9550560942227814|
+---+------------------+

You will have to define a couple of helper objects inside the packages org.apache.spark and org.apache.spark.sql, because the methods needed for recovery are only accessible from within those packages.

package org.apache.spark

/**
  * SparkContext.checkpointFile is a `protected[spark]` method
  * define a helper object to "escape" the package lock-in
  */
object my {
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
    sc.checkpointFile[T](path)
  }
}

// Second helper: place it in its own source file, since it needs a different package clause
package org.apache.spark.sql

object my2 {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType
  def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)
  }
}

Reading a checkpoint directory

package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}


/**
  * Recovering RDD From Checkpoint Files
  * — SparkContext.checkpointFile Method
  *   SparkContext.checkpointFile(directory: String)
  *   checkpointFile reads (recovers) a RDD from a checkpoint directory.
  * Note SparkContext.checkpointFile is a protected[spark] method
  * so the code to access it has to be in org.apache.spark package.
  * Internally, checkpointFile creates a ReliableCheckpointRDD in a scope.
  */
object ReadingCheckPoint {
  val spark = SparkSession
    .builder()
    .appName("ReadingCheckPoint")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","ReadingCheckPoint") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  // Use the RDD directory reported in the WriteCheckPoint log ("Done checkpointing RDD 4 to ...")
  val pathCheckpoint = "hdfs://localhost/user/cloudera/checkpoint/607daeca-6ec2-471c-9033-9c4c236880a9/rdd-4"

  def main(args: Array[String]): Unit = {

    try {

      Logger.getRootLogger.setLevel(Level.ERROR)

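      // The types must match the schema printed by WriteCheckPoint (id: long, random: double)
      import org.apache.spark.sql.types.LongType
      val schema = new StructType()
        .add("field1", LongType)
        .add("field2", DoubleType)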

      import org.apache.spark.my
      import org.apache.spark.sql.catalyst.InternalRow
      val numsRddRecovered = my.recover[InternalRow](spark.sparkContext, pathCheckpoint) //org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]
      numsRddRecovered.foreach(x => println(x.toString))

      // We have to convert RDD[InternalRow] to DataFrame
      import org.apache.spark.sql.my2
      val numsRecovered = my2.createDataFrame(spark, numsRddRecovered, schema)
      numsRecovered.show()


      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

output

[0,2,3fee8fd1cc5108ef]
+------+------------------+
|field1|            field2|
+------+------------------+
|     2|0.9550560942227814|
+------+------------------+

You can follow this link to the Spark Documentation: Checkpointing
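For a batch job like the one in the question, a new run does not pick up the checkpoint files of a killed run automatically. One alternative, which is a different technique from checkpoint(), is to write the intermediate DataFrame to Parquet and reuse it on restart. The sketch below is only a suggestion under assumptions: the names RestartableStage and loadOrCompute and the output path are hypothetical, and it relies on the _SUCCESS marker that Spark's file output writes by default when a write job completes.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

object RestartableStage {
  /** Reads the stage from `path` if a completed write exists there;
    * otherwise evaluates `compute`, writes it as Parquet, and reads it back. */
  def loadOrCompute(spark: SparkSession, path: String)(compute: => DataFrame): DataFrame = {
    val dir = new Path(path)
    val fs = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)
    // Assumption: the default _SUCCESS marker signals a completed write
    val done = fs.exists(new Path(dir, "_SUCCESS"))
    if (!done) compute.write.mode("overwrite").parquet(path)
    spark.read.parquet(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RestartableStage")
      .master("local[*]")
      .getOrCreate()
    try {
      // Hypothetical path; a rerun after a crash skips the block below
      val stage = loadOrCompute(spark, "/tmp/restartable-stage/joined") {
        println("computing the expensive stage")
        spark.range(5).toDF("id")
      }
      stage.show()
    } finally spark.stop()
  }
}
```

In the question's program, the two joins would go inside the block passed to loadOrCompute, and the three aggregations would then read from the returned DataFrame. The stored files have to be deleted explicitly once the job has finished successfully; otherwise the next run reuses stale data.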

Chema
  • Thank you for sharing a detailed explanation of this. Can't a single object be enough to checkpoint a DataFrame and then read it back from there? Do we need two different objects for writing and reading a DataFrame? I am modifying my question to add the program I have written for testing checkpointing. Please see if you can help me with that. Thanks! – NRC Jun 15 '20 at 22:19
  • First of all, I would recommend that you create a bounty question. Why? Because you will get more attention from the community and more people will want to answer your question. Second, I see that this is a batch process, so my question is: what kind of crash do you expect? What kind of exceptions will you have, recoverable or unrecoverable? If they are of the second kind, I think you couldn't recover. If they are recoverable exceptions, I don't know yet. – Chema Jun 16 '20 at 10:49