I developed a Gradient Boosting classifier, saved it to a pickle file, and use it to score a Hive table with PySpark. However, when I run the scoring script in Python 3, it fails with the error below:
22/01/30 17:26:34 194 ERROR TaskSetManager: Task 821 in stage 2.0 failed 4 times; aborting job 22/01/30 17:26:34 212 ERROR FileFormatWriter: Aborting job null. org.apache.spark.SparkException: Job aborted due to stage failure: Task 821 in stage 2.0 failed 4 times, most recent failure: Lost task 821.3 in stage 2.0........
......... [LONG ERROR]
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 821 in stage 2.0 failed 4 times, most recent failure: Lost task 821.3 in stage 2.0 ...... 1.cdh5.12.0.p0.142354/lib/spark2/python/lib/pyspark.zip/pyspark/broadcast.py", line 108, in value self._value = self.load(self._path) File "/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2/python/lib/pyspark.zip/pyspark/broadcast.py", line 99, in load return pickle.load(f) ModuleNotFoundError: No module named 'sklearn'
I find this strange because I can successfully import sklearn at the start of the script, so the package is installed. The error only appears at the very end, when the table containing the scores is being written. Below is my code for creating the model and the pickle file; it runs perfectly fine.
from sklearn.ensemble import GradientBoostingClassifier
import pickle

# train the model
gbm = GradientBoostingClassifier(max_depth=6, max_features=4, n_estimators=300)
gbm.fit(X_train, y_train)

# serialize the fitted model to a pickle file
with open("my_model.pkl", "wb") as pickle_out:
    pickle.dump(gbm, pickle_out)
Below is my scoring script, which uses the pickle file generated above. It fails only at the last line, when the scored table is being written (which, as far as I understand, is the point where the UDF actually runs on the executors).
import os
import pickle
import subprocess
from subprocess import Popen

# create path to your username on hdfs
hdfs_path = os.path.join(os.sep, 'user', 'username')
# put the pickle file into hdfs
put = Popen(["hadoop", "fs", "-put", "my_model.pkl", hdfs_path], stdin=subprocess.PIPE, bufsize=-1)
put.communicate()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
df = spark.sql("""SELECT * from db_name.table_name_for_scoring""")
df = df.na.fill(0)
# read the pickle file back from hdfs, deserialize the model on the driver and broadcast it
model_rdd_pkl = sc.binaryFiles("my_model.pkl")
model_rdd_data = model_rdd_pkl.collect()
ms_model = pickle.loads(model_rdd_data[0][1])
broadcast_ms_model = sc.broadcast(ms_model)
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# scoring UDF: calls the broadcast model on the executors
def predict(*cols):
    prediction = broadcast_ms_model.value.predict_proba((cols,))
    return float(prediction[0, 1])

predict_udf = udf(predict, DoubleType())
feature_list = ['feature_a', 'feature_b', 'feature_c']
df_scored = df.withColumn("score", predict_udf(*feature_list))
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sqlContext = SQLContext(sc)

# write the scored dataframe as a hive table -- this is the line that fails
df_scored.createOrReplaceTempView("mytempTable")
sqlContext.sql("create table db_name.scored_table STORED AS PARQUET as select * from mytempTable")
I already tried the suggestions in this question, but the error still shows up. I have also tried installing, downgrading, and upgrading numpy, scikit-learn, and scipy, but the error remains the same.
Below are the current versions of my scikit-learn, numpy and scipy in the Python 3 engine where it fails:
- scikit-learn - 0.24.0
- scipy - 1.5.2
- numpy - 1.17.2
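(These are taken from the driver session of the Python 3 engine, checked with a quick snippet along these lines:)

# quick version check in the driver session, just for reference
import sklearn, scipy, numpy
print(sklearn.__version__, scipy.__version__, numpy.__version__)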
Both the model-building code and the scoring script run successfully in Python 2. But when I run them in Python 3 (which is what will be used in deployment, so it is what I need to work), the 'No module named sklearn' error appears.
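Since the driver can clearly import sklearn but the executors apparently cannot, the next thing I plan to try is a small check of what the executors actually see, something along these lines (rough, untested sketch; check_executor_env is just an illustrative name):

# rough sketch: ask each executor which python binary it runs
# and whether it can import sklearn there
def check_executor_env(_):
    import sys
    try:
        import sklearn
        return [(sys.executable, sklearn.__version__)]
    except ImportError as exc:
        return [(sys.executable, str(exc))]

print(sc.parallelize(range(4), 4).mapPartitions(check_executor_env).collect())

My guess is that the executors are picking up a different Python than the driver, but I have not confirmed this yet.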
Below are the versions of scikit-learn, numpy and scipy in the Python 2 engine, which works perfectly fine. I already tried downgrading these packages in the Python 3 engine as well, but it still shows the same error:
- scikit-learn - 0.18.1
- scipy - 1.2.3
- numpy - 1.16.5
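One idea I have not verified yet: would explicitly pointing Spark at the Python 3 environment (the one where sklearn is installed) fix this, e.g. by setting PYSPARK_PYTHON / spark.pyspark.python before the session is created? A rough sketch of what I mean (the interpreter path below is just a placeholder):

import os
from pyspark.sql import SparkSession

# placeholder path -- wherever the Python 3 environment with sklearn lives
py3_path = "/opt/envs/py3/bin/python3"
os.environ["PYSPARK_PYTHON"] = py3_path

spark = (SparkSession.builder
         .config("spark.pyspark.python", py3_path)
         .enableHiveSupport()
         .getOrCreate())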
What else can I do? What could I be missing?