
I have a static DataFrame. How do I write it to the console instead of using df.show()?

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val sparkConfig = new SparkConf().setAppName("streaming-vertica").setMaster("local[2]")
val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfig).getOrCreate()
val sc = sparkSession.sparkContext

val rows = sc.parallelize(Array(
  Row(1,"hello", true),
  Row(2,"goodbye", false)
))

val schema = StructType(Array(
  StructField("id", IntegerType, false),
  StructField("sings", StringType, true),
  StructField("still_here", BooleanType, true)
))

val df = sparkSession.createDataFrame(rows, schema) 

df.write
  .format("console")
  .mode("append")

This writes nothing to the console:

 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/04/27 00:30:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Process finished with exit code 0

When using save():

   df.write
      .format("console")
      .mode("append")
      .save()

it gives:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/04/27 00:45:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.RuntimeException: org.apache.spark.sql.execution.streaming.ConsoleSinkProvider does not allow create table as select.
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:473)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at rep.StaticDFWrite$.main(StaticDFWrite.scala:35)
    at rep.StaticDFWrite.main(StaticDFWrite.scala)

Spark version = 2.2.1
Scala version = 2.11.12

supernatural
  • you have to call an action, Spark is lazy and you did nothing but setting up the writer. The action would be `save` – UninformedUser Apr 26 '20 at 19:15
  • Why is it giving an exception on using `save` @UninformedUser – supernatural Apr 26 '20 at 19:18
  • Why are you saving to console? If you're only printing to console to debug, use `show()` – Danny Varod Apr 26 '20 at 19:35
  • I have a situation in which it writes to a database, and there is something I need to check before verifying. I have brought this piece of code down as an example to see what happens on each line; that is why I am writing it to the console, to make an exact replica and analyse the situation. @DannyVarod – supernatural Apr 26 '20 at 19:38

1 Answer


You have to call save on the DataFrameWriter object.

Without the save call, the code only builds a DataFrameWriter object and then the session terminates; nothing is ever executed.

Check the code below; I have verified it in spark-shell.

Please note that this code works on Spark 2.4.0 but not on 2.2.0.

The console format does not work with write in Spark 2.2.0: https://issues.apache.org/jira/browse/SPARK-20599
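On Spark 2.2.x, where the console batch sink is unavailable, printing the DataFrame directly is the simplest substitute. A minimal sketch, assuming the `df` from the question is in scope:

```scala
// Spark 2.2.x workaround: the "console" format cannot be used with
// df.write, so print the rows directly instead.

// Option 1: show() renders the same ASCII table the console sink would.
df.show(20, false)

// Option 2: collect the rows to the driver and print them one by one.
// Fine for small debug datasets; avoid on large DataFrames.
df.collect().foreach(row => println(row.mkString(" | ")))
```

Both approaches are actions, so they force the lazy plan to execute, which is exactly what the missing save() call would have done.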

scala> df.write.format("console").mode("append")
res5: org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row] = org.apache.spark.sql.DataFrameWriter@148a3112

scala> df.write.format("console").mode("append").save()
+--------+---+
|    name|age|
+--------+---+
|srinivas| 20|
+--------+---+
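For context, the console format is implemented by the streaming sink (hence the ConsoleSinkProvider in the stack trace), which is why older versions rejected it for batch writes. With a streaming DataFrame it is used through writeStream instead. A hedged sketch, where the socket source and its host/port are placeholder assumptions:

```scala
import org.apache.spark.sql.streaming.Trigger

// Streaming use of the console sink: read lines from a socket
// (localhost:9999 is a placeholder) and print each micro-batch.
val lines = sparkSession.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val query = lines.writeStream
  .format("console")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()            // start() is the streaming equivalent of save()

query.awaitTermination()
```

Note that, just as save() triggers a batch write, start() is what actually launches the streaming query; building the writer alone does nothing.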


Srinivas