
I have to match an RDD based on the type of its elements.

trait Fruit

case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends Fruit

Now a stream of type `DStream[Fruit]` is coming in. Each element is either an `Apple` or a `Mango`.

How can I perform an operation based on the subclass? Something like the below (which doesn't work):

dStream.foreachRDD { rdd: RDD[Fruit] =>
  rdd match {
    case rdd: RDD[Apple] =>
      // do something

    case rdd: RDD[Mango] =>
      // do something

    case _ =>
      println(rdd.count() + "<<<< not matched anything")
  }
}
Shaido
supernatural
  • How are you consuming the data? I mean, what is the source which sends data of type Apple or Mango? – Srinivas Apr 26 '20 at 20:05
  • currently I am using the port (`nc -lk 12345`) – supernatural Apr 26 '20 at 20:06
  • input is given as a json string and is parsed accordingly and i get the `dStream : DStream[Payload]` – supernatural Apr 26 '20 at 20:07
  • Would a solution that pattern matches each row in each RDD work? I.e., for each rows that are Apple => do this, for each Mango row => do this, ignore and filter away other types. – Shaido Apr 27 '20 at 09:38
    `trait SentientBeing trait Animal extends SentientBeing case class Dog(name: String) extends Animal case class Person(name: String, age: Int) extends SentientBeing // later in the code ... def printInfo(x: SentientBeing) = x match { case Person(name, age) => // handle the Person case Dog(name) => // handle the Dog }` kind of should work... is it not working? The simple way is @Shaido-ReinstateMonica suggested... you can filter the value of payload type and do processing based on filtered value – Ram Ghadiyaram Apr 27 '20 at 21:47
  • @potterpod: Did you manage to solve this problem? :) – Shaido May 08 '20 at 01:59

1 Answer


Since we have an `RDD[Fruit]`, any row can be either `Apple` or `Mango`. When using `foreachRDD`, each RDD will contain a mix of these (and possibly other) types.
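It may help to see why the original `match` cannot distinguish the RDDs: the JVM erases generic type parameters at runtime, so `case rdd: RDD[Apple]` only checks that the value is an RDD at all. A minimal sketch of the same erasure behavior using `List` (no Spark needed):

```scala
// The fruit hierarchy from the question.
trait Fruit
case class Apple(price: Int) extends Fruit
case class Mango(price: Int) extends Fruit

val xs: Any = List(Mango(10))

// The element type is erased at runtime, so this pattern only checks
// "is xs a List?" (the compiler emits an unchecked warning for this).
val matched = xs match {
  case _: List[Apple] => "apple list"
  case _              => "something else"
}
// The first case matches even though the list actually holds a Mango.
```

This is why the matching has to happen on the rows inside the RDD, not on the RDD itself.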

To differentiate between the different types, we can use `collect[U](f: PartialFunction[T, U]): RDD[U]` (not to be confused with `collect(): Array[T]`, which returns an array of all elements in the RDD). This function applies the partial function `f` (here, a pattern match) and returns an RDD containing only the matching, transformed values.
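`RDD.collect(PartialFunction)` behaves just like `collect` on ordinary Scala collections: rows the partial function matches are kept and transformed, all others are dropped. A plain-Scala sketch of the same pattern (no Spark needed):

```scala
// The fruit hierarchy from the question, plus Orange as an unmatched type.
trait Fruit
case class Apple(price: Int) extends Fruit
case class Mango(price: Int) extends Fruit
case class Orange(price: Int) extends Fruit

val rows: Seq[Fruit] = Seq(Apple(5), Mango(11), Orange(1))

// collect keeps and transforms only the rows the partial function matches.
val mix = rows.collect {
  case Apple(p) => ("APPLE", p)
  case Mango(p) => ("MANGO", p)
}
// mix == Seq(("APPLE", 5), ("MANGO", 11)); the Orange row is filtered out.
```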

Below is a small illustrative example (adding an `Orange` subclass to the fruits as well).

Setup:

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val inputData: Queue[RDD[Fruit]] = Queue()
val dStream: InputDStream[Fruit] = ssc.queueStream(inputData)

inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))

This creates a stream of RDD[Fruit] with two separate RDDs.

dStream.foreachRDD { rdd: RDD[Fruit] =>
  val mix = rdd.collect {
    case row: Apple => ("APPLE", row.price) // do any computation on apple rows
    case row: Mango => ("MANGO", row.price) // do any computation on mango rows
    // case row => ... handle other rows (non-matching rows are dropped by default)
  }
  mix.foreach(println)
}

In the above `collect`, we transform each row slightly (dropping the class wrapper) and then print the resulting RDD. Result:

// First RDD
(MANGO,11)
(APPLE,5)
(APPLE,5)

// Second RDD
(MANGO,10)

As can be seen, the pattern match has kept and transformed the rows containing `Apple` and `Mango` while removing all `Orange` rows.


Separate RDDs

If desired, it is also possible to separate the two subclasses into their own RDDs, as follows. Any computations can then be performed on these two RDDs separately.

val apple = rdd.collect{case row: Apple => row}
val mango = rdd.collect{case row: Mango => row}
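Each `collect` here yields a strongly typed result (`RDD[Apple]` and `RDD[Mango]`), since the partial function narrows the type. The same separation can be checked with plain Scala collections:

```scala
// The fruit hierarchy from the question.
trait Fruit
case class Apple(price: Int) extends Fruit
case class Mango(price: Int) extends Fruit

val rows: Seq[Fruit] = Seq(Apple(5), Apple(7), Mango(11))

// Same idea as the RDD version: each collect yields its own typed sequence.
val apple: Seq[Apple] = rows.collect { case a: Apple => a }
val mango: Seq[Mango] = rows.collect { case m: Mango => m }
```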

Complete example code

import scala.collection.mutable.Queue

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

trait Fruit
case class Apple(price: Int) extends Fruit
case class Mango(price: Int) extends Fruit
case class Orange(price: Int) extends Fruit

object Test {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    val inputData: Queue[RDD[Fruit]] = Queue()
    val inputStream: InputDStream[Fruit] = ssc.queueStream(inputData)

    inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
    inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))

    inputStream.foreachRDD { rdd: RDD[Fruit] =>
      val mix = rdd.collect {
        case row: Apple => ("APPLE", row.price) // do any computation on apple rows
        case row: Mango => ("MANGO", row.price) // do any computation on mango rows
        // case row => ... handle other rows (non-matching rows are dropped by default)
      }
      mix.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
  • @potterpod seems like this solved your problem, if not please ask questions/respond... If you want to close the thread, please accept [the answer as owner](https://meta.stackexchange.com/a/5235/369717) and [vote-up](https://meta.stackexchange.com/a/173400/369717) – Ram Ghadiyaram May 08 '20 at 02:56
  • Yes it's helping, just one thing... on trying to get separate RDDs, I now get `RDD[DataFrame]`, say for Apple, and I want to write it into a db. Writing one after the other, iterating over the RDD[DataFrame], doesn't seem efficient. So is there a way to convert RDD[DataFrame] to a single DataFrame and write it as a bulk into the db? – supernatural May 11 '20 at 17:14
  • Convert `RDD[Apple]` into `RDD[Row]` (not `RDD[DataFrame]`)... [see here](https://stackoverflow.com/questions/37004352/how-to-convert-a-case-class-based-rdd-into-a-dataframe). An RDD[Row] is essentially a dataframe; you can insert a dataframe into a database using its write methods. Please don't forget to accept the answer to close the thread. It's a brilliant explanation given by Shaido, I feel. – Ram Ghadiyaram May 11 '20 at 18:44