I am looking for a way to expand a tuple by adding another field to it. I have already figured out multiple ways to do this with shapeless.
I've run into trouble trying to write a generic method that adds another field to a Product (a case class or a tuple) inside a Spark Dataset.
When I use only shapeless (the productElements and tupled methods), I am unable to come up with an encoder for the expanded tuple.
For example, given a Dataset[(String, Int)] I want to turn it into a Dataset[(String, Int, String)], so I need a way to merge Encoder[(String, Int)] with Encoder[String] into Encoder[(String, Int, String)]. Another complication is that, because the method is generic, the compiler only sees the abstract type R in place of my tuple (String, Int), so the encoder I require becomes enc: org.apache.spark.sql.Encoder[(R, String)], which may or may not be the problem.
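To make the goal concrete, here is the non-generic version of what I am after (the name addDateConcrete is just for illustration). This works because the output type is fully known, so spark.implicits can derive the encoder; my question is how to do the same when the input type is a type parameter:

import org.apache.spark.sql.Dataset

def addDateConcrete(ds: Dataset[(String, Int)], dateField: String): Dataset[(String, Int, String)] = {
  import ds.sparkSession.implicits._ // derives Encoder[(String, Int, String)]
  ds.map { case (a, b) => (a, b, dateField) }
}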
Here is the code I'm trying in the REPL.
Try #1:
import org.apache.spark.sql.Dataset
import scala.reflect.runtime.universe.TypeTag
import shapeless.ops.hlist.Tupler
import shapeless.<:!<
import shapeless.ops.tuple.Prepend
import shapeless.syntax.std.product.productOps

def writeToPartitionDynamicModeWorks[A <: Product : TypeTag, L <: shapeless.HList, R <: Product]
    (ds: Dataset[A], dateField: String)
    (implicit ev: EnumerableProduct[A],
     gen: shapeless.Generic[A],
     gen2: shapeless.Generic.Aux[A, R], // R is a tuple (String, Int)
     tupler: Tupler[L],
     enc: org.apache.spark.sql.Encoder[(R, String)]) = {
  import ds.sparkSession.implicits._
  val outputDs = ds.map { case nk => (nk.productElements +: Tuple1(dateField).productElements).tupled }
}
ERROR:
scala.ScalaReflectionException: is not a term
Walking down the stack trace, I see it starts at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248), which is this code in SQLImplicits.scala:
trait LowPrioritySQLImplicits {
  /** @since 1.6.0 */
  implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
}
So my suspicion is that something goes wrong with the Encoder: the one being derived does not match the actual tuple being encoded.
But if I remove the implicit enc: org.apache.spark.sql.Encoder[(R, String)], I get a missing-encoder error instead:
" Unable to find encoder for type (R, String). An implicit Encoder[(R, String)] is needed to store (R, String) instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. "
Googling "scala.ScalaReflectionException: is not a term" got me nowhere.
Try #2:
If I use this Kryo-serializer piece of code instead, I get a new error, but this time at runtime. For this attempt I had to put my method into a class and extend Serializable.
object SparkEncodingImplicits {
  import scala.reflect.ClassTag
  import org.apache.spark.sql._

  implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

  implicit def tuple2[A1, A2](
      implicit e1: Encoder[A1],
      e2: Encoder[A2]
  ): Encoder[(A1, A2)] = Encoders.tuple[A1, A2](e1, e2)
}
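For reference, the resolution I am hoping for with these implicits in scope, spelled out with R instantiated to (String, Int), is roughly the following. I have not verified that the compiler actually prefers tuple2 over single here:

import org.apache.spark.sql.Encoder
import SparkEncodingImplicits._

// The pair encoder my generic method asks for: tuple2 should combine a
// Kryo-backed encoder for (String, Int) with a Kryo-backed encoder for String.
val pairEnc: Encoder[((String, Int), String)] = implicitly[Encoder[((String, Int), String)]]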
import SparkEncodingImplicits._

class Test {
  def writeToPartitionDynamicModeWorks[A <: Product : TypeTag, L <: shapeless.HList, R <: Product]
      (ds: Dataset[A], dateField: String)
      (implicit ev: EnumerableProduct[A],
       gen: shapeless.Generic[A],
       gen2: shapeless.Generic.Aux[A, R], // R is a tuple (String, Int)
       tupler: Tupler[L],
       enc: org.apache.spark.sql.Encoder[(R, String)]) = {
    import ds.sparkSession.implicits._
    val outputDs = ds.map { case nk => (nk.productElements +: Tuple1(dateField).productElements).tupled }
  }
}
ERROR:
org.apache.spark.SparkException: Task not serializable ... Caused by: java.io.NotSerializableException: SparkEncodingImplicits$
After a very long serialization stack trace I see this, but I cannot interpret it:
- object (class $line136.$read$$iw$$iw$$iw$$iw$Test$$anonfun$1, )
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 2)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, name: references$1, type: class [Ljava.lang.Object;)
No idea what is still not serializable! The only concrete hint is that the exception names the SparkEncodingImplicits object itself.
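The only workaround that occurs to me (an untested sketch, nothing more) is to make that holder Serializable, since that is the class the exception complains about:

object SparkEncodingImplicits extends Serializable {
  import scala.reflect.ClassTag
  import org.apache.spark.sql.{Encoder, Encoders}

  // Same definitions as above; only `extends Serializable` differs, so the object
  // can be shipped with the closure if it really is being captured.
  implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

  implicit def tuple2[A1, A2](implicit e1: Encoder[A1], e2: Encoder[A2]): Encoder[(A1, A2)] =
    Encoders.tuple[A1, A2](e1, e2)
}

In any case, let's try something else.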
Try #3:
I made my own tuple expander and tried to use that:
object ProductExpanderOps {
  implicit class ProductExpander[HL](nodes: HL) {
    def expandProductBy[T, OUT](newNode: T)
        (implicit ev: T <:!< Product, prepend: Prepend.Aux[HL, Tuple1[T], OUT]): OUT = {
      prepend(nodes, Tuple1(newNode))
    }

    def expandProductBy[T <: Product, OUT](newNode: T)
        (implicit prepend: Prepend.Aux[HL, T, OUT]): OUT = {
      prepend(nodes, newNode)
    }
  }
}
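Used directly on a concrete tuple, the expander should do what I want (I expect the non-Product overload to be picked here, since String is not a Product):

import ProductExpanderOps._

// Expected result: ("a", 1, "2023-01-01") with static type (String, Int, String)
val expanded: (String, Int, String) = ("a", 1).expandProductBy("2023-01-01")

The problem shows up when I use it inside the generic method: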
def writeToPartitionDynamicModeWorks[A <: Product : TypeTag, L <: shapeless.HList, R <: Product]
    (ds: Dataset[A], dateField: String)
    (implicit ev: EnumerableProduct[A],
     gen: shapeless.Generic[A],
     gen2: shapeless.Generic.Aux[A, R], // R is a tuple (String, Int)
     enc: org.apache.spark.sql.Encoder[(R, String)]) = {
  import ds.sparkSession.implicits._
  import ProductExpanderOps._
  val outputDs = ds.map { case nk => nk.expandProductBy(dateField) }
}
ERROR:
error: could not find implicit value for parameter prepend: shapeless.ops.tuple.Prepend.Aux[A,(String,),OUT]
Now I'm at my wits' end.
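The direction I am considering next (just a sketch, not verified, and it would only cover tuples, not case classes; the name appendField is hypothetical) is to drop Generic and thread the tuple Prepend evidence plus the output encoder through the signature, so everything is resolved at the call site where A is concrete:

import org.apache.spark.sql.{Dataset, Encoder}
import shapeless.ops.tuple.Prepend

// OUT is the expanded tuple type computed by Prepend; its Encoder is requested
// from the caller, where A (and hence OUT) is a concrete type.
def appendField[A <: Product, OUT](ds: Dataset[A], dateField: String)
    (implicit prepend: Prepend.Aux[A, Tuple1[String], OUT],
     enc: Encoder[OUT]): Dataset[OUT] =
  ds.map(a => prepend(a, Tuple1(dateField)))

Is something along these lines the right way to go, or is there a cleaner way to derive the Encoder for the expanded tuple?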
Some interesting SO posts:
- scala generic encoder for spark case class
- Encode an ADT / sealed trait hierarchy into Spark DataSet column
- Dyamically cast spark dataframe to dataset of tuple(String,_<:Product)
- Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
- Add ADT column in Spark dataset?