How to submit a Python Spark2 action via Oozie in a Cloudera Distribution of Hadoop (CDH) cluster

Steps to schedule an Oozie action that invokes PySpark2

As of this article's writing, the Spark2 action in Oozie is still a known limitation in CDH.

Below are the steps to work around that limitation.

To use the Oozie Spark action with Spark 2 jobs, create a spark2 ShareLib directory, copy the associated files into it, and then point Oozie at spark2. (The Oozie ShareLib is a set of libraries that allows jobs to run on any node in a cluster.)
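Before making any changes, you can check which ShareLibs Oozie currently serves (hostname and port are assumptions; adjust them to your Oozie server):
oozie admin -oozie http://<oozie_server_hostname>:11000/oozie/ -shareliblist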
1.    Create a spark2 ShareLib directory under the Oozie ShareLib directory associated with the oozie service user:
hdfs dfs -mkdir /user/oozie/share/lib/lib_<ts>/spark2
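The <ts> placeholder is the timestamp suffix of the current ShareLib directory; one way to find it is to list the ShareLib parent directory (assuming the default location):
hdfs dfs -ls /user/oozie/share/lib/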
2.    Copy spark2 jar files from the spark2 jar directory to the Oozie spark2 ShareLib:
hdfs dfs -put \
    /opt/cloudera/parcels/SPARK2/lib/spark2/jars/* \
    /user/oozie/share/lib/lib_<ts>/spark2/
3.    Copy the oozie-sharelib-spark jar file from the spark ShareLib directory to the spark2 ShareLib directory:
hdfs dfs -cp \
    /user/oozie/share/lib/lib_<ts>/spark/oozie-sharelib-spark*.jar \
    /user/oozie/share/lib/lib_<ts>/spark2/
4.    Copy the Python libraries (pyspark.zip and the py4j-*-src.zip) to the spark2 ShareLib:
hadoop fs -put /opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/py* /user/oozie/share/lib/lib_<ts>/spark2/
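You can verify that everything landed in the spark2 ShareLib directory by listing it:
hdfs dfs -ls /user/oozie/share/lib/lib_<ts>/spark2/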


5.    Run the Oozie sharelibupdate command so that Oozie picks up the newly copied libraries:
oozie admin -oozie http://<oozie_server_hostname>:11000/oozie/ -sharelibupdate
6.      To verify the configuration, run the Oozie shareliblist command. You should see spark2 in the results.
 oozie admin -oozie http://<oozie_server_hostname>:11000/oozie/ -shareliblist spark2
7.     To run a Spark job with the spark2 ShareLib, add the oozie.action.sharelib.for.spark property to the job.properties file and set its value to spark2:
oozie.action.sharelib.for.spark=spark2

8.     The following examples show a workflow definition XML file, an Oozie job configuration file, and a Python script for running a SparkPythonTest job.
Sample workflow.xml file for pyspark2_test.py:
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkPythonTest'>
          <start to='spark-node' />
         
          <action name='spark-node'>
            <spark xmlns="uri:oozie:spark-action:0.1">
              <job-tracker>${jobTracker}</job-tracker>
              <name-node>${nameNode}</name-node>
              <master>${master}</master>
              <name>Python-Spark-Test</name>
              <jar>pyspark2_test.py</jar>
            </spark>
            <ok to="end" />
            <error to="fail" />
          </action>
         
          <kill name="fail">
            <message>Workflow failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
          </kill>
          <end name='end' />
        </workflow-app>
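Oozie picks up the Python script named in <jar> from the workflow application's lib/ directory (hence lib/pyspark2_test.py below). A minimal sketch of the HDFS layout, assuming an application directory of /user/<user>/oozie/SparkPythonTest:
/user/<user>/oozie/SparkPythonTest/workflow.xml
/user/<user>/oozie/SparkPythonTest/lib/pyspark2_test.py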
Sample job.properties file for 'SparkPythonTest':
oozie.use.system.libpath=true
send_email=False
dryrun=False
nameNode=hdfs://HANameservice
jobTracker=<YARN_Resource_Manager_Hostname>:8032
# Referenced as ${master} in workflow.xml; yarn-cluster runs the driver in the cluster
master=yarn-cluster
# HDFS directory that holds workflow.xml (an assumed path; adjust to your deployment)
oozie.wf.application.path=${nameNode}/user/<user>/oozie/SparkPythonTest
oozie.action.sharelib.for.spark=spark2
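With the workflow uploaded to the application path above, the job can be submitted with the standard Oozie CLI (adjust the Oozie URL to your server):
oozie job -oozie http://<oozie_server_hostname>:11000/oozie/ -config job.properties -run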

Sample Python script, lib/pyspark2_test.py:
from pyspark.sql import SparkSession

# Build a Spark session with Hive support so that Hive tables are queryable
spark = SparkSession.builder \
    .appName("SPARK2_TEST_APP") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# Read a Hive table and register it as a temporary view
# (registerTempTable is deprecated in Spark 2, and view names cannot contain dots)
df = spark.sql("select * from datalake.employee")
df.createOrReplaceTempView("temp_emp")

# Query the temporary view
new_df = spark.sql("select * from temp_emp")
new_df.show()
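Before scheduling the script through Oozie, it can be smoke-tested directly from a gateway host with the SPARK2 parcel deployed (spark2-submit is the Spark2 parcel's submit command; client mode keeps the driver local for easy debugging):
spark2-submit --master yarn --deploy-mode client pyspark2_test.py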
