
I am running my PySpark script with:

spark-submit pyspark_script.py --master yarn --num-executors 6 --driver-memory 30g --executor-memory 7g

I have made sure to run it in YARN mode, but I am still facing an out-of-memory error.
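For reference, spark-submit treats everything after the script path as arguments to the script itself rather than as Spark options, so the same flags would conventionally be placed before the script (same values, only the ordering changes):

spark-submit --master yarn --num-executors 6 --driver-memory 30g --executor-memory 7g pyspark_script.py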

My Code:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType
import os
conf = SparkConf().setAppName("PySpark App")
sc = SparkContext(conf=conf)
base_path = os.path.join(os.path.dirname(os.path.abspath(__file__)))
file_path = os.path.join(os.path.abspath("combine_ocssubsdump_20210526.csv"))
# read the raw file as an RDD of lines, in 5 partitions
rdd1 = sc.textFile("file:////mapr/samrat/test/samrat/combine_ocssubsdump_20210526.csv", 5, use_unicode=False)
spark = SparkSession \
    .builder \
    .appName("recon") \
    .master("yarn") \
    .enableHiveSupport() \
    .getOrCreate()

#schema=["SubscriberID","ParentID","Brand","SubBrand","ActivationDate","ActiveEndDate","CallBarringEndDate","SuspendEndDate","Status","AirtimeBalance","AirtimeBalanceExpiry","StatusDetail","ProductWallet","OfferDetails","fileProperties"]
schema = StructType([StructField("SubscriberID",StringType(),True),
 StructField("ParentID",StringType(),True),
 StructField("Brand",StringType(),True),
 StructField("SubBrand",StringType(),True),
 StructField("ActivationDate",StringType(),True),
 StructField("ActiveEndDate",StringType(),True),
 StructField("CallBarringEndDate",StringType(),True),
 StructField("SuspendEndDate",StringType(),True),
 StructField("Status",StringType(),True),
 StructField("AirtimeBalance",StringType(),True),
 StructField("AirtimeBalanceExpiry",StringType(),True),
 StructField("StatusDetail",StringType(),True),
 StructField("ProductWallet",StringType(),True),

rdd1 = rdd1.map(lambda x: x.split("|")).cache()      # split each line on the pipe delimiter
rdd2 = rdd1.map(lambda x: (x[4].split(" ")[0], 1))   # key on the date portion of ActivationDate
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)          # count records per activation date
df1 = spark.createDataFrame(data=rdd1, schema=schema)
df2 = df1.select("SubscriberID", "ActivationDate")
df3 = df1.groupBy("Brand").count()
df3.show()
                                                 
    Directly use spark.read.csv(csvFilePath). This will remove the memory overhead of creating and processing RDDs. [StackOverflow link](https://stackoverflow.com/questions/28782940/load-csv-file-with-spark) – Drashti Dobariya Jul 06 '21 at 05:17
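
A minimal sketch of what that comment suggests, reusing the schema defined above and the same file path; the sep and header options are assumptions based on the pipe-delimited split in the original code:

# Sketch: read the pipe-delimited dump directly into a DataFrame,
# skipping the intermediate RDD, as the comment suggests.
df = (spark.read
      .option("sep", "|")        # assumed delimiter, matching the split("|") above
      .option("header", "false")
      .schema(schema)            # schema defined earlier in the question
      .csv("file:////mapr/samrat/test/samrat/combine_ocssubsdump_20210526.csv"))

df.groupBy("Brand").count().show()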
