Steps to schedule an Oozie action to invoke PySpark2
As of this article, the Spark2 action in Oozie is still a known limitation in the Cloudera Distribution of Hadoop (CDH).
Ref:
Listed below are the steps to work around that limitation.
To use the Oozie Spark action with Spark 2 jobs, create a spark2 ShareLib directory, copy the associated files into it, and then point Oozie to spark2. (The Oozie ShareLib is a set of libraries that allows jobs to run on any node in a cluster.)
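Note: in the commands below, lib_<ts> refers to the timestamped ShareLib directory currently in use. If you are unsure of its name, listing the ShareLib root is a quick way to check (assuming the default oozie service user):
hdfs dfs -ls /user/oozie/share/lib/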
1. Create a spark2 ShareLib directory under the Oozie ShareLib directory associated with the oozie service user:
hdfs dfs -mkdir /user/oozie/share/lib/lib_<ts>/spark2
2. Copy the Spark2 jar files from the Spark2 jar directory to the Oozie spark2 ShareLib:
hdfs dfs -put \
/opt/cloudera/parcels/SPARK2/lib/spark2/jars/* \
/user/oozie/share/lib/lib_<ts>/spark2/
3. Copy the oozie-sharelib-spark jar file from the spark ShareLib directory to the spark2 ShareLib directory:
hdfs dfs -cp \
/user/oozie/share/lib/lib_<ts>/spark/oozie-sharelib*.jar \
/user/oozie/share/lib/lib_<ts>/spark2/
4. Copy the Python libraries to the spark2 ShareLib:
hadoop fs -put \
/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/py* \
/user/oozie/share/lib/lib_<ts>/spark2/
5. Run the Oozie sharelibupdate command so that Oozie picks up the newly copied libraries:
oozie admin -oozie http://<oozie_server_hostname>:11000/oozie/ -sharelibupdate
6. To verify the configuration, run the Oozie shareliblist command. You should see spark2 in the results.
oozie admin -oozie http://<oozie_server_hostname>:11000/oozie/ -shareliblist spark2
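If the update succeeded, the output should resemble the following (illustrative only; when a ShareLib name is passed, the command also lists the jar files under it):
[Available ShareLib]
spark2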
7. To run a Spark job with the spark2 ShareLib, add the oozie.action.sharelib.for.spark property to the job.properties file and set its value to spark2:
oozie.action.sharelib.for.spark=spark2
8. The following examples show a workflow definition XML file, an Oozie job configuration file, and a Python script for running a SparkPythonTest job.
Sample workflow.xml file for pyspark2_test.py:
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkPythonTest'>
    <start to='spark-node' />
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>${master}</master>
            <name>Python-Spark-Test</name>
            <jar>pyspark2_test.py</jar>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Workflow failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end' />
</workflow-app>
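For the <jar> reference above to resolve, the workflow application directory on HDFS should hold workflow.xml at its root and the script under lib/, since files in the application's lib/ directory are made available to the action automatically. A minimal upload sketch, assuming a hypothetical application path of /user/<user>/apps/spark2_test:
hdfs dfs -mkdir -p /user/<user>/apps/spark2_test/lib
hdfs dfs -put workflow.xml /user/<user>/apps/spark2_test/
hdfs dfs -put lib/pyspark2_test.py /user/<user>/apps/spark2_test/lib/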
Sample job.properties file for 'SparkPythonTest':
oozie.use.system.libpath=true
send_email=False
dryrun=False
nameNode=hdfs://HANameservice
jobTracker=<YARN_Resource_Manager_Hostname>:8032
# master is referenced by the workflow's <master> element; yarn-cluster is a typical value
master=yarn-cluster
oozie.action.sharelib.for.spark=spark2
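To submit the workflow, job.properties must also point Oozie at the application directory on HDFS. A minimal submit sketch, assuming the hypothetical application path used above:
# Add to job.properties (hypothetical path)
oozie.wf.application.path=${nameNode}/user/<user>/apps/spark2_test
# Submit and start the job
oozie job -oozie http://<oozie_server_hostname>:11000/oozie/ \
-config job.properties -run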
Sample Python script, lib/pyspark2_test.py:
from pyspark.sql import SparkSession

# Create a Hive-enabled SparkSession, the Spark 2 entry point
spark = SparkSession.builder \
    .appName("SPARK2_TEST_APP") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# Query a Hive table and expose the result as a temporary view.
# registerTempTable is deprecated in Spark 2; createOrReplaceTempView replaces it,
# and temporary view names cannot be database-qualified (no dots).
df = spark.sql("select * from datalake.employee")
df.createOrReplaceTempView("temp_emp")
new_df = spark.sql("select * from temp_emp")
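Before scheduling it through Oozie, the script can be smoke-tested from a gateway host using the Spark2 parcel's submit command (assuming YARN is the cluster manager):
spark2-submit --master yarn lib/pyspark2_test.py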