
I want to do parallel processing in a for loop using PySpark.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

data = ["a", "b", "c"]


for i in data:
    try:
        df = spark.read.parquet('gs://'+i+'-data')
        df.createOrReplaceTempView("people")
        df2 = spark.sql("""select * from people""")
        df2.show()
    except Exception as e:
        print(e)
        continue

The above script works fine, but I want to run the iterations in parallel in PySpark, which I know is possible in Scala.

Jeffrey Chung
Amol
  • Does this answer your question? [How to run multiple Spark jobs in parallel?](https://stackoverflow.com/questions/49568940/how-to-run-multiple-spark-jobs-in-parallel) – ernest_k Jan 10 '20 at 05:07
  • Does this answer your question? [How to run independent transformations in parallel using PySpark?](https://stackoverflow.com/questions/38048068/how-to-run-independent-transformations-in-parallel-using-pyspark) – user10938362 Jan 10 '20 at 10:26

1 Answer


Spark itself runs jobs in parallel, but if you still want parallel execution in your driver code, you can use plain Python parallel processing to do it.

data = ["a","b","c"]

from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)


def fun(x):
    try:
        df = sqlContext.createDataFrame([(1,2, x), (2,5, "b"), (5,6, "c"), (8,19, "d")], ("st","end", "ani"))
        df.show()
    except Exception as e:
        print(e)

pool.map( fun,data)

I have changed your code a bit, but this is basically how you can run parallel tasks. If you have some flat files that you want to process in parallel, just make a list with their names and pass it to pool.map(fun, data).

Change the function fun as needed.
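For example, a minimal sketch of a thread-per-dataset version of your loop might look like the following (assuming the gs://&lt;name&gt;-data bucket naming from your question; the helper name process and the pool size of 10 are just illustrative):

from pyspark.sql import SparkSession
from multiprocessing.pool import ThreadPool

spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()

data = ["a", "b", "c"]  # dataset name prefixes, as in the question

def process(name):
    try:
        # Each thread submits its own Spark job; Spark still distributes the work across executors
        df = spark.read.parquet('gs://' + name + '-data')
        # Use a per-dataset view name so concurrent threads do not overwrite each other's view
        df.createOrReplaceTempView("people_" + name)
        spark.sql("select * from people_" + name).show()
    except Exception as e:
        print(e)

pool = ThreadPool(10)      # up to 10 datasets processed concurrently
pool.map(process, data)
pool.close()
pool.join()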

For more details on the multiprocessing module, check the documentation.

Similarly, if you want to do it in Scala, you will need the following imports:

import scala.concurrent.{Future, Await}

For a more detailed understanding, check this out. The code is written for Databricks, but with a few changes it will work in your environment.

Andy_101
  • Will this bring it to the driver node? Or will it execute the parallel processing in the multiple worker nodes? – thentangler Feb 03 '21 at 06:28
  • 2
    this is parallel execution in the code not actuall parallel execution. this is simple python parallel Processign it dose not interfear with the Spark Parallelism. – Andy_101 Feb 03 '21 at 12:10