156

I am using https://github.com/databricks/spark-csv and trying to write a single CSV, but I am not able to: it keeps creating a folder instead.

I need a Scala function that takes a path and a file name as parameters and writes out a single CSV file there.

zero323
user1735076
  • https://bigdata-etl.com/apache-spark-save-dataframe-as-a-single-file-hdfs/ - see this answer for how to store a DataFrame as a single file! – Paweł Cieśla Apr 24 '22 at 09:21

15 Answers

215

It is creating a folder with multiple files because each partition is saved individually. If you need a single output file (still inside a folder), you can repartition (preferred if the upstream data is large, but this requires a shuffle):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

or coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

Either way, the data frame is reduced to a single partition before saving, and all data will be written to mydata.csv/part-00000 (note that mydata.csv is still a folder). Before you use this option, be sure you understand what is going on and what the cost of transferring all the data to a single worker is. If you use a distributed file system with replication, the data will be transferred multiple times: first fetched to a single worker and subsequently distributed over the storage nodes.

Alternatively, you can leave your code as it is and use general-purpose tools like cat or HDFS getmerge to simply merge all the parts afterwards.
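
For example, a minimal sketch of the getmerge route, shelling out from Scala; the paths here are made up for illustration, and it assumes the hadoop binary is on the PATH:

import scala.sys.process._

// Merge the part files Spark wrote under mydata.csv into one local file.
// Note: getmerge copies to the *local* filesystem, not back into HDFS.
val exitCode = Seq("hadoop", "fs", "-getmerge", "mydata.csv", "/tmp/mydata-merged.csv").!
require(exitCode == 0, s"getmerge failed with exit code $exitCode")

If the merged file is needed back in HDFS, it can be uploaded again with hadoop fs -put.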

zero323
  • you can use coalesce also: df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata.csv") – ravi Oct 07 '15 at 11:31
  • Spark 1.6 throws an error when we set `.coalesce(1)`; it says some FileNotFoundException on the _temporary directory. It is still a bug in Spark: https://issues.apache.org/jira/browse/SPARK-2984 – Harsha Jul 26 '16 at 17:49
  • @Harsha Unlikely. Rather a simple result of `coalesce(1)` being highly expensive and usually not practical. – zero323 Jul 27 '16 at 15:18
  • Agreed @zero323, but if you have a special requirement to consolidate into one file, it should still be possible given that you have ample resources and time. – Harsha Jul 29 '16 at 02:56
  • @Harsha I don't say there isn't. If you correctly tune GC it should work just fine but it is simply a waste of time and most likely will hurt overall performance. So personally I don't see any reason to bother, especially since it is trivially simple to merge files outside Spark without worrying about memory usage at all. – zero323 Jul 29 '16 at 09:41
  • @zero323 I am new to Databricks and trying to save my result in a CSV file, and it's working well. Can we rename this CSV file while saving it? Right now the filename starts with "part_0000" and I want to save it as "sample.csv". What should I do for that? – Shringa Bais Aug 01 '18 at 18:38
  • Well, I got my solution by simply using the move command dbutils.fs.mv(source, destination) – Shringa Bais Aug 01 '18 at 20:58
  • Don't know if this is still alive, but can someone tell me why this method is returning an exception: java.lang.UnsupportedOperationException: CSV data source does not support null data type? – Haha Sep 09 '19 at 15:31
  • @zero323, not sure if this thread is still alive, but while using repartition as you mentioned, it created a folder and placed a single file inside it. Is it possible to omit the folder creation and directly create the CSV file with a name of our choice? – Akhilesh Pothuri Oct 13 '20 at 11:41
  • `coalesce(1)` is more efficient than `repartition(1)`: https://stackoverflow.com/a/40983145/1006855 – DenisOgr Jan 21 '22 at 17:48
41

If you are running Spark with HDFS, I've been solving the problem by writing csv files normally and leveraging HDFS to do the merging. I'm doing that in Spark (1.6) directly:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

Can't remember where I learned this trick, but it might work for you.

zero323
Minkymorgan
  • I've not tried it, and suspect it may not be straightforward. – Minkymorgan Aug 13 '17 at 18:27
  • Thanks. I've [added an answer](https://stackoverflow.com/a/45360343/1048186) that works on Databricks – Josiah Yoder Aug 14 '17 at 16:49
  • @Minkymorgan I have a similar problem but am not able to do it correctly. Can you please look at this question: https://stackoverflow.com/questions/46812388/how-to-merge-all-part-files-in-a-folder-created-by-spark-data-frame-and-rename-a – SUDARSHAN Oct 23 '17 at 04:53
  • @SUDARSHAN My function above works with uncompressed data. In your example I think you are using gzip compression as you write the files, and then afterwards trying to merge them together, which fails. That isn't going to work, as you can't merge gzip files together. Gzip is not a splittable compression algorithm, so it is certainly not "mergeable". You might test "snappy" or "bz2" compression, but my gut feeling is this will fail on merge too. Probably the best bet is to remove compression, merge the raw files, then compress using a splittable codec. – Minkymorgan Oct 23 '17 at 17:04
  • And what if I want to preserve the header? It gets duplicated for each file part. – Normal May 21 '18 at 04:46
  • I have seen in more recent spark versions that the databricks utilities can address this issue. Parquet is a great option if available for you. – jatal Jun 19 '18 at 15:03
  • Don't know if this is still alive, but can someone tell me why this method is returning an exception: java.lang.UnsupportedOperationException: CSV data source does not support null data type? – Haha Sep 09 '19 at 15:31
  • @Minkymorgan I am relatively new to Databricks and have tried your solution to merge CSVs, but I get an invalid syntax error starting at the "g" of "val hadoopConfig". Why would this be, please? – Richard H Jan 21 '20 at 10:08
  • @Minkymorgan I am using PySpark; is this the problem? Do I need to import the Hadoop libraries into the PySpark environment? My understanding may not quite be correct on this... – Richard H Jan 21 '20 at 11:04
39

I might be a little late to the game here, but while using coalesce(1) or repartition(1) may work for small data sets, large data sets would all be thrown into one partition on one node. This is likely to throw OOM errors, or at best, to process slowly.

I would highly suggest that you use the FileUtil.copyMerge() function from the Hadoop API. This will merge the outputs into a single file.

EDIT - This effectively brings the data to the driver rather than an executor node. coalesce() would be fine if a single executor has more RAM available than the driver.

EDIT 2: copyMerge() is being removed in Hadoop 3.0. See the following stack overflow article for more information on how to work with the newest version: How to do CopyMerge in Hadoop 3.0?
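
For reference, a minimal sketch of a manual merge that does not depend on copyMerge(), so it also works on Hadoop 3. The paths and the assumption that the parts were written without headers are for illustration only:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

def manualMerge(srcDir: String, dstFile: String): Unit = {
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val out = fs.create(new Path(dstFile))
  try {
    // Concatenate every part file in name order; assumes the parts have no headers.
    fs.listStatus(new Path(srcDir))
      .filter(_.getPath.getName.startsWith("part-"))
      .sortBy(_.getPath.getName)
      .foreach { status =>
        val in = fs.open(status.getPath)
        try IOUtils.copyBytes(in, out, conf, false)
        finally in.close()
      }
  } finally out.close()
}

// Example (hypothetical paths):
// manualMerge("/user/feeds/project/outputs/tmp_csv", "/user/feeds/project/outputs/merged.csv")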

Krzysztof Atłasik
etspaceman
  • Any thoughts on how to get a csv with a header row this way? Wouldn't want to have the file produce a header, since that would intersperse headers throughout the file, one for each partition. – nojo Mar 23 '17 at 22:32
  • There's an option that I've used in the past documented here: http://www.markhneedham.com/blog/2014/11/30/spark-write-to-csv-file-with-header-using-saveasfile/ – etspaceman Sep 03 '17 at 20:17
  • @etspaceman Cool. I still don't really have a good way to do this, unfortunately, as I need to be able to do this in Java (or Spark, but in a way that doesn't consume lots of memory and can work with big files). I still can't believe they removed this API call... this is a very common usage even if not exactly used by other applications in the Hadoop ecosystem. – woot Nov 02 '17 at 12:35
22

If you are using Databricks and can fit all the data into RAM on one worker (and thus can use .coalesce(1)), you can use dbfs to find and move the resulting CSV file:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

If your file does not fit into RAM on the worker, you may want to consider chaotic3quilibrium's suggestion to use FileUtils.copyMerge(). I have not done this, and don't yet know whether it is possible or not, e.g., on S3.

This answer is built on previous answers to this question as well as my own tests of the provided code snippet. I originally posted it to Databricks and am republishing it here.

The best documentation for dbfs's rm's recursive option I have found is on a Databricks forum.

zero323
Josiah Yoder
14

Spark's df.write() API will create multiple part files inside the given path. To force Spark to write only a single part file, use df.coalesce(1).write.csv(...) instead of df.repartition(1).write.csv(...), because coalesce is a narrow transformation whereas repartition is a wide transformation; see Spark - repartition() vs coalesce()

df.coalesce(1).write.csv(filepath,header=True) 

will create a folder at the given filepath with one part-0001-...-c000.csv file. Then use

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

to get a user-friendly filename.

pprasad009
  • alternatively if the dataframe is not too big (~GBs or can fit in driver memory) you can also use `df.toPandas().to_csv(path)`; this will write a single csv with your preferred filename – pprasad009 Dec 10 '19 at 18:38
  • Ugh, so frustrating how this can only be done by converting to pandas. How hard is it to just write a file without some UUID in it? – ijoseph Apr 23 '20 at 22:16
  • How do I overwrite it? It works for write but fails for overwrite. – akash sharma Sep 13 '20 at 09:54
9

This answer expands on the accepted answer, gives more context, and provides code snippets you can run in the Spark Shell on your machine.

More context on accepted answer

The accepted answer might give you the impression the sample code outputs a single mydata.csv file and that's not the case. Let's demonstrate:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

Here's what's outputted:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

N.B. mydata.csv is a folder in the accepted answer - it's not a file!

How to output a single file with a specific name

We can use spark-daria to write out a single mydata.csv file.

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

This'll output the file as follows:

Documents/
  better/
    mydata.csv

S3 paths

You'll need to pass s3a paths to DariaWriters.writeSingleFile to use this method in S3:

DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = "s3a://bucket/data/src",
    filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

See here for more info.

Avoiding copyMerge

copyMerge was removed from Hadoop 3. The DariaWriters.writeSingleFile implementation uses fs.rename, as described here. Spark 3 still uses Hadoop 2, so copyMerge implementations will keep working in 2020. I'm not sure when Spark will upgrade to Hadoop 3, but it's better to avoid any copyMerge approach that will cause your code to break when Spark upgrades Hadoop.
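
If you'd rather not pull in a library, a rough sketch of the same write-then-rename idea looks like this; the staging folder, file names, and lack of error handling are assumptions for illustration only:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.DataFrame

def writeSingleCsv(df: DataFrame, tmpDir: String, destFile: String): Unit = {
  // Write a one-partition CSV into a staging folder.
  df.coalesce(1).write.option("header", "true").csv(tmpDir)

  val fs = FileSystem.get(df.sparkSession.sparkContext.hadoopConfiguration)
  // coalesce(1) produces exactly one part file; find it and rename it to the target name.
  val partFile = fs.globStatus(new Path(tmpDir + "/part-*.csv"))(0).getPath
  fs.rename(partFile, new Path(destFile))
  fs.delete(new Path(tmpDir), true) // clean up the staging folder
}

// Example (hypothetical paths):
// writeSingleCsv(df, sys.env("HOME") + "/Documents/tmp/staging", sys.env("HOME") + "/Documents/tmp/mydata.csv")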

Source code

Look for the DariaWriters object in the spark-daria source code if you'd like to inspect the implementation.

PySpark implementation

It's easier to write out a single file with PySpark because you can convert the DataFrame to a Pandas DataFrame that gets written out as a single file by default.

from pathlib import Path
home = str(Path.home())
data = [
    ("jellyfish", "JALYF"),
    ("li", "L"),
    ("luisa", "LAS"),
    (None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

Limitations

The DariaWriters.writeSingleFile Scala approach and the df.toPandas() Python approach only work for small datasets. Huge datasets cannot be written out as single files. Writing out data as a single file isn't optimal from a performance perspective because the data can't be written in parallel.

Powers
  • Hi, Is the `1.0.0` version of `spark-daria` published to maven repo? I don't see it available there. – Kishore Bandi Mar 04 '21 at 12:39
  • @BandiKishore - Yes, here is the link: https://repo1.maven.org/maven2/com/github/mrpowers/spark-daria_2.12/1.0.0/ – Powers Mar 04 '21 at 13:58
8

I'm using this in Python to get a single file:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
Kees C. Bakker
  • This might work, but it is not a memory-efficient method since the driver has to convert the Spark DataFrame to pandas. So it is only a good way if the data is not too large. – Sharhabeel Hamdan Nov 18 '20 at 01:38
  • With smaller data it works like a charm :-D and your files are not in a weird format :D – Kees C. Bakker Nov 23 '20 at 15:37
5

A solution that works for S3, modified from Minkymorgan's answer.

Simply pass the temporary partitioned directory path (with a different name than the final path) as srcPath and the single final csv/txt file as dstPath. Also specify deleteSource if you want to remove the original directory.

/**
* Merges multiple partitions of Spark text file output into a single file.
* @param srcPath source directory of partitioned files
* @param dstPath output path of the merged file
* @param deleteSource whether or not to delete the source directory after merging
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit = {
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
  import java.net.URI
  // Uses the SparkSession (spark) from the enclosing scope.
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}
John Zhu
  • The copyMerge implementation lists all the files and iterates over them; this is not safe in S3. If you write your files and then list them, this doesn't guarantee that all of them will be listed. See [this](https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel) – LiranBo Feb 24 '20 at 10:07
  • @LiranBo, sorry, why exactly does this not guarantee it will work? To quote the linked doc: "A process writes a new object to Amazon S3 and immediately lists keys within its bucket. The new object will appear in the list." – theannouncer Jul 16 '21 at 01:42
  • It is now. Before Dec 1 2020, S3 didn't guarantee list-after-write consistency; it does now - [link](https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/#:~:text=S3%20is%20Now%20Strongly%20Consistent&text=Effective%20immediately%2C%20all%20S3%20GET,metadata%2C%20are%20now%20strongly%20consistent.) – LiranBo Jul 18 '21 at 11:45
3
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

I solved it using the approach below (renaming the file in HDFS):

Step 1: (Create the DataFrame and write it to HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

Step 2: (Create the Hadoop config)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Step 3: (Get the HDFS folder path)

val pathFiles = new Path("/hdfsfolder/blah/")

Step 4: (Get the Spark file names from the HDFS folder)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

Step 5: (Create a Scala mutable list to save all the file names and add them to the list)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) {
      fileNamesList += fileNames.next().getPath.getName
    }
    println(fileNamesList)

Step 6: (Filter out the _SUCCESS file from the list of file names)

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

Step 7: (Convert the Scala list to a string, build the desired file name in the HDFS folder string, and then rename)

    val partFileSourcePath = new Path("/yourhdfsfolder/" + partFileName.mkString(""))
    val desiredCsvTargetPath = new Path("/yourhdfsfolder/" + "op_" + ".csv")
    hdfs.rename(partFileSourcePath, desiredCsvTargetPath)
2

Repartition/coalesce to 1 partition before you save (you'd still get a folder, but it would have only one part file in it).
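
A minimal sketch of that, for illustration (assuming a DataFrame called df; the output folder name is made up):

// One partition in, one part file out (still inside the output_folder directory).
df.coalesce(1)
  .write
  .option("header", "true")
  .csv("output_folder")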

Arnon Rotem-Gal-Oz
2

You can use rdd.coalesce(1, true).saveAsTextFile(path)

It will store the data as a single file at path/part-00000.

mrsrinivas
Gourav
1
spark.sql("select * from df").coalesce(1).write.option("mode","append").option("header","true").csv("/your/hdfs/path/")

spark.sql("select * from df") --> this is dataframe

coalesce(1) or repartition(1) --> this reduces your output to a single part file

write --> writing data

option("mode","append") --> appending data to existing directory

option("header","true") --> enabling header

csv("<hdfs dir>") --> write as CSV file & its output location in HDFS

OneCricketeer
Venkat
  • You can also use `df.select("*")`, but if using HDFS, almost all Hadoop tools accept a directory of files, so it is better to let Spark split the file for future parallel file-reads – OneCricketeer Aug 10 '21 at 19:02
0

By using a ListBuffer we can save the data into a single file:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

// Note: collect() pulls all rows to the driver, so this only works for data that fits in driver memory.
val text = spark.read.textFile("filepath")
var data = ListBuffer[String]()
for (line: String <- text.collect()) {
  data += line
}
val writer = new FileWriter("filepath")
data.foreach(line => writer.write(line.toString + "\n"))
writer.close()
0
def export_csv(
    fileName: String,
    filePath: String
  ) = {

  val filePathDestTemp = filePath + ".dir/"
  // merstageout is assumed to be a SQL query string defined elsewhere.
  val merstageout_df = spark.sql(merstageout)

  merstageout_df
    .coalesce(1)
    .write
    .option("header", "true")
    .mode("overwrite")
    .csv(filePathDestTemp)

  val listFiles = dbutils.fs.ls(filePathDestTemp)

  for (subFiles <- listFiles) {
    val subFiles_name: String = subFiles.name
    if (subFiles_name.endsWith(".csv")) {
      // Copy the single part file to the desired name, then remove the temp folder.
      dbutils.fs.cp(filePathDestTemp + subFiles_name, filePath + fileName + ".csv")
      dbutils.fs.rm(filePathDestTemp, recurse = true)
    }
  }
}
Luiz Viola
-2

There is one more way, using java.io directly:

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}