cloudera/livy

Livy is an open source REST interface for interacting with Apache Spark from anywhere. It supports executing snippets of code or programs in a Spark context that runs locally or in Apache Hadoop YARN.

Pull requests are welcome! But before you begin, please check out the Wiki.

To build Livy, you will need:

Debian/Ubuntu:
Redhat/CentOS:
MacOS:
Required Python packages for building Livy:

To run Livy, you will also need a Spark installation. You can get Spark releases at https://spark.apache.org/downloads.html.

Livy requires at least Spark 1.6 and supports both Scala 2.10 and 2.11 builds of Spark. Livy automatically picks the correct repl dependencies by detecting the Scala version of the Spark installation.

Livy also supports Spark 2.0+ for both interactive and batch submission. You can seamlessly switch between different versions of Spark through the SPARK_HOME configuration, without needing to rebuild Livy.

Livy is built using Apache Maven. To check out and build Livy, run:

git clone https://github.com/cloudera/livy.git
cd livy
mvn package

By default Livy is built against Apache Spark 1.6.2, but the version of Spark used when running Livy does not need to match the version used to build it. Livy internally uses reflection to bridge the gaps between different Spark versions, and the Livy package itself does not contain a Spark distribution, so it will work with any supported version of Spark (1.6+) without needing to be rebuilt against a specific version.

In order to run Livy with local sessions, first export these variables:

export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf

Then start the server with:

./bin/livy-server

Livy uses the Spark configuration under SPARK_HOME by default. You can override the Spark configuration by setting the SPARK_CONF_DIR environment variable before starting Livy.

It is strongly recommended to configure Spark to submit applications in YARN cluster mode. That makes sure that user sessions have their resources properly accounted for in the YARN cluster, and that the host running the Livy server doesn't become overloaded when multiple user sessions are running.
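For example, this is typically set in SPARK_HOME/conf/spark-defaults.conf (a sketch; the exact keys depend on the Spark version):

# Spark 1.6:
spark.master yarn-cluster

# Spark 2.x:
spark.master yarn
spark.submit.deployMode cluster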

Livy uses a few configuration files under the configuration directory, which by default is the conf directory under the Livy installation. An alternative configuration directory can be provided by setting the LIVY_CONF_DIR environment variable when starting Livy.

The configuration files used by Livy are:

A few things have changed since Livy 0.1 that require manual intervention when upgrading.

Using the Programmatic API

Livy provides a programmatic Java/Scala and Python API that allows applications to run code inside Spark without having to maintain a local Spark context. Here is how to use the Java API.

Add the Cloudera repository to your application's POM:

<repositories>
  <repository>
    <id>cloudera.repo</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    <name>Cloudera Repositories</name>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>

And add the Livy client dependency:

<dependency>
  <groupId>com.cloudera.livy</groupId>
  <artifactId>livy-client-http</artifactId>
  <version>0.2.0</version>
</dependency>

To be able to compile code that uses Spark APIs, also add the corresponding Spark dependencies.

To run Spark jobs within your applications, implement the com.cloudera.livy.Job interface and add the functionality you need. Here's an example job that calculates an approximate value for Pi:

import java.util.*;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

import com.cloudera.livy.*;

/*
 * Monte Carlo estimate of Pi: sample random points in the unit square and
 * count the fraction that falls inside the quarter circle of radius 1, which
 * approximates Pi / 4. The class serves as the job, the map function, and the
 * reduce function, so a single serializable object is shipped to the cluster.
 */
public class PiJob implements Job<Double>, Function<Integer, Integer>,
  Function2<Integer, Integer, Integer> {

  private final int samples;

  public PiJob(int samples) {
    this.samples = samples;
  }

  // Entry point invoked by Livy; JobContext exposes the shared SparkContext.
  @Override
  public Double call(JobContext ctx) throws Exception {
    List<Integer> sampleList = new ArrayList<Integer>();
    for (int i = 0; i < samples; i++) {
      sampleList.add(i + 1);
    }

    return 4.0d * ctx.sc().parallelize(sampleList).map(this).reduce(this) / samples;
  }

  // Map function: 1 if a random point lands inside the quarter circle, else 0.
  @Override
  public Integer call(Integer v1) {
    double x = Math.random();
    double y = Math.random();
    return (x*x + y*y < 1) ? 1 : 0;
  }

  // Reduce function: sum the hits.
  @Override
  public Integer call(Integer v1, Integer v2) {
    return v1 + v2;
  }

}

To submit this code using Livy, create a LivyClient instance and upload your application code to the Spark context. Here's an example of code that submits the above job and prints the computed value:

LivyClient client = new LivyClientBuilder()
  .setURI(new URI(livyUrl))
  .build();

try {
  System.err.printf("Uploading %s to the Spark context...\n", piJar);
  client.uploadJar(new File(piJar)).get();

  System.err.printf("Running PiJob with %d samples...\n", samples);
  double pi = client.submit(new PiJob(samples)).get();

  System.out.println("Pi is roughly: " + pi);
} finally {
  client.stop(true);
}

To learn about all the functionality available to applications, read the javadoc documentation for the classes under the api module.

Here's a step-by-step example of interacting with Livy in Python with the Requests library. By default Livy runs on port 8998 (which can be changed with the livy.server.port config option). We’ll start off with a Spark session that takes Scala code:

sudo pip install requests

import json, pprint, requests, textwrap
host = 'http://localhost:8998'
data = {'kind': 'spark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()

{u'state': u'starting', u'id': 0, u'kind': u'spark'}

Once the session has completed starting up, it transitions to the idle state:

session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)
r.json()

{u'state': u'idle', u'id': 0, u'kind': u'spark'}
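A minimal way to wait for startup is to poll the session state until it leaves the starting state (a sketch; time is from the standard library):

import time
# Poll until the session is no longer starting up.
while requests.get(session_url, headers=headers).json()['state'] == 'starting':
    time.sleep(1)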

Now we can execute Scala by passing in a simple JSON command:

statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()

{u'output': None, u'state': u'running', u'id': 0}

If a statement takes longer than a few milliseconds to execute, Livy returns early and provides a statement URL that can be polled until it is complete:

statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())

{u'id': 0,
  u'output': {u'data': {u'text/plain': u'res0: Int = 2'},
              u'execution_count': 0,
              u'status': u'ok'},
  u'state': u'available'}
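A polling loop for statements looks much the same (a sketch; wait_for_statement is a hypothetical helper, not part of Livy, and reuses the requests and time imports from above):

def wait_for_statement(statement_url, headers, interval=1.0):
    # Hypothetical helper: poll until the statement reaches a terminal
    # state ('available', 'error', or 'cancelled'), then return it.
    while True:
        result = requests.get(statement_url, headers=headers).json()
        if result['state'] in ('available', 'error', 'cancelled'):
            return result
        time.sleep(interval)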

That was a pretty simple example. More interesting is using Spark to estimate Pi. This is from the Spark Examples:

data = {
  'code': textwrap.dedent("""
    val NUM_SAMPLES = 100000;
    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
      val x = Math.random();
      val y = Math.random();
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _);
    println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
    """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())

{u'id': 1,
 u'output': {u'data': {u'text/plain': u'Pi is roughly 3.14004\nNUM_SAMPLES: Int = 100000\ncount: Int = 78501'},
             u'execution_count': 1,
             u'status': u'ok'},
 u'state': u'available'}

Finally, close the session:

session_url = 'http://localhost:8998/sessions/0'
requests.delete(session_url, headers=headers)

<Response [204]>

PySpark has the same API, just with a different initial request:

data = {'kind': 'pyspark'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()

{u'id': 1, u'state': u'idle'}

The Pi example from before can then be run as:

data = {
  'code': textwrap.dedent("""
    import random
    NUM_SAMPLES = 100000
    def sample(p):
      x, y = random.random(), random.random()
      return 1 if x*x + y*y < 1 else 0

    count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
    print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
    """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

{u'id': 12,
 u'output': {u'data': {u'text/plain': u'Pi is roughly 3.136000'},
             u'execution_count': 12,
             u'status': u'ok'},
 u'state': u'running'}

SparkR has the same API:

data = {'kind': 'sparkr'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()

{u'id': 1, u'state': u'idle'}

The Pi example from before can then be run as:

data = {
  'code': textwrap.dedent("""
    n <- 100000
    slices <- 2
    piFunc <- function(elem) {
      rands <- runif(n = 2, min = -1, max = 1)
      val <- ifelse((rands[1]^2 + rands[2]^2) < 1, 1.0, 0.0)
      val
    }
    piFuncVec <- function(elems) {
      message(length(elems))
      rands1 <- runif(n = length(elems), min = -1, max = 1)
      rands2 <- runif(n = length(elems), min = -1, max = 1)
      val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
      sum(val)
    }
    rdd <- parallelize(sc, 1:n, slices)
    count <- reduce(lapplyPartition(rdd, piFuncVec), sum)
    cat("Pi is roughly", 4.0 * count / n, "\n")
    """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
pprint.pprint(r.json())

{u'id': 12,
 u'output': {u'data': {u'text/plain': u'Pi is roughly 3.136000'},
             u'execution_count': 12,
             u'status': u'ok'},
 u'state': u'running'}

GET /sessions

Returns all the active interactive sessions.

Request Parameters:

name | description | type
from | The start index to fetch sessions | int
size | Number of sessions to fetch | int

Response Body:

name | description | type
from | The start index of fetched sessions | int
total | Number of sessions fetched | int
sessions | Session list | list

POST /sessions

Creates a new interactive Scala, Python, or R shell in the cluster.

Request Body:

name | description | type
kind | The session kind (required) | session kind
proxyUser | User to impersonate when starting the session | string
jars | jars to be used in this session | List of string
pyFiles | Python files to be used in this session | List of string
files | files to be used in this session | List of string
driverMemory | Amount of memory to use for the driver process | string
driverCores | Number of cores to use for the driver process | int
executorMemory | Amount of memory to use per executor process | string
executorCores | Number of cores to use for each executor | int
numExecutors | Number of executors to launch for this session | int
archives | Archives to be used in this session | List of string
queue | The name of the YARN queue to which the session is submitted | string
name | The name of this session | string
conf | Spark configuration properties | Map of key=val
heartbeatTimeoutInSecond | Timeout in seconds after which the session is considered orphaned | int

The created Session.
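For example, the resource-related fields above can be combined in a single request (a sketch reusing the host and headers from the walkthrough; the values are illustrative, not defaults):

data = {
  'kind': 'spark',
  'driverMemory': '2g',     # illustrative values
  'executorMemory': '2g',
  'numExecutors': 2
}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)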

GET /sessions/{sessionId}

Returns the session information.

The Session.

GET /sessions/{sessionId}/state

Returns the state of the session.

Response Body:

name | description | type
id | Session id | int
state | The current state of session | string

DELETE /sessions/{sessionId}

Kills the Session job.

GET /sessions/{sessionId}/log

Gets the log lines from this session.

Request Parameters:

name | description | type
from | Offset | int
size | Max number of log lines to return | int

Response Body:

name | description | type
id | The session id | int
from | Offset from start of log | int
size | Number of log lines | int
log | The log lines | list of strings

GET /sessions/{sessionId}/statements

Returns all the statements in a session.

Response Body:

name | description | type
statements | statement list | list

POST /sessions/{sessionId}/statements

Runs a statement in a session.

Request Body:

name | description | type
code | The code to execute | string

The statement object.

GET /sessions/{sessionId}/statements/{statementId}

Returns a specified statement in a session.

The statement object.

POST /sessions/{sessionId}/statements/{statementId}/cancel

Cancels the specified statement in this session.

Response Body:

name | description | type
msg | is always "cancelled" | string
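A sketch of cancelling a running statement with the walkthrough's host and headers (the session and statement ids are hypothetical):

cancel_url = host + '/sessions/0/statements/0/cancel'
r = requests.post(cancel_url, headers=headers)
r.json()    # expected: {u'msg': u'cancelled'}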

GET /batches

Returns all the active batch sessions.

Request Parameters:

name | description | type
from | The start index to fetch sessions | int
size | Number of sessions to fetch | int

Response Body:

name | description | type
from | The start index of fetched sessions | int
total | Number of sessions fetched | int
sessions | Batch list | list

POST /batches

Request Body:

name | description | type
file | File containing the application to execute | path (required)
proxyUser | User to impersonate when running the job | string
className | Application Java/Spark main class | string
args | Command line arguments for the application | list of strings
jars | jars to be used in this session | List of string
pyFiles | Python files to be used in this session | List of string
files | files to be used in this session | List of string
driverMemory | Amount of memory to use for the driver process | string
driverCores | Number of cores to use for the driver process | int
executorMemory | Amount of memory to use per executor process | string
executorCores | Number of cores to use for each executor | int
numExecutors | Number of executors to launch for this session | int
archives | Archives to be used in this session | List of string
queue | The name of the YARN queue to which the job is submitted | string
name | The name of this session | string
conf | Spark configuration properties | Map of key=val

The created Batch object.
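Submitting a batch follows the same pattern as the interactive walkthrough (a sketch; the jar path and class name are hypothetical, and the file must be visible to the cluster):

data = {
  'file': '/user/alice/pi-job.jar',   # hypothetical path
  'className': 'com.example.PiJob'    # hypothetical main class
}
r = requests.post(host + '/batches', data=json.dumps(data), headers=headers)
r.json()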

GET /batches/{batchId}

Returns the batch session information.

The Batch.

GET /batches/{batchId}/state

Returns the state of the batch session.

Response Body:

name | description | type
id | Batch session id | int
state | The current state of batch session | string

DELETE /batches/{batchId}

Kills the Batch job.

GET /batches/{batchId}/log

Gets the log lines from this batch.

Request Parameters:

name | description | type
from | Offset | int
size | Max number of log lines to return | int

Response Body:

name | description | type
id | The batch id | int
from | Offset from start of log | int
size | Number of log lines | int
log | The log lines | list of strings
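Putting the state and log endpoints together, here is a sketch that watches a batch until it reaches a terminal state and then prints its log (the batch id is hypothetical, and the terminal state names are assumed to match the session states listed below):

batch_url = host + '/batches/0'    # hypothetical batch id
while requests.get(batch_url + '/state', headers=headers).json()['state'] not in ('success', 'dead', 'error'):
    time.sleep(5)
log = requests.get(batch_url + '/log', params={'from': 0, 'size': 100}, headers=headers).json()['log']
print '\n'.join(log)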

Session

A session represents an interactive shell.

name | description | type
id | The session id | int
appId | The application id of this session | String
owner | Remote user who submitted this session | String
proxyUser | User to impersonate when running | String
kind | Session kind (spark, pyspark, or sparkr) | session kind
log | The log lines | list of strings
state | The session state | string
appInfo | The detailed application info | Map of key=val

Session State:

value | description
not_started | Session has not been started
starting | Session is starting
idle | Session is waiting for input
busy | Session is executing a statement
shutting_down | Session is shutting down
error | Session errored out
dead | Session has exited
success | Session is successfully stopped

Session Kind:

value | description
spark | Interactive Scala Spark session
pyspark | Interactive Python 2 Spark session
pyspark3 | Interactive Python 3 Spark session
sparkr | Interactive R Spark session

For pyspark sessions, to change the Python executable the session uses, Livy reads the path from the PYSPARK_PYTHON environment variable (the same one pyspark itself uses).

As with pyspark, if Livy is running in local mode, just set the environment variable. If the session is running in yarn-cluster mode, set spark.yarn.appMasterEnv.PYSPARK_PYTHON in SparkConf so the environment variable is passed to the driver.

For pyspark3 sessions, to change the Python executable the session uses, Livy reads the path from the PYSPARK3_PYTHON environment variable.

As with pyspark, if Livy is running in local mode, just set the environment variable. If the session is running in yarn-cluster mode, set spark.yarn.appMasterEnv.PYSPARK3_PYTHON in SparkConf so the environment variable is passed to the driver.
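In yarn-cluster mode, either variable can also be passed per session through the REST API's conf field (a sketch for a pyspark session; the interpreter path is hypothetical):

data = {
  'kind': 'pyspark',
  'conf': {'spark.yarn.appMasterEnv.PYSPARK_PYTHON': '/opt/python27/bin/python'}  # hypothetical path
}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)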

Statement

A statement represents the result of an execution statement.

name | description | type
id | The statement id | integer
state | The execution state | statement state
output | The execution output | statement output

Statement State:

value | description
waiting | Statement is enqueued but execution hasn't started
running | Statement is currently running
available | Statement has a response ready
error | Statement failed
cancelling | Statement is being cancelled
cancelled | Statement is cancelled

Statement Output:

name | description | type
status | Execution status | string
execution_count | A monotonically increasing number | integer
data | Statement output | An object mapping a mime type to the result. If the mime type is application/json, the value is a JSON value.

Batch:

name | description | type
id | The session id | int
appId | The application id of this session | String
appInfo | The detailed application info | Map of key=val
log | The log lines | list of strings
state | The batch state | string

Apache License, Version 2.0 http://www.apache.org/licenses/LICENSE-2.0

