I am running my PySpark code using:

spark-submit pyspark_script.py --master yarn --num-executors 6 --driver-memory 30g --executor-memory 7g

I have made sure to run it in YARN mode, but I am still facing an out-of-memory error.
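Note that spark-submit's usage is spark-submit [options] <app file> [app arguments]: anything placed after the script path is handed to the script as application arguments, not to Spark, so the flags above most likely never take effect. If they are meant for Spark, the conventional ordering puts them before the script:

spark-submit --master yarn --num-executors 6 --driver-memory 30g --executor-memory 7g pyspark_script.py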
My Code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
import os

# Build a single SparkSession and take the SparkContext from it, instead of
# creating a separate SparkConf/SparkContext first: getOrCreate() reuses any
# context that already exists, so settings like .master("yarn") here would
# otherwise be silently ignored.
spark = SparkSession \
    .builder \
    .appName("recon") \
    .master("yarn") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext

base_path = os.path.dirname(os.path.abspath(__file__))            # currently unused
file_path = os.path.abspath("combine_ocssubsdump_20210526.csv")   # currently unused

# Read the dump as plain text with a minimum of 5 partitions. A file:// path
# must be readable from every executor node; /mapr is assumed to be the
# cluster-wide MapR NFS mount. use_unicode is left at its default (True):
# with use_unicode=False the lines come back as bytes, and x.split("|")
# below fails on Python 3.
rdd1 = sc.textFile("file:///mapr/samrat/test/samrat/combine_ocssubsdump_20210526.csv", 5)
#schema=["SubscriberID","ParentID","Brand","SubBrand","ActivationDate","ActiveEndDate","CallBarringEndDate","SuspendEndDate","Status","AirtimeBalance","AirtimeBalanceExpiry","StatusDetail","ProductWallet","OfferDetails","fileProperties"]
schema = StructType([StructField("SubscriberID",StringType(),True),
StructField("ParentID",StringType(),True),
StructField("Brand",StringType(),True),
StructField("SubBrand",StringType(),True),
StructField("ActivationDate",StringType(),True),
StructField("ActiveEndDate",StringType(),True),
StructField("CallBarringEndDate",StringType(),True),
StructField("SuspendEndDate",StringType(),True),
StructField("Status",StringType(),True),
StructField("AirtimeBalance",StringType(),True),
StructField("AirtimeBalanceExpiry",StringType(),True),
StructField("StatusDetail",StringType(),True),
StructField("ProductWallet",StringType(),True),
# Split each line on the pipe delimiter; cache() because rdd1 feeds both the
# per-date counts and the DataFrame below.
rdd1 = rdd1.map(lambda x: x.split("|")).cache()

# Count rows per activation date (field index 4; keep only the date part).
rdd2 = rdd1.map(lambda x: (x[4].split(" ")[0], 1))
rdd3 = rdd2.reduceByKey(lambda a, b: a + b)  # lazy: no action below ever triggers it

df1 = spark.createDataFrame(data=rdd1, schema=schema)
df2 = df1.select("SubscriberID", "ActivationDate")  # currently unused
df3 = df1.groupBy("Brand").count()
df3.show()
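For reference, the same counts can be computed without the intermediate RDD by letting Spark parse the file directly, which avoids caching every split row in executor memory. This is a minimal sketch, assuming the dump is pipe-delimited with no header row, as the RDD code above implies:

from pyspark.sql import functions as F

df = spark.read.csv(
    "file:///mapr/samrat/test/samrat/combine_ocssubsdump_20210526.csv",
    sep="|",
    schema=schema,
)

# Per-brand counts, equivalent to df3 above.
df.groupBy("Brand").count().show()

# Per-activation-date counts, equivalent to rdd3, keeping only the date part.
df.select(F.substring_index("ActivationDate", " ", 1).alias("ActDate")) \
    .groupBy("ActDate").count().show()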