
Suppose we have a DataFrame with a column of map type. What is the most straightforward way to convert it to a struct (or, equivalently, to define a new column with the same keys and values but of struct type)? The following spark-shell (2.4.5) session shows an insanely inefficient way of going about it:

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

val jsonStr = df.withColumn("jsonized", to_json($"mapColumn")).select("jsonized").collect()(0)(0).asInstanceOf[String]

spark.read.json(Seq(jsonStr).toDS()).show()
+---+---+
|bar|foo|
+---+---+
|  2|  1|
+---+---+

Now, obviously collect() is very inefficient, and this is generally an awful way to do things in Spark. But what is the preferred way to accomplish this conversion? named_struct and struct both take a sequence of parameter values to construct the results, but I can't find any way to "unwrap" the map key/values to pass them to these functions.

Jeff Evans

3 Answers


I would use the explode function. Given this input:

+--------------------+
|           mapColumn|
+--------------------+
|[foo -> 1, bar -> 2]|
+--------------------+

df.select(explode('mapColumn)).select(struct('*).as("struct"))

output:

+--------+
|  struct|
+--------+
|[foo, 1]|
|[bar, 2]|
+--------+

root
 |-- struct: struct (nullable = false)
 |    |-- key: string (nullable = false)
 |    |-- value: integer (nullable = false)
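
Note that explode produces one key/value struct per map entry rather than a single struct with fields foo and bar. If the keys are known up front (an assumption; the question does not say so), a one-row struct can be sketched by pulling each key out with getItem. The keys list and the structColumn name below are hypothetical, chosen only for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

// In spark-shell this simply returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

// Assumption: the set of keys is known in advance.
val keys = Seq("foo", "bar")

// Build one struct field per key, named after the key.
val fields = keys.map(k => col("mapColumn").getItem(k).as(k))
val structDf = df.select(struct(fields: _*).as("structColumn"))

structDf.printSchema()
structDf.select("structColumn.*").show()
```

When the keys are not known, they could be gathered first with something like df.select(explode(map_keys(col("mapColumn")))).distinct(), at the cost of an extra job and a small collect of the key names.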
chlebek
  • This does exactly what I want, but the weird, seemingly unbalanced single quotes are throwing me off... – Jeff Evans Jun 05 '20 at 16:19
  • In case it's not obvious, in a real-life scenario with multiple columns you would use `.select('*', struct('mapCol'))` – Topde Jun 06 '20 at 10:09
  • I finally learned that my confusion was due to the Symbol syntax, i.e. this: https://stackoverflow.com/a/918613/375670 – Jeff Evans Jun 19 '20 at 17:19
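
As the last comment notes, 'mapColumn is a Scala Symbol literal, which spark.implicits._ (imported automatically in spark-shell) converts to a Column. Below is a sketch of the same query written with col(...), which may be easier to read, together with one way to keep the existing columns next to the exploded pair, in the spirit of the second comment. It assumes the DataFrame from the question; the names exploded and withOriginal are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, struct}

// In spark-shell this simply returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

// Same query as df.select(explode('mapColumn)).select(struct('*).as("struct")),
// written without Symbol literals.
val exploded = df
  .select(explode(col("mapColumn")))       // yields columns "key" and "value"
  .select(struct(col("*")).as("struct"))   // wraps both into one struct column

// Keep the original columns alongside the exploded key/value pair.
val withOriginal = df.select(col("*"), explode(col("mapColumn")))

exploded.show()
withOriginal.printSchema()
```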

I saw @chlebek's answer, but if the result should stay in a single row, you can use a UDF:

scala> val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")
df: org.apache.spark.sql.DataFrame = [mapColumn: map<string,int>]

scala> df.show
+--------------------+
|           mapColumn|
+--------------------+
|[foo -> 1, bar -> 2]|
+--------------------+

scala> case class KeyValue(key: String, value: String)
defined class KeyValue

scala> val toArrayOfStructs = udf((value: Map[String, String]) => value.map {
     |   case (k, v) => KeyValue(k, v)
     | }.toArray )
toArrayOfStructs: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StructType(StructField(key,StringType,true), StructField(value,StringType,true)),true),Some(List(MapType(StringType,StringType,true))))

scala> df.withColumn("alfa", toArrayOfStructs(col("mapColumn")))
res4: org.apache.spark.sql.DataFrame = [mapColumn: map<string,int>, alfa: array<struct<key:string,value:string>>]

scala> res4.show
+--------------------+--------------------+
|           mapColumn|                alfa|
+--------------------+--------------------+
|[foo -> 1, bar -> 2]|[[foo, 1], [bar, 2]]|
+--------------------+--------------------+


scala> res4.printSchema
root
 |-- mapColumn: map (nullable = false)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- alfa: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
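
In the transcript above the UDF is declared over Map[String, String] while the column is map<string,int>, so the values come out as strings (presumably through a cast that Spark inserts to match the declared input type). A minimal sketch of a variant that keeps the integer values, assuming the same DataFrame; KeyValueInt and toArrayOfIntStructs are hypothetical names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical case class whose value type matches the map's value type.
case class KeyValueInt(key: String, value: Int)

// In spark-shell this simply returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

// Convert each map entry to a struct; a null map is passed through as null.
val toArrayOfIntStructs = udf((m: Map[String, Int]) =>
  if (m == null) null else m.toSeq.map { case (k, v) => KeyValueInt(k, v) })

val result = df.withColumn("alfa", toArrayOfIntStructs(col("mapColumn")))

result.printSchema()
result.show(false)
```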

Define a case class and map each row to it:

import org.apache.spark.sql.Encoders

case class Bean56(foo: Int, bar: Int)
// Encoder for the case class; Encoders.product is the one intended for
// case classes (Encoders.bean targets JavaBeans with getters and setters)
val personEncoder = Encoders.product[Bean56]

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

// Map each row's map to the case class, defaulting missing keys to -1
val Bean56s = df.map(row => {
  val map = row.getMap[String, Int](0)
  Bean56(map.getOrElse("foo", -1), map.getOrElse("bar", -1))
})(personEncoder)  // Supply the Encoder explicitly
Bean56s.foreach(println(_)) // Print each Bean56
QuickSilver