acroz / pylivy
A Python client for Apache Livy, enabling use of remote Apache Spark clusters.
License: MIT License
Preface: I am using pylivy from a Flask app running via gunicorn with gevent (async) workers.
All the services (Flask/gunicorn, Apache Livy, Spark) are dockerized.
A few times I have observed pylivy throw the runtime error 'statement had no output'. However, when I looked at the Apache Livy session output logs, the code had executed with an 'ok' status and the expected output was present in the logs. I tried to replicate this several times; it is hard to reproduce reliably, but I did manage it a few times.
My two cents: maybe for a few actions the output is delayed in reaching the statement REST API, and we may have to poll for a second or two to get the response back.
Please help me fix this one! Thanks.
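One workaround, consistent with the polling idea above, is to retry fetching the statement a few times before raising. This is a hedged sketch, not pylivy's actual implementation; `fetch_statement` is a hypothetical callable wrapping a GET on Livy's statements endpoint:

```python
import time

def poll_for_output(fetch_statement, attempts=5, delay=1.0):
    """Retry fetching a Livy statement until its output is populated.

    `fetch_statement` is any zero-argument callable returning the parsed
    statement JSON (e.g. a wrapper around GET /sessions/{id}/statements/{n}).
    """
    for _ in range(attempts):
        statement = fetch_statement()
        # Livy marks a finished statement as "available", but the output
        # field can apparently lag behind by a moment under some setups.
        if statement.get("state") == "available" and statement.get("output"):
            return statement["output"]
        time.sleep(delay)
    raise RuntimeError("statement had no output after polling")
```

The same loop could be applied around pylivy's call site if the delay theory holds.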
Hey, so I've been using pylivy quite a bit for a recent project and found that it greatly eases development if you can specify a session ID for an already-running session. This is especially handy if the Spark session you are connecting to takes quite a while to set up and load data.
To get an idea of what's been done, check out my branch https://github.com/rondiplomatico/pylivy/tree/feature/resume. I'd also create a PR from it if you'd like to include this.
Cheers
We have found some edge cases when using read to return a dataframe, related to the transformation toJSON followed by json.loads.
Specifically, if all values of a column are null, then the column is dropped from the pandas dataframe. In addition, we lose type information when coercing all columns to JSON. For example, timestamps would previously be converted to pandas datetimes but are now returned as strings.
Is there an alternative approach we could explore for serializing the dataframe, such as using Apache Arrow as the toPandas function does in pyspark?
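The null-column drop can be illustrated without Spark at all: Spark's toJSON omits fields whose value is null, so a column that is null in every row never appears in any JSON record. A minimal sketch of the mechanism, using plain `json`:

```python
import json

# Rows as Spark's toJSON would emit them: null fields are simply omitted,
# so a column that is null in every row leaves no trace in the records.
json_rows = ['{"id": 1}', '{"id": 2}']  # column "ts" was null in every row

records = [json.loads(row) for row in json_rows]
columns = set().union(*(r.keys() for r in records))
assert "ts" not in columns  # the all-null column has vanished

# Type information is also lost: a timestamp survives the round trip
# only as a string, not as any datetime-like value.
row = json.loads('{"id": 3, "ts": "2020-01-01T00:00:00"}')
assert isinstance(row["ts"], str)
```

Since the column names and types are recoverable from the DataFrame schema, a serialization that carries the schema (such as Arrow) would avoid both problems.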
Hi,
I am having a hard time getting Livy running on my local machine. I have Windows 10, Python 3.6, pyspark 2.4.1, and Livy 0.5.0. I have a CSV file that I want to read in with pyspark. Can someone show me the simple steps of using Livy to do this? I'm trying to see whether Livy is a good option for running Spark code from web applications.
Current behavior means that if the Spark DataFrame has duplicated column names, then only one instance of that column is recovered in the returned pandas DataFrame when we call the read/download method.
Hi All,
I am trying to use Livy programmatically and access a remote Spark context.
I have used the command below to install livy, as mentioned in the docs:
pip install -U livy --user
But when I try to import LivySession I see below error:
>>> from livy import LivySession
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/...../Library/Python/2.7/lib/python/site-packages/livy/__init__.py", line 1, in <module>
    from livy.session import LivySession  # noqa: F401
  File "/Users/....../Library/Python/2.7/lib/python/site-packages/livy/session.py", line 22
    dataframe_name: str, session_kind: SessionKind
                  ^
SyntaxError: invalid syntax
Is there anything I am doing wrong?
Right now the only way to return an empty dataset or null result is to construct an empty Spark DataFrame, which is kind of clunky to do. Might it make sense to change session.read to work on a variable set to None and interpret it as an empty dataframe?
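For reference, the clunky workaround today is generating the empty-DataFrame snippet on the remote side. A hedged sketch (the helper name is illustrative, not part of pylivy; `spark.createDataFrame([], schema)` with a DDL schema string is standard pyspark):

```python
def empty_df_code(name, schema):
    """Build a pyspark snippet that binds an empty DataFrame with an
    explicit schema, suitable for passing to session.run().
    Illustrative helper, not part of the pylivy API."""
    return f"{name} = spark.createDataFrame([], {schema!r})"

code = empty_df_code("result", "id INT, name STRING")
# session.run(code); a subsequent session.read("result") would then
# yield a 0-row frame with the right columns.
```

Interpreting a name bound to None as an empty frame would remove the need for this boilerplate, at the cost of masking genuinely missing variables.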
Thanks for creating this library, it is super useful!
For my use-case pandas is unnecessary, as I end up needing to convert the pandas dataframe immediately back into a dict. I've got a fork over here where I've made pandas optional for the read_sql method: master...gthomas-slack:make-pandas-optional
Wondering if this is something you would accept to be merged into this library, and if so, what tests you would want added?
Thanks!
Only 'kind' is allowed as a parameter; what about all the other request body inputs supported by the Livy REST API?
Am I missing something here?
Parameter | Description | Type
proxyUser | User to impersonate when starting the session | string
jars | Jars to be used in this session | List of string
pyFiles | Python files to be used in this session | List of string
files | Files to be used in this session | List of string
driverMemory | Amount of memory to use for the driver process | string
driverCores | Number of cores to use for the driver process | int
executorMemory | Amount of memory to use per executor process | string
executorCores | Number of cores to use for each executor | int
numExecutors | Number of executors to launch for this session | int
archives | Archives to be used in this session | List of string
queue | The name of the YARN queue to which the session is submitted | string
name | The name of this session | string
conf | Spark configuration properties | Map of key=val
heartbeatTimeoutInSecond | Timeout in seconds after which an orphaned session is killed | int
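Until the client exposes all of these, one workaround is to build the POST /sessions body yourself from the table above and call the REST API directly. A hedged sketch (the helper is illustrative; the field names are the documented Livy ones):

```python
def build_session_payload(kind="pyspark", **options):
    """Assemble a Livy POST /sessions body from the REST fields above.

    Any of the documented keys (proxyUser, jars, driverMemory, conf,
    heartbeatTimeoutInSecond, ...) can be passed through; None values
    are dropped so defaults apply server-side.
    """
    payload = {"kind": kind}
    payload.update({k: v for k, v in options.items() if v is not None})
    return payload

payload = build_session_payload(
    kind="pyspark",
    driverMemory="2g",
    numExecutors=2,
    conf={"spark.sql.shuffle.partitions": "8"},
)

# Untested sketch of the actual request; Livy needs the X-Requested-By
# header when CSRF protection is enabled:
# import json, urllib.request
# req = urllib.request.Request(
#     "http://livy:8998/sessions",
#     data=json.dumps(payload).encode(),
#     headers={"Content-Type": "application/json", "X-Requested-By": "client"},
# )
# urllib.request.urlopen(req)
```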
Hi,
I have a requirement to use two headers when calling my Livy endpoint: "Authorization", for use with a token, and "X-Requested-By", which is required because CSRF protection is enabled in Livy.
I have modified my local pylivy code to enable headers (only some small changes).
Should I create a pull request to merge this, or is it a design choice not to include headers?
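One approach that may already work without code changes, since LivySession.create accepts a requests_session argument (as used elsewhere in these issues): set the headers on the requests.Session, so they ride along on every request pylivy makes. A hedged sketch with placeholder values:

```python
import requests

# Headers attached to the Session apply to every request made through it.
http = requests.Session()
http.headers.update({
    "Authorization": "Bearer <token>",  # placeholder token
    "X-Requested-By": "pylivy-client",  # required when CSRF protection is on
})

# Hypothetical usage (LIVY_URL is your endpoint):
# from livy import LivySession
# with LivySession.create(LIVY_URL, requests_session=http) as session:
#     session.run("1 + 1")
```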
Currently, Livy.read() only works with dataframes.
from livy import LivySession
LIVY_URL =**
with LivySession.create(LIVY_URL) as session:
# Run some code on the remote cluster
session.run("filtered = df.filter(df.name == 'Bob')")
When I try to run this code, LivySession.create results in the following error:
ValueError: invalid version string '
Does anyone know the reason?
Ideally add tests for basic auth (possibly with Knox) and Kerberos.
Hi @acroz! sparkmagic has this very cool feature which allows you to transfer Python objects like a pandas dataframe from the local Python context into the remote Spark context via Livy (e.g. the pandas dataframe becomes a pyspark dataframe in the Spark context).
I was thinking about trying to add some similar functionality to pylivy, but before I start looking into it I wanted to see if you have already looked into this at all, or have any thoughts on it?
Thanks!
I am going to start working on a PR that would allow users to set the job description using the Spark context during LivySession.read(). This is done using sc.setJobDescription("my description here"), which replaces the name shown in the Spark UI and history server for the collect stage of the session.
I would welcome input on whether this should be a new method, a parameter for read that allows injected code, or just a name injection. We are already setting the job description in LivySession.run, so an optional attribute of the session that prepends the code to set the job description might make sense.
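The "prepend the description-setting code" option above can be sketched in a few lines. This is an illustrative helper, not the proposed pylivy API; it just shows the string-level injection under discussion:

```python
def with_job_description(code, description):
    """Prepend a sc.setJobDescription call to a code snippet before it
    is sent with session.run(). Purely illustrative of the idea above;
    repr() keeps the description safely quoted inside the snippet."""
    return f"sc.setJobDescription({description!r})\n{code}"

snippet = with_job_description("df.count()", "nightly count job")
# session.run(snippet) would then label the resulting stages in the UI.
```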
Only PySpark is currently tested.
Is there a way I can download 100K rows using download_sql?
At the moment, we try to read the entire DataFrame in a single request. This task is to implement chunked read.
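Until chunked read lands, one client-side approach is paging by ranges of a key column. A hedged sketch, assuming a numeric key: note that Spark SQL did not support OFFSET before 3.4, so key-range predicates are more portable than LIMIT/OFFSET here.

```python
def chunked_queries(table, key, chunk_size, max_key):
    """Yield SQL statements that page through `table` by ranges of a
    numeric key column. Each query could be fed to download_sql and the
    pieces concatenated client-side. Illustrative helper only."""
    for lo in range(0, max_key, chunk_size):
        hi = lo + chunk_size
        yield f"SELECT * FROM {table} WHERE {key} >= {lo} AND {key} < {hi}"

queries = list(chunked_queries("events", "id", 50000, 100000))
# for q in queries:
#     frames.append(session.download_sql(q))
```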
Hi,
I was trying to connect to an API endpoint with an X-AUTH-TOKEN. I received requests.exceptions.HTTPError: 401 Client Error: Unauthorized for url: https://wellness.qubole.com/livy-spark-<cluster_id>/version. I configured my LIVY_URL and AUTH_TOKEN as the docs suggested and did this:
with LivySession.create(LIVY_URL, requests_session=requests_session) as session:
    session.run("1+1")
The same configuration works for local Jupyter notebooks with another package called sparkmagic. I was wondering if you had any thoughts on what could be different here, causing the above exception to occur. The Jupyter notebook/Livy integration is documented here: https://github.com/jupyter-incubator/sparkmagic.
Thanks for your time!
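One way to narrow a 401 like the above down is to probe the same /version endpoint directly with the token header attached, before involving pylivy at all. A hedged sketch; the X-AUTH-TOKEN header name is taken from the error above and may differ per gateway:

```python
import requests

# Build the session pylivy would reuse, with the gateway's token header.
auth = requests.Session()
auth.headers["X-AUTH-TOKEN"] = "<AUTH_TOKEN>"  # placeholder value

# Untested probe against the endpoint that raised the 401:
# resp = auth.get(LIVY_URL + "/version")
# resp.raise_for_status()
# A 401 here points at the header/token itself; a 200 points at how
# pylivy forwards (or drops) the session's headers.
```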
Hi,
It looks like Livy 0.6.0 broke the session API. Is there any will to make pylivy compatible with it?
Hi!
I'm trying to start Livy sessions with Hive:
from livy import LivySession
import datetime
LIVY_URL = "http://mylivy:80"
with LivySession.create(
LIVY_URL,
jars=[
"gs://mybacket/hotfix/jars/iceberg-spark3-runtime-0.9.0.jar",
"gs://mybacket/hotfix/jars/spark_etl-1.0-SNAPSHOT.jar",
"gs://mybacket/hotfix/jars/spark-bigquery-with-dependencies_2.12-0.17.3.jar"
],
py_files=["gs://mybacket/hotfix/dags/package.zip"],
num_executors=1,
name=f"add-attribution-window-hours-{datetime.datetime.now()}",
spark_conf={
"spark.kubernetes.container.image.pullPolicy": "Always",
"spark.kubernetes.driverEnv.ETL_ENV": "prod",
"spark.executorEnv.ETL_ENV": "prod",
"spark.kubernetes.driverEnv.HIVE_CONF_DIR": "/opt/spark/conf/hive-site",
"spark.sql.warehouse.dir": "gs://mybacket/hive/",
"spark.sql.catalogImplementation": "hive",
"spark.kubernetes.driver.secrets.hive-site": "/opt/spark/conf/hive-site",
"spark.executor.memory": "16g",
"spark.executor.cores": "6",
"spark.eventLog.enabled": "true",
"spark.kubernetes.namespace": "default"
}
) as session:
# Run some code on the remote cluster
session.run("spark.sql('show databases;').show(20, False)")
# Retrieve the result
#local_df = session.download("df")
#local_df.show()
but 'show databases' in Hive is always empty (as if it were a local, empty hive-metastore).
In the log I see it trying to start Hive:
21/12/01 12:22:24 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('gs://mybucket/hive/').
21/12/01 12:22:24 INFO SharedState: Warehouse path is 'gs://mybucket/hive/'.
21/12/01 12:22:26 INFO CodeGenerator: Code generated in 267.63079 ms
21/12/01 12:22:26 INFO CodeGenerator: Code generated in 11.710265 ms
21/12/01 12:22:26 INFO CodeGenerator: Code generated in 17.16884 ms
When I start a Livy batch with the same spark_conf, it always works fine, I have access to all tables, and the log looks like this:
21/12/01 09:29:03 INFO HiveConf: Found configuration file file:/opt/spark/conf/hive-site/hive-site.xml
21/12/01 09:29:03 INFO HiveUtils: Initializing HiveMetastoreConnection version 2.3.7 using Spark classes.
21/12/01 09:29:03 INFO HiveConf: Found configuration file file:/opt/spark/conf/hive-site/hive-site.xml
21/12/01 09:29:03 INFO SessionState: Created HDFS directory: /tmp/hive/root
21/12/01 09:29:03 INFO SessionState: Created local directory: /tmp/root
21/12/01 09:29:03 INFO SessionState: Created HDFS directory: /tmp/hive/root/c42cf693-a56b-44d4-8b4f-5b67ed85c721
21/12/01 09:29:03 INFO SessionState: Created local directory: /tmp/root/c42cf693-a56b-44d4-8b4f-5b67ed85c721
21/12/01 09:29:03 INFO SessionState: Created HDFS directory: /tmp/hive/root/c42cf693-a56b-44d4-8b4f-5b67ed85c721/_tmp_space.db
21/12/01 09:29:03 INFO HiveClientImpl: Warehouse location for Hive client (version 2.3.7) is gs://mybucket/hive/
21/12/01 09:29:04 INFO metastore: Trying to connect to metastore with URI thrift://myhive-metastore.us-north1-a.c.myproject.internal:9083
21/12/01 09:29:04 INFO metastore: Opened a connection to metastore, current connections: 1
21/12/01 09:29:04 INFO metastore: Connected to metastore.
How do I correctly set the Hive config for Livy sessions?
I create a session and download the resulting dataframe. The running code itself is not important:
with LivySession.create(self.LIVY_URL, kind=SessionKind.PYSPARK, requests_session=self.requests_session, spark_conf=conf) as session:
    session.run(code)
    return session.download(download_dataframe_name)
It works fine in the staging environment on small amounts of data, but fails with an error on large amounts in the production environment:
requests.exceptions.HTTPError: 500 Server Error: Server Error for url: https://***:443/gateway/production/livy/sessions/655/statements/1
{"msg":"Session '655' not found."}
Meanwhile, the YARN application finished with a SUCCEEDED status.
Livy logs looks like:
23/11/15 14:06:14 INFO InteractiveSession: Interactive session 656 created [appid: application_1698181251761_0043, owner: knox, proxyUser: Some(e.makrushin), state: idle, kind: pyspark, info: {driverLogUrl=http://***:8042/node/containerlogs/container_e58_1698181251761_0043_01_000001/e.makrushin, sparkUiUrl=http://***/proxy/application_1698181251761_0043/}]
23/11/15 14:09:54 INFO InteractiveSessionManager: Deleting session 656
23/11/15 14:09:54 INFO InteractiveSession: Stopping InteractiveSession 656...
23/11/15 14:09:54 WARN Rpc: [Rpc] Closing RPC channel with 2 outstanding RPCs.
23/11/15 14:09:54 ERROR SessionServlet$: internal error
java.util.concurrent.CancellationException
at io.netty.util.concurrent.DefaultPromise.cancel(...)(Unknown Source)
23/11/15 14:09:54 INFO InteractiveSession: Stopped InteractiveSession 656.
23/11/15 14:09:54 INFO InteractiveSessionManager: Deleted session 656
It seems the session is deleted before I can download the result. Why might this happen, and how can I fix it?
I also tried handling the downloaded dataframe inside the with scope, and also not using with at all. It doesn't change anything: I get the same error at the moment of the download call.
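For debugging a disappearing session like this, it can help to query Livy's session endpoint right before the download, to see whether the session was reaped (e.g. by a gateway or Livy idle timeout) while the result was being prepared. A hedged sketch; the helper is illustrative and the base URL is a placeholder:

```python
def session_state_url(base_url, session_id):
    """URL of Livy's GET /sessions/{id} endpoint, which reports the
    session state (idle, busy, dead, ...). Checking it immediately
    before session.download() shows whether the session still exists
    at that moment or has already been deleted, as in the logs above."""
    return f"{base_url.rstrip('/')}/sessions/{session_id}"

url = session_state_url("https://gateway/production/livy/", 655)
# resp = requests_session.get(url)
# A 404 here reproduces the "Session '655' not found." error outside
# pylivy, pointing at a server-side timeout rather than the client.
```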
Hello, I'm trying to run some code for a personal project, but I got an error, as stated in the title. Here's the exact output:
Exception in thread django-main-thread:
Traceback (most recent call last):
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/utils/autoreload.py", line 54, in wrapper
    fn(*args, **kwargs)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/core/management/commands/runserver.py", line 117, in inner_run
    self.check(display_num_errors=True)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/core/management/base.py", line 390, in check
    include_deployment_checks=include_deployment_checks,
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/core/management/base.py", line 377, in _run_checks
    return checks.run_checks(**kwargs)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/core/checks/registry.py", line 72, in run_checks
    new_errors = check(app_configs=app_configs)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/core/checks/urls.py", line 40, in check_url_namespaces_unique
    all_namespaces = _load_all_namespaces(resolver)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/core/checks/urls.py", line 57, in _load_all_namespaces
    url_patterns = getattr(resolver, 'url_patterns', [])
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/utils/functional.py", line 80, in __get__
    res = instance.__dict__[self.name] = self.func(instance)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/urls/resolvers.py", line 579, in url_patterns
    patterns = getattr(self.urlconf_module, "urlpatterns", self.urlconf_module)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/utils/functional.py", line 80, in __get__
    res = instance.__dict__[self.name] = self.func(instance)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/urls/resolvers.py", line 572, in urlconf_module
    return import_module(self.urlconf_name)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/root/Bastien/dev_frontend/site_test/site_test/urls.py", line 20, in <module>
    path('button/', include('button.urls')),
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/site-packages/django/urls/conf.py", line 34, in include
    urlconf_module = import_module(urlconf_module)
  File "/root/anaconda3/envs/tensorflow/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/root/Bastien/dev_frontend/site_test/button/urls.py", line 3, in <module>
    from . import views
  File "/root/Bastien/dev_frontend/site_test/button/views.py", line 3, in <module>
    import wordcount, livy
  File "/root/Bastien/dev_frontend/site_test/livy.py", line 3, in <module>
    from livy.models import Batches
ModuleNotFoundError: No module named 'livy.models'; 'livy' is not a package
I am currently using Livy 0.6. I thought the error was coming from this, but since there is nothing about it anywhere, I can't be 100% sure. I am working with Python 3.6. (In this code I'm using Django to call Livy.)
Here is my code:
import json, pprint, requests, textwrap, logging
from typing import List
from livy.models import Batches

def send(inputText):
    host = 'http://localhost:8998'
    data = {"file": "/myapp/wordcount.py", "args": [inputText, "2"]}
    headers = {'Content-Type': 'application/json'}
    r = requests.post(host + '/batches', data=json.dumps(data), headers=headers)
    #session_url = host + r.headers['location']
    #getID = requests.get(host + '/batches' + 'id', data=json.dumps(data), headers=headers)
    #return getID

def sendAndGet(self) -> List[Batches]:
    """List all the active sessions in Livy."""
    data = self._client.get("/batches")
    return [Batches.from_json(item) for item in data["batches"]]
Since it's only a test, the last def is probably wrong, but for now it's only the livy.models import that I need.
I hope someone will be able to help me; thanks for your consideration.
By the way, the goal of this code is to get the IDs of the Livy batches I'm using for another purpose; if anyone has a better solution, I am willing to listen.
Thank you very much.
Hi @acroz,
What is the use-case overlap between pylivy and the built-in Python client in apache-incubator/livy?
When a custom requests.Session is passed to LivySession.create(), it does not honour verify.
Code sample:
r_session = requests.Session()
r_session.verify = False
livy_session = LivySession.create(
    LIVY_URL,
    requests_session=r_session,
    kind=SessionKind.SQL)
This ignores the verify=False set on the requests session and goes ahead with the default (verify=True) behaviour.
We can either update it from the requests_session arg here:
Line 56 in 6c7bf18
or pass an appropriate value here:
Line 90 in 6c7bf18
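Until this is fixed, one workaround sketch is a Session subclass that forces verify on every request, so the setting holds even if a library passes its own per-request keyword. The class name is illustrative; disabling TLS verification is unsafe outside testing:

```python
import requests

class UnverifiedSession(requests.Session):
    """Force verify=False on every request made through this session,
    even if a caller (such as a client library) overrides it per call.
    Workaround sketch only; do not use against production endpoints."""

    def request(self, method, url, **kwargs):
        kwargs["verify"] = False  # override whatever the caller passed
        return super().request(method, url, **kwargs)

# Hypothetical usage with the code sample above:
# livy_session = LivySession.create(
#     LIVY_URL,
#     requests_session=UnverifiedSession(),
#     kind=SessionKind.SQL)
```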