
I am looking for a way to expand a tuple by adding another field to it. I have already figured out multiple ways to do this with shapeless.

I ran into trouble when I tried to write a generic method that adds another field to a Product (case class or tuple) inside a Spark Dataset.

When I use only shapeless (the productElements and tupled methods), I cannot come up with an encoder for the expanded tuple. For example, given a Dataset[(String, Int)], I want to turn it into a Dataset[(String, Int, String)], so I need a way to merge Encoder[(String, Int)] and Encoder[String] into Encoder[(String, Int, String)]. Another obstacle is that, because the method is generic, the compiler only knows my tuple (String, Int) as R, so the encoder I request becomes enc: org.apache.spark.sql.Encoder[(R, String)], which may or may not be the problem.
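To make the encoder question concrete, here is a minimal sketch of the non-generic case. It assumes a throwaway local SparkSession and a made-up date value; both are for illustration only. `Encoders.tuple` with three element encoders gives the flat `Encoder[(String, Int, String)]`, whereas pairing `Encoder[(String, Int)]` with `Encoder[String]` gives a nested `Encoder[((String, Int), String)]`, which is not the shape wanted here.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Assumption: a local session created only for this illustration.
val spark = SparkSession.builder().master("local[1]").appName("encoder-sketch").getOrCreate()
import spark.implicits._

val ds: Dataset[(String, Int)] = Seq(("a", 1), ("b", 2)).toDS()

// Flat encoder built from the three element encoders.
val flatEnc: Encoder[(String, Int, String)] =
  Encoders.tuple(Encoders.STRING, Encoders.scalaInt, Encoders.STRING)

// Pairing the existing tuple encoder with a String encoder nests instead of flattening.
val pairEnc: Encoder[(String, Int)] = Encoders.tuple(Encoders.STRING, Encoders.scalaInt)
val nestedEnc: Encoder[((String, Int), String)] = Encoders.tuple(pairEnc, Encoders.STRING)

// With concrete types, spark.implicits._ supplies the encoder for the result tuple.
val expanded: Dataset[(String, Int, String)] =
  ds.map { case (s, i) => (s, i, "2020-01-01") }

// Compare the top-level columns of the two encoders.
println(flatEnc.schema.fieldNames.mkString(", "))
println(nestedEnc.schema.fieldNames.mkString(", "))
```

So the difficulty is specific to the generic method, where the result type is not known concretely at the point where the encoder is needed.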

Here is the code I'm trying in the REPL.

Try #1:

import org.apache.spark.sql.Dataset
import scala.reflect.runtime.universe.TypeTag
import shapeless.ops.hlist.Tupler
import shapeless.<:!<
import shapeless.ops.tuple.Prepend
import shapeless.syntax.std.product.productOps

def writeToPartitionDynamicModeWorks[A <: Product:TypeTag, L <: shapeless.HList, R<: Product]
  (ds: Dataset[A], dateField: String)
  (implicit ev:EnumerableProduct[A],
   gen:shapeless.Generic[A],
   gen2:shapeless.Generic.Aux[A,R], //R is a tuple (String,Int)
   tupler: Tupler[L],
   enc: org.apache.spark.sql.Encoder[(R,String)])={
     import ds.sparkSession.implicits._
     val outputDs = ds.map { case nk => (nk.productElements +: Tuple1(dateField).productElements ).tupled }
}

ERROR:
scala.ScalaReflectionException: is not a term

Walking down the stack trace, I see it starts at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)

which is this code in SQLImplicits.scala

trait LowPrioritySQLImplicits {
  /** @since 1.6.0 */
  implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
}

So I think something goes wrong because the requested Encoder does not match the tuple that is actually being encoded. But if I remove the implicit enc: org.apache.spark.sql.Encoder[(R,String)], I get a missing-encoder error:

" Unable to find encoder for type (R, String). An implicit Encoder[(R, String)] is needed to store (R, String) instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. "

Googling "scala.ScalaReflectionException: is not a term" got me nowhere.
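One possible reading of both errors: `Generic.Aux[A, R]` binds `R` to the HList representation of `A` (`String :: Int :: HNil` for a `(String, Int)`), not to a tuple, so `Encoder[(R, String)]` does not describe the flat tuple that `map` returns. A minimal sketch of an alternative signature follows. The names `AppendField` and `appendString` are hypothetical, and it has not been verified in spark-shell, where closure capture can behave differently. It names the result tuple as a type parameter `T` and asks the caller for `Encoder[T]`.

```scala
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import shapeless.{::, Generic, HList, HNil}
import shapeless.ops.hlist.{Prepend, Tupler}

object AppendField {
  // Hypothetical helper: Repr, Out and T are all resolved at the call site,
  // where A is a concrete tuple or case class.
  def appendString[A, Repr <: HList, Out <: HList, T](ds: Dataset[A], value: String)(
      implicit gen: Generic.Aux[A, Repr],              // A -> its HList representation
      prepend: Prepend.Aux[Repr, String :: HNil, Out], // append the extra String element
      tupler: Tupler.Aux[Out, T],                      // HList -> flat tuple T
      enc: Encoder[T]                                  // encoder for the result type
  ): Dataset[T] =
    ds.map(a => tupler(prepend(gen.to(a), value :: HNil)))
}

// Assumption: a local session created only for this illustration.
val spark = SparkSession.builder().master("local[1]").appName("append-sketch").getOrCreate()
import spark.implicits._

val input = Seq(("a", 1), ("b", 2)).toDS()
val out: Dataset[(String, Int, String)] = AppendField.appendString(input, "2020-01-01")
out.show()
```

The shapeless instances are captured by the `map` closure, so this relies on them being serializable.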

Try #2:

If I use this Kryo serializer code, I get a new error, this time at runtime. I also had to put my method into a class and extend Serializable.

  object SparkEncodingImplicits {
    import scala.reflect.ClassTag
    import org.apache.spark.sql._

    implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

    implicit def tuple2[A1, A2](
                                 implicit e1: Encoder[A1],
                                 e2: Encoder[A2]
                               ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
  }
import SparkEncodingImplicits._

class Test {
    def writeToPartitionDynamicModeWorks[A <: Product:TypeTag, L <: shapeless.HList, R<: Product]
      (ds: Dataset[A], dateField: String)
      (implicit ev:EnumerableProduct[A],
       gen:shapeless.Generic[A],
       gen2:shapeless.Generic.Aux[A,R], //R is a tuple (String,Int)
       tupler: Tupler[L],
       enc: org.apache.spark.sql.Encoder[(R,String)])={
         import ds.sparkSession.implicits._
         val outputDs = ds.map { case nk => (nk.productElements +: Tuple1(dateField).productElements ).tupled }
    }
}

ERROR:

org.apache.spark.SparkException: Task not serializable ... Caused by: java.io.NotSerializableException: SparkEncodingImplicits$

After a very long serialization stack trace I see this, but I cannot interpret it:

  • object (class $line136.$read$$iw$$iw$$iw$$iw$Test$$anonfun$1, )
  • element of array (index: 1)
  • array (class [Ljava.lang.Object;, size 2)
  • field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, name: references$1, type: class [Ljava.lang.Object;)

No idea what is still not serializable! Let's try something else.
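The exception names `SparkEncodingImplicits$`, which is the singleton object itself. One common cause of this kind of failure is that something reachable from the closure holds a reference to an object that does not extend Serializable. The sketch below shows that mechanism without Spark, using plain Java serialization and hypothetical helper classes. Whether adding `extends Serializable` to `SparkEncodingImplicits` is enough for Try #2 is an untested assumption.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationSketch {
  // Hypothetical stand-ins for a helper that a closure ends up referencing.
  class PlainHelper { def tag(s: String): String = s + "!" }
  class SerializableHelper extends Serializable { def tag(s: String): String = s + "!" }

  // Java-serialize a value; on failure, report the message of the exception
  // (for NotSerializableException this is the offending class name).
  def trySerialize(value: AnyRef): Either[String, Int] =
    try {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(value)
      out.close()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    }

  // Two closures that differ only in what they capture.
  def closures(): (String => String, String => String) = {
    val plain = new PlainHelper
    val ok = new SerializableHelper
    ((s: String) => plain.tag(s), (s: String) => ok.tag(s))
  }
}

val (capturesPlain, capturesSerializable) = SerializationSketch.closures()
println(SerializationSketch.trySerialize(capturesPlain))        // expected to be a Left
println(SerializationSketch.trySerialize(capturesSerializable)) // expected to be a Right
```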

Try #3:

I made my own tuple expander and tried to use that:

object ProductExpanderOps {
  implicit class ProductExpander[HL](nodes: HL) {
    def expandProductBy[T, OUT](newNode: T)
                               (implicit ev: T <:!< Product, prepend: Prepend.Aux[HL, Tuple1[T], OUT]):OUT = {
      prepend(nodes, Tuple1(newNode))
    }

    def expandProductBy[T <: Product, OUT](newNode: T)
                                          (implicit prepend: Prepend.Aux[HL, T, OUT]):OUT = {
      prepend(nodes, newNode)
    }
  }
}

def writeToPartitionDynamicModeWorks[A <: Product:TypeTag, L <: shapeless.HList, R<: Product]
  (ds: Dataset[A], dateField: String)
  (implicit ev:EnumerableProduct[A],
   gen:shapeless.Generic[A],
   gen2:shapeless.Generic.Aux[A,R], //R is a tuple (String,Int)
   enc: org.apache.spark.sql.Encoder[(R,String)])={
     import ds.sparkSession.implicits._
     import ProductExpanderOps._
     val outputDs = ds.map { case nk => nk.expandProductBy(dateField) }
}

ERROR:
error: could not find implicit value for parameter prepend: shapeless.ops.tuple.Prepend.Aux[A,(String,),OUT]
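A likely reason for this error is that `Prepend` for tuples is derived from the concrete shape of its first type argument, and inside the generic method `A` is still abstract. A minimal sketch, with the hypothetical names `TupleExpandSketch` and `expandBy`, moves the `Prepend` evidence into the method's own implicit parameters so that it is resolved by the caller, where the tuple type is known.

```scala
import shapeless.ops.tuple.Prepend

object TupleExpandSketch {
  // Hypothetical helper: the Prepend evidence is an implicit parameter,
  // so it is looked up at the call site, where A is a concrete type.
  def expandBy[A, T, Out](a: A, extra: T)(
      implicit prepend: Prepend.Aux[A, Tuple1[T], Out]
  ): Out = prepend(a, Tuple1(extra))
}

val expanded = TupleExpandSketch.expandBy(("a", 1), "2020-01-01")

// Compile-time check that the result is the flat three-element tuple.
val typed: (String, Int, String) = expanded
println(typed)
```

Applied to the Dataset method, the same idea would mean adding `prepend: Prepend.Aux[A, Tuple1[String], Out]` and an `Encoder[Out]` to its implicit parameter list; that combination is an assumption and is not tested here.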

Now I'm at my wits' end.

Some interesting SO posts:

  • scala generic encoder for spark case class
  • Encode an ADT / sealed trait hierarchy into Spark DataSet column
  • Dyamically cast spark dataframe to dataset of tuple(String,_<:Product)
  • Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
  • Add ADT column in Spark dataset?
