I would love it if one of you could guide me in converting a Scala (or Java) ResultSet to a Spark DataFrame.

I cannot use this notation:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/")
  .option("dbtable", "pg_partner")
  .option("user", "XXX")
  .option("password", "XXX")
  .load()

Please take this into account before pointing me to similar questions.

The reason I cannot use that notation is that I need a JDBC setting that the Spark version I am using (2.2.0) does not support: I want to use the "queryTimeout" option, which was only added in Spark 2.4, so I need to apply it myself and work with the ResultSet directly.

Any help will be appreciated.

Thank you in advance!

James

2 Answers

Give this a try (I haven't tested it, but it should work with slight modifications):

import java.sql.ResultSet
import org.apache.spark.sql.{DataFrame, SparkSession}

// assuming the ResultSet comprises rows of (String, Int)
def resultSetToDataFrame(resultSet: ResultSet, spark: SparkSession): DataFrame = {
  // hasNext advances the cursor, so every hasNext must be followed by exactly
  // one next(); toList consumes the iterator that way
  val resultSetAsList: List[(String, Int)] = new Iterator[(String, Int)] {
    override def hasNext: Boolean = resultSet.next()

    override def next(): (String, Int) = {
      // JDBC column indices start at 1; column labels can be used instead
      (resultSet.getString(1), resultSet.getInt(2))
    }
  }.toList

  // the implicits come from the SparkSession instance, not from a package
  import spark.implicits._
  val listAsDataFrame: DataFrame = resultSetAsList.toDF("column_name_1", "column_name_2")

  listAsDataFrame
}
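Since the motivation is queryTimeout, it may help to note that in plain JDBC the timeout is set on the java.sql.Statement (via setQueryTimeout) before executing the query, not on the ResultSet. Below is a more general sketch along those lines that derives the schema from ResultSetMetaData instead of hard-coding (String, Int). It assumes a SparkSession is passed in, maps only a few common SQL types (everything else is read as a string), and collects all rows on the driver; the object and method names (JdbcWithTimeout, sparkType, query) are hypothetical.

```scala
import java.sql.{DriverManager, ResultSet, Types}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

object JdbcWithTimeout {
  // Hypothetical helper: maps a few common java.sql.Types codes to Spark
  // types; every other column type falls back to StringType
  def sparkType(sqlType: Int): DataType = sqlType match {
    case Types.INTEGER | Types.SMALLINT | Types.TINYINT => IntegerType
    case Types.BIGINT                                   => LongType
    case Types.DOUBLE | Types.FLOAT | Types.REAL        => DoubleType
    case Types.BOOLEAN | Types.BIT                      => BooleanType
    case _                                              => StringType
  }

  // Reads column i (1-based) of the current row; SQL NULL becomes null
  private def readValue(rs: ResultSet, i: Int, dt: DataType): Any = {
    val v: Any = dt match {
      case IntegerType => rs.getInt(i)
      case LongType    => rs.getLong(i)
      case DoubleType  => rs.getDouble(i)
      case BooleanType => rs.getBoolean(i)
      case _           => rs.getString(i)
    }
    if (rs.wasNull()) null else v
  }

  // Builds the schema from the metadata and collects every row on the
  // driver, so this only suits results that fit in driver memory
  def resultSetToDataFrame(rs: ResultSet, spark: SparkSession): DataFrame = {
    val md = rs.getMetaData
    val fields = (1 to md.getColumnCount).map { i =>
      StructField(md.getColumnLabel(i), sparkType(md.getColumnType(i)), nullable = true)
    }
    val rows = scala.collection.mutable.ArrayBuffer.empty[Row]
    while (rs.next()) {
      rows += Row.fromSeq(fields.indices.map(j => readValue(rs, j + 1, fields(j).dataType)))
    }
    spark.createDataFrame(spark.sparkContext.parallelize(rows), StructType(fields))
  }

  // Runs `sql` with a per-statement timeout in seconds (0 means no limit)
  def query(url: String, user: String, password: String, sql: String,
            timeoutSeconds: Int, spark: SparkSession): DataFrame = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      val stmt = conn.createStatement()
      stmt.setQueryTimeout(timeoutSeconds)
      resultSetToDataFrame(stmt.executeQuery(sql), spark)
    } finally {
      conn.close()
    }
  }
}
```

A call might look like `JdbcWithTimeout.query(url, user, password, "select * from pg_partner", 30, spark)`, where the connection values are placeholders for your own. The rows are fully read before the connection is closed, which is why closing it in `finally` is safe here.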

y2k-shubham

A working example against the public Rfam MySQL database:

// run in spark-shell (sc and spark are predefined), with the MySQL JDBC driver on the classpath
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import spark.implicits._

val url = "jdbc:mysql://mysql-rfam-public.ebi.ac.uk:4497/Rfam"
val username = "rfamro"
val password = ""
// JdbcRDD binds each partition's (lower, upper) bounds to the two ? placeholders
val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url, username, password),
  "select rfam_id, noise_cutoff from family limit ?, ?", 1, 100, 10,
  r => r.getString("rfam_id") + ", " + r.getString("noise_cutoff"))
val DF = myRDD.toDF
DF.show

returns:

+-------------------+
|              value|
+-------------------+
|    5_8S_rRNA, 41.9|
|           U1, 39.9|
|           U2, 45.9|
|         tRNA, 28.9|
|        Vault, 33.9|
|          U12, 52.9|
....
....
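One caveat about this example: the JdbcRDD documentation describes the two `?` placeholders as receiving each partition's inclusive lower and upper bound, with a key-range predicate such as `where ? <= id and id <= ?` as the intended form. MySQL, however, reads `limit ?, ?` as (offset, row count). The sketch below mirrors how I understand JdbcRDD to split [lowerBound, upperBound] into numPartitions ranges (an assumption about its internals, not Spark's own code; JdbcRddBounds and partitionBounds are hypothetical names) and prints the LIMIT clause each partition would run for lowerBound = 1, upperBound = 100, numPartitions = 10.

```scala
object JdbcRddBounds {
  // Splits the inclusive range [lower, upper] into numPartitions
  // (start, end) pairs, one pair per partition's two '?' placeholders
  def partitionBounds(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    val length = BigInt(upper) - BigInt(lower) + 1
    (0 until numPartitions).map { i =>
      val start = BigInt(lower) + BigInt(i) * length / BigInt(numPartitions)
      val end = BigInt(lower) + BigInt(i + 1) * length / BigInt(numPartitions) - 1
      (start.toLong, end.toLong)
    }
  }

  def main(args: Array[String]): Unit = {
    val bounds = partitionBounds(1, 100, 10)
    // With "limit ?, ?" each pair is read as (offset, row count)
    bounds.foreach { case (s, e) => println(s"limit $s, $e") }
    println(s"rows requested: ${bounds.map(_._2).sum}")
  }
}
```

Under that reading the partitions request overlapping pages (up to 550 rows in total instead of 100, starting with `limit 1, 10` and ending with `limit 91, 100`), and offset 0 is never read. Partitioning on a numeric key column with a range predicate avoids both problems.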
thebluephantom
  • I am on it... I'm gonna try both solutions to select the best answer for my case, but thanks again dude for your support – James Aug 08 '18 at 10:08
  • Fine, but mine actually works, and they may well be complementary. I think I actually answered the question. I only mention it because I find a number of people don't get the code of conduct here – thebluephantom Aug 08 '18 at 10:13