CDE 1.19 New Features

Objective

CDE 1.19 was introduced in May 2023 with the following enhancements and new features:

  • Two Tiers of Virtual Clusters: Core and All-Purpose. Core clusters are dedicated to batch jobs. All-Purpose clusters include Interactive Sessions.
  • Interactive sessions: introducing Spark Shell capabilities directly within the CDE Virtual Cluster.
  • Spark 3.3 Support: CDE now supports three versions of Spark -- 2.4, 3.2, and 3.3. Spark 3.2 will also serve as the LTS release, so customers can depend on it as they do on-premises.
  • Support for Hong Kong & Jakarta workload regions.
  • Airflow performance is now 2x faster thanks to a combination of an upgrade and continued optimizations.
  • Support for File Resources with Airflow CDE Jobs: you can now mount extra files to Airflow jobs. This will be extended in the next release to support Python packages.
  • CDE CLI profiles: ability to set up multiple virtual clusters and keys and easily switch between them using the CLI.
  • Spark-submit migration tooling: use the tool to migrate Datahub Spark jobs to CDE.

To learn more, please visit the release notes at this URL.

This repository showcases the following CDE 1.19 capabilities: Interactive Sessions, File Resources with Airflow CDE Jobs, and the Spark Submit Migration Tool.

Requirements

You can reproduce these use cases in your CDE Virtual Cluster:

  • CDE Service version 1.19 in Private or Public Cloud (AWS, Azure, OCP, and Cloudera ECS are all supported).
  • A CDE Virtual Cluster of type "All-Purpose".
  • Basic familiarity with Python, PySpark, Airflow and Docker.

Project Setup

Clone this git repository to a local folder on your machine. All files and dependencies are included in this project.

Interactive Sessions

Using Interactive Sessions in the CDE UI

From the CDE Landing Page, open "Sessions" in the left pane and then select the CDE Virtual Cluster where you want to run your CDE Interactive Session.

The session will be in "starting" state for a few moments. When it's ready, launch it and open the Spark Shell by clicking on the "Interact" tab.

Copy and paste the following code snippets in each cell and observe the output (no code changes required).

Note
CDE Sessions do not require you to create the SparkSession object; the shell has already been launched for you and the spark object is available. However, if you need specific types or functions, you still have to import the corresponding modules.
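
For example, you can confirm that the pre-created session is available with a quick sanity check (a minimal optional snippet, not part of the original walkthrough):

# The spark object is provided by the CDE Session; no SparkSession.builder call is needed
print(spark.version)
spark.range(5).show()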

Import the required PySpark types:

from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

Create a list of Rows. Infer schema from the first row, create a DataFrame and print the schema:

rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
some_df = spark.createDataFrame(rows)
some_df.printSchema()

Create a list of tuples:

tuples = [("John", 19), ("Smith", 23), ("Sarah", 18)]

Create a Spark schema with two fields, person_name and person_age:

schema = StructType([StructField("person_name", StringType(), False),
                    StructField("person_age", IntegerType(), False)])

Create a DataFrame by applying the schema to the list of tuples and print the schema:

another_df = spark.createDataFrame(tuples, schema)
another_df.printSchema()

Iterate through the Spark Dataframe:

for each in another_df.collect():
    print(each[0])
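
You can also register the DataFrame as a temporary view and query it with Spark SQL directly from the session; this snippet is an optional extra beyond the original walkthrough:

# Register a temp view and query it with Spark SQL
another_df.createOrReplaceTempView("people")
spark.sql("SELECT person_name FROM people WHERE person_age > 18").show()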

Using Interactive Sessions with the CDE CLI

You can interact with the same CDE Session from your local terminal using the cde session interact command.

Open your terminal and enter cde session interact --name InteractiveSession. You will be prompted for your password and then the Spark Shell will launch.

Run the same PySpark code in the shell.

Navigate back to the CDE Session and validate that the code has run from the UI.

You can also create a session directly from the CLI. In your local terminal, exit your current Spark Shell with "Ctrl+D" and then run the following command:

cde session create --name cde_shell_from_cli --type spark-scala --description launched-from-cli --executor-cores 4 --num-executors 2

Notice that you can pass CDE compute options such as the number of executors and executor cores when using the command.

Using CDE Airflow Jobs with File Resources

Up to CDE 1.18, File Resources could only be mounted on CDE Spark Jobs. CDE 1.19 introduces the ability to leverage File Resources with CDE Airflow jobs. In practice, this means you can use an Airflow operator to parse values from files loaded into CDE File Resources and execute conditional logic in your DAG based on them.

Open the airflow_dag.py file located in the cde_jobs folder. Familiarize yourself with the code. Notice the following:

  • Lines 84-88: an instance of BashOperator is used to read the file from the CDE File Resource. Notice that we save the value as an XCom variable at line 87. XComs allow you to temporarily store values so they are accessible to other operators. Also notice that my_airflow_file_resource is the name of the CDE Resource, while my_file.conf is the name of the file we will upload into it.
read_conf = BashOperator(
    task_id="read-resource-file-task",
    bash_command="cat /app/mount/my_airflow_file_resource/my_file.conf",
    do_xcom_push=True
)
  • Lines 90-96: an instance of PythonOperator is used to parse the value read from the Airflow job dependency file via XCom. In general, you can use a similar approach to transform, process, and refine values read from files.
def _print_confs(**context):
    return context['ti'].xcom_pull(task_ids='read-resource-file-task')

pythonstep = PythonOperator(
    task_id="print_file_resource_confs",
    python_callable=_print_confs,
)
  • Lines 98-108: Airflow task branching is used to make a decision based on the value read from the File Resource. In this example we use two instances of EmptyOperator (lines 110 and 111), but you can more generally use this approach to implement complex logic in your DAG. A short sketch of how these tasks might be wired together follows the excerpt below.
@task.branch(task_id="branch_task")
def branch_func(**context):
    airflow_file_resource_value = int(context['ti'].xcom_pull(task_ids="read-resource-file-task"))
    if airflow_file_resource_value >= 5:
        return "continue_task"
    elif airflow_file_resource_value >= 2 and airflow_file_resource_value < 5:
        return "stop_task"
    else:
        return None

branch_op = branch_func()

continue_op = EmptyOperator(task_id="continue_task")
stop_op = EmptyOperator(task_id="stop_task")
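
For reference, here is a minimal sketch of how these tasks might be chained together; the actual dependencies are defined further down in airflow_dag.py and may differ slightly:

# Hypothetical wiring: read the file, print its value, then branch to continue or stop
read_conf >> pythonstep >> branch_op >> [continue_op, stop_op]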

Steps to Reproduce the Example in your CDE Cluster

Using the CDE Jobs UI, create a CDE Spark Job named spark-sql with the pyspark_sql.py script, but do not run it yet.

Open your local terminal and run the following CDE CLI commands.

⚠ Warning
A CDE 1.19 Virtual Cluster is required. If you already had an older version of the CLI installed, you will have to reinstall it. The fastest way to check your version is by running cde --version from your terminal. For instructions on installing the CLI, please visit the documentation.

Create a File Resource for your Airflow Job:

cde resource create --name my_pipeline_resource

Upload the Airflow DAG script to the CDE File Resource:

cde resource upload --name my_pipeline_resource --local-path airflow_dag.py

Create a File Resource for your Airflow Dependency File:

cde resource create --name my_airflow_file_resource

Upload the Airflow Dependency File to the File Resource:

cde resource upload --name my_airflow_file_resource --local-path my_file.conf
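
Note that branch_func parses the file contents with int(), so my_file.conf is expected to contain a single integer value. The repository already includes my_file.conf; purely as a hypothetical illustration, you could generate a similar file locally like this:

# Hypothetical example only: write a single integer that branch_func can parse
with open("my_file.conf", "w") as f:
    f.write("7")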

Run the CDE Airflow Job:

cde job create --name my_pipeline --type airflow --dag-file airflow_dag.py --mount-1-resource my_pipeline_resource --airflow-file-mount-1-resource my_airflow_file_resource

Navigate back to the CDE Job Runs page and validate the outputs in the Logs tab.

Open the Airflow UI and validate job execution.

You can choose from a variety of Airflow monitoring options. For example, the Graph tab provides a way to validate which branch was chosen by the Airflow DAG.

Spark Submit Migration Tool

The CDE CLI provides a similar, although not identical, way of running "spark-submits" in CDE. However, adapting many spark-submit commands to CDE can become an obstacle. The CDE Engineering team created the Spark Submit Migration Tool to facilitate the conversion of a spark-submit into a cde spark-submit.

Step By Step Instructions

⚠ Warning
This tutorial uses Docker to streamline the installation of the Spark Submit Migration Tool. If you don't have Docker installed on your machine, you will have to install the tool manually by following this tutorial by Vish Rajagopalan instead.

Navigate to the CDP Management Console and download your user credentials file. The credentials file includes a CDP Access Key ID and a CDP Private Key.

Next, navigate to the CDE Virtual Cluster Details and copy the JOBS_API_URL.

Launch the example Docker container.

docker run -it pauldefusco/cde_spark_submit_migration_tool:latest

You are now inside the running container. Next, activate the Spark Submit Migration tool by running the following shell command.

cde-env.sh activate -p vc-1

Navigate to the .cde folder and enter the CDP Access Key ID and Private Key you downloaded earlier in the respective fields of the credentials file.

Next, open the config.yaml file located in the same folder. Replace the cdp console value at line 3 with the CDP Console URL (e.g. https://console.us-west-1.cdp.cloudera.com/). Then, enter your JOBS_API_URL in the "vcluster-endpoint" field at line 8.

Finally, run the following spark-submit. This is a sample submit taken from a legacy CDH cluster.

spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 2G \
--driver-memory 1G \
--driver-cores 1 \
--queue default \
06-pyspark-sql.py

Shortly afterwards, you should see output in your terminal including a Job Run ID that confirms successful job submission to CDE; in this example run, the Job Run ID was 9.

Navigate to your CDE Virtual Cluster Job Runs page and validate the job is running or has run successfully.

⚠ Warning
If you are unable to run the spark-submit, you may have to remove the tls setting from config.yaml. In other words, completely erase line 4 from your config file under the .cde folder.

Conclusions & Next Steps

CDE is the Cloudera Data Engineering Service, a containerized managed service for Spark and Airflow. Each CDE virtual cluster includes an embedded instance of Apache Airflow.

With Airflow-based pipelines, users can specify their data pipelines using a simple Python configuration file.

A basic CDE Airflow DAG can be composed of a mix of Hive and Spark operators that automatically run jobs on CDP Data Warehouse (CDW) and CDE, respectively, with the underlying security and governance provided by SDX.

However, thanks to the flexibility of Airflow, CDE also empowers users with the ability to integrate with other CDP Data Services and third-party systems. For example, you can combine the operators we have seen above to create complex pipelines across multiple domains such as data warehousing, machine learning, and much more.
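
As an illustration only, a multi-service DAG could combine the CDEJobRunOperator and CDWOperator shipped with CDE's embedded Airflow. The import paths and parameters below follow Cloudera's documented operators but may vary by version, so treat this as a hedged sketch rather than a drop-in example:

from datetime import datetime
from airflow import DAG
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
from cloudera.cdp.airflow.operators.cdw_operator import CDWOperator

with DAG(
    dag_id="multi_service_pipeline_sketch",  # hypothetical DAG name
    start_date=datetime(2023, 5, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Run an existing CDE Spark Job (for example the spark-sql job created earlier)
    spark_step = CDEJobRunOperator(task_id="spark_etl", job_name="spark-sql")

    # Run a Hive query in CDP Data Warehouse; the connection id, query, and schema are placeholders
    dw_step = CDWOperator(
        task_id="dw_query",
        cli_conn_id="cdw_hive_conn",
        hql="SELECT COUNT(*) FROM default.my_table",
        schema="default",
    )

    spark_step >> dw_step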

If you are exploring CDE you may find the following tutorials relevant:

  • Using CDE Airflow: a deep dive into CDE Airflow capabilities, along with answers to FAQs and Cloudera-recommended best practices.

  • Spark 3 & Iceberg: a quick intro to Time Travel capabilities with Spark 3.

  • Simple Intro to the CDE CLI: a simple introduction to the CDE CLI for the CDE user who is just getting started.

  • CDE CLI Demo: a more advanced CDE CLI reference with additional details for the CDE user who wants to move beyond the basics shown here.

  • GitLab2CDE: a CI/CD pipeline to orchestrate cross-cluster workflows for hybrid/multicloud data engineering.

  • CML2CDE: a CI/CD pipeline to deploy Spark ETL at scale with Python and the CDE API.

  • Postman2CDE: using the Postman API to bootstrap CDE Services.

For more information on the Cloudera Data Platform and its form factors please visit this site.

For more information on migrating Spark jobs to CDE, please reference this guide.

If you have any questions about CDE or would like to see a demo, please reach out to your Cloudera Account Team or send a message through this portal and we will be in contact with you soon.
