43

I am trying to convert a .csv file to a .parquet file.
The CSV file (Temp.csv) has the following format:

1,Jon,Doe,Denver

I am using the following Python code to convert it to Parquet:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import os

if __name__ == "__main__":
    sc = SparkContext(appName="CSV2Parquet")
    sqlContext = SQLContext(sc)

    schema = StructType([
            StructField("col1", IntegerType(), True),
            StructField("col2", StringType(), True),
            StructField("col3", StringType(), True),
            StructField("col4", StringType(), True)])
    dirname = os.path.dirname(os.path.abspath(__file__))
    csvfilename = os.path.join(dirname,'Temp.csv')    
    rdd = sc.textFile(csvfilename).map(lambda line: line.split(","))
    df = sqlContext.createDataFrame(rdd, schema)
    parquetfilename = os.path.join(dirname,'output.parquet')    
    df.write.mode('overwrite').parquet(parquetfilename)

The result is only a folder named output.parquet, and not the parquet file that I'm looking for, followed by the following error on the console.

CSV to Parquet Error

I have also tried running the following code, which runs into a similar issue.

from pyspark.sql import SparkSession
import os

spark = SparkSession \
    .builder \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# read csv
dirname = os.path.dirname(os.path.abspath(__file__))
csvfilename = os.path.join(dirname,'Temp.csv')    
df = spark.read.csv(csvfilename)

# Displays the content of the DataFrame to stdout
df.show()
parquetfilename = os.path.join(dirname,'output.parquet')    
df.write.mode('overwrite').parquet(parquetfilename)

What is the best way to do this? I'm using Windows and Python 2.7.

  • [Similar question?](https://stackoverflow.com/questions/42022890/how-can-i-write-a-parquet-file-using-spark-pyspark) – lwileczek May 30 '18 at 12:05
  • @lwileczek It's a different question as the linked question explicitly asks for Spark, this is just about using Python in general. – Uwe L. Korn May 30 '18 at 12:18

6 Answers

57

Using the packages pyarrow and pandas you can convert CSVs to Parquet without using a JVM in the background:

import pandas as pd
df = pd.read_csv('example.csv')
df.to_parquet('output.parquet')

One limitation you will run into is that pyarrow is only available for Python 3.5+ on Windows. Either use Linux/OSX to run the code with Python 2, or upgrade your Windows setup to Python 3.6.

Uwe L. Korn
  • Thanks for your answer. Isn't there a way to do it using Python 2.7 on Windows? – inquisitiveProgrammer May 30 '18 at 14:36
  • 3
    This is a very simple way to convert a single file into a parquet file, but what if we have multiple csv files and we want to merge them into a single parquet file? – Zombraz Nov 07 '18 at 00:08
  • 1
    @Zombraz you could loop through the files and convert each to parquet (a sketch of this follows these comments); if you are looking for anything outside of python, hive on AWS EMR works great in converting csv to parquet – George Appiah Sarfo Nov 25 '19 at 17:14
  • @Zombraz - you can use Dask or PySpark to convert multiple CSV files to a single Parquet file (or multiple Parquet files). See my answer for more details. – Powers Aug 23 '20 at 17:47
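
Building on those comments, here is a minimal pandas sketch of the loop-and-combine idea; the data/*.csv paths and output names are hypothetical, and pyarrow (or fastparquet) needs to be installed for to_parquet to work:

import glob
import pandas as pd

# Convert each CSV to its own Parquet file (hypothetical data/ directory)
for csv_path in glob.glob('data/*.csv'):
    df = pd.read_csv(csv_path)
    df.to_parquet(csv_path[:-4] + '.parquet')  # data/foo.csv -> data/foo.parquet

# Or combine everything into a single Parquet file, assuming the files share a schema
combined = pd.concat((pd.read_csv(p) for p in glob.glob('data/*.csv')), ignore_index=True)
combined.to_parquet('data/combined.parquet')
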
20

You can convert csv to parquet using pyarrow only, without pandas. This can be useful when you need to minimize your code dependencies (e.g. with AWS Lambda).

import pyarrow.csv as pv
import pyarrow.parquet as pq

table = pv.read_csv(filename)
pq.write_table(table, filename.replace('csv', 'parquet'))

Refer to the pyarrow docs to fine-tune the read_csv and write_table functions.
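
As an illustration, here is a hedged sketch of that tuning; the column names, the int32 type, and the file names are assumptions based on the question's Temp.csv, not something this answer prescribes:

import pyarrow as pa
import pyarrow.csv as pv
import pyarrow.parquet as pq

# Temp.csv has no header row, so supply column names explicitly
read_opts = pv.ReadOptions(column_names=['col1', 'col2', 'col3', 'col4'])
parse_opts = pv.ParseOptions(delimiter=',')
convert_opts = pv.ConvertOptions(column_types={'col1': pa.int32()})

table = pv.read_csv('Temp.csv', read_options=read_opts,
                    parse_options=parse_opts, convert_options=convert_opts)
pq.write_table(table, 'Temp.parquet', compression='snappy')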

taras
14
import boto3
import pandas as pd
import pyarrow as pa
from s3fs import S3FileSystem
import pyarrow.parquet as pq

s3 = boto3.client('s3',region_name='us-east-2')
obj = s3.get_object(Bucket='ssiworkoutput', Key='file_Folder/File_Name.csv')
df = pd.read_csv(obj['Body'])

table = pa.Table.from_pandas(df)

output_file = "s3://ssiworkoutput/file/output.parquet"  # S3 root path for the partitioned Parquet output
s3 = S3FileSystem()

pq.write_to_dataset(table=table,
                    root_path=output_file,partition_cols=['Year','Month'],
                    filesystem=s3)

print("File converted from CSV to parquet completed")
Amol More
  • 1
    This code reads a CSV file from an S3 path and writes it back to another S3 path in Parquet format, partitioned by column. – Amol More May 30 '19 at 04:34
  • Make sure to install the dependencies first: pip3 install boto3 pandas pyarrow fs-s3fs s3fs – Amol More May 30 '19 at 04:35
  • 2
    How did you install pyarrow without having package's size problem on aws? – Haha Apr 14 '20 at 12:04
  • 1
    @Haha The easiest way is using the [awswrangler](https://github.com/awslabs/aws-data-wrangler/releases) layer, which already includes pyarrow (see the sketch after these comments) – taras Nov 10 '20 at 19:30
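
Since awswrangler comes up in the comments above: it can also perform the S3 CSV to Parquet conversion directly. A minimal sketch, assuming awswrangler is installed (e.g. via that Lambda layer) and using hypothetical bucket paths:

import awswrangler as wr

# Read a CSV from S3 into a pandas DataFrame, then write it back to S3 as Parquet
df = wr.s3.read_csv('s3://my-bucket/input/File_Name.csv')
wr.s3.to_parquet(df=df, path='s3://my-bucket/output/file.parquet')
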
11

There are a few different ways to convert a CSV file to Parquet with Python.

Uwe L. Korn's Pandas approach works perfectly well.

Use Dask if you'd like to convert multiple CSV files to multiple Parquet files or to a single Parquet file. This will read multiple CSV files and write one Parquet part file per partition (four here):

import dask.dataframe as dd

df = dd.read_csv('./data/people/*.csv')
df = df.repartition(npartitions=4)
df.to_parquet('./tmp/people_parquet4')

You could also use df.repartition(npartitions=1) if you'd only like to output one Parquet file. More info on converting CSVs to Parquet with Dask here.

Here's a PySpark snippet that works in a Spark environment:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master("local") \
  .appName("parquet_example") \
  .getOrCreate()

df = spark.read.csv('data/us_presidents.csv', header=True)
df.repartition(1).write.mode('overwrite').parquet('tmp/pyspark_us_presidents')

You can also use Koalas in a Spark environment:

import databricks.koalas as ks

df = ks.read_csv('data/us_presidents.csv')
df.to_parquet('tmp/koala_us_presidents')
Shoan
Powers
1

You can write a Parquet file using Spark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Test_Parquet").master("local[*]").getOrCreate()

parquetDF = spark.read.csv("data.csv")

parquetDF.coalesce(1).write.mode("overwrite").parquet("Parquet")

I hope this helps

ishwar
1
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)

# sc.textFile().map(split) yields string fields only,
# so every column is declared as StringType to match
schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True)])
rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet('/output.parquet')