Git Product home page Git Product logo

spark-indexedrdd's Introduction

IndexedRDD for Apache Spark

An efficient updatable key-value store for Apache Spark.

IndexedRDD extends RDD[(K, V)] by enforcing key uniqueness and pre-indexing the entries for efficient joins and point lookups, updates, and deletions. It is implemented by (1) hash-partitioning the entries by key, (2) maintaining a radix tree (PART) index within each partition, and (3) using this immutable and efficiently updatable data structure to enable efficient modifications and deletions.

Usage

Add the dependency to your SBT project by adding the following to build.sbt (see the Spark Packages listing for spark-submit and Maven instructions):

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

libraryDependencies += "amplab" % "spark-indexedrdd" % "0.3"

Then use IndexedRDD as follows:

import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

// Create an RDD of key-value pairs with Long keys.
val rdd = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0)))
// Construct an IndexedRDD from the pairs, hash-partitioning and indexing
// the entries.
val indexed = IndexedRDD(rdd).cache()

// Perform a point update.
val indexed2 = indexed.put(1234L, 10873).cache()
// Perform a point lookup. Note that the original IndexedRDD remains
// unmodified.
indexed2.get(1234L) // => Some(10873)
indexed.get(1234L) // => Some(0)

// Efficiently join derived IndexedRDD with original.
val indexed3 = indexed.innerJoin(indexed2) { (id, a, b) => b }.filter(_._2 != 0)
indexed3.collect // => Array((1234L, 10873))

// Perform insertions and deletions.
val indexed4 = indexed2.put(-100L, 111).delete(Array(998L, 999L)).cache()
indexed2.get(-100L) // => None
indexed4.get(-100L) // => Some(111)
indexed2.get(999L) // => Some(0)
indexed4.get(999L) // => None

spark-indexedrdd's People

Contributors

aalexandrov avatar ankurdave avatar brkyvz avatar jeremyrsmith avatar jthelin avatar swkimme avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

spark-indexedrdd's Issues

Compilation error due to Scala version in part_2.10-0.1.jar

Platform: the latest Spark (spark-core_2.11) which is compatible with Scala 2.11.
When using the following maven dependency:

<dependency>
<groupId>amplab</groupId>
<artifactId>spark-indexedrdd</artifactId>
<version>0.4.0</version>
</dependency>

We get the following compilation error -
part_2.10-0.1.jar of MyProject build path is cross-compiled with an incompatible version of Scala (2.10.0). Unknown Scala Version Problem

Spark Streaming usage

Hi,

So I was wondering if this valid to say use in Spark Streaming to maintain a global state lookup table in a batch, that can be updated in a Streaming Batch and refreshed periodically.

Help greatly appreciated,
Arty

Checkpointing cannot cut lineage on IndexedRDD

Hi,

When doing iterative computing, I found the lineage on IndexedRDDs cannot be cut through checkpointing. It is because the newRDD returned by doCheckpoint() is a CheckpointRDD.

I'm presently using IndexedRDD as a Spark package. Is there any way other than checkpoint (through public API) that I can use to cut the lineage? Thank you! Any suggestion is appreciated...

To construct indexedRDD in Java

Hi,
I try to create an simple indexedRDD in java.
Here is my code:

List data = Arrays.asList(1, 2, 3, 4, 5);
JavaPairRDD<Long, Integer> distData = sc.parallelize(data).mapToPair(v -> new Tuple2(v.longValue(),0));
IndexedRDD<Long, Integer> mytest = new IndexedRDD<Long, Integer>(distData);

It has some errors, is there anyone help?

IndexedRDD.join slower than vanilla PairRDD.join

I am testing out IndexedRDD and noticing some performance problems that I wouldn't expect based on the README and what I saw from your SparkSummit presentation. The use case I have for IndexedRDD is to use it for doing near realtime denormalization for a data warehouse using spark streaming. My hope was that I could join a small subset of data (the most recent group of changed records) against an IndexedRDD in hopes that I could avoid having to do a full scan of the RDD and process the records much faster.

I have tried testing this out using both real and some generated data and found that when doing an inner join with the small dataset (about 100 records) on the left hand side of the join and a large dataset (100,000,000 records) on the right hand side of the join, the vanilla spark RDD performs as fast or faster than the IndexedRDD, even when caching both datasets and ensuring they share a partitioner beforehand to avoid the cost of the repartition.

Of the few times I ran this test using generated data (code follows) the IndexedRDD implementation was 15-20% slower. Digging into the code, it looks like it won't do any actual pruning of partitions to scan and instead will zip all from both sides, even if some partitions on one side of the join are empty. I know that by using the PartitionPruningRDD you should be able to inform the scheduler that only a subset of partitions need to be processed, but I am curious if I am just misunderstanding some details and applying the wrong tool for the job.

As mentioned, here is the code that I used to generate the results:

import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._
import org.apache.spark.HashPartitioner
import java.util.Random

class RecordBuilder extends Serializable {
  val r = new Random
  def buildStringLen(len: Int) : String = {
    (0 to len).map((i) => r.nextInt().toChar).mkString
  }
  def buildRecord(id: Long) : (Long, Map[String, String]) = {
    (id, Map(
      "id" -> id.toString,
      "rand1" -> buildStringLen(20),
      "rand2" -> buildStringLen(30),
      "rand3" -> buildStringLen(40)
      )
     )
  }
}
val rb = new RecordBuilder

val myHashPart = new HashPartitioner(200)
val r = new Random


val rhs = sc.parallelize(0L to (100000000 / 1000))
  .flatMap((i) => ((i * 1000) until ((i * 1000) + 1000)))
  .map((i) => rb.buildRecord(i))
  .partitionBy(myHashPart)
  .cache()

val lhs = sc.parallelize((0 to 100)
                         .map((i) => rb.buildRecord(r.nextLong() % 100000000)))
                         .partitionBy(myHashPart)
                         .cache()
rhs.count()
lhs.count()
lhs.join(rhs).collect()

val lhsi = IndexedRDD(lhs).cache()
val rhsi = IndexedRDD(rhs).cache()
lhsi.count()
rhsi.count()

lhsi.join(rhsi)((k, v1, v2) => v1).collect()

I may try adding in the PartitionPruningRDD and see what perf this gives me but would love to get some feedback on my experiment.

build problem, classes are unknown

I have problems including indexedrdd into a project and compile it. While compiling I get errors like

[info] Compiling 4 Scala sources to C:\temp\boldt-spark\target\scala-2.10\test-c
lasses...
[error] C:\temp\boldt-spark\src\test\scala\edu\berkeley\cs\amplab\spark\indexedrdd\KeySerializerSuite.scala:53: not found: type Tuple2Serializer
[error] val ser = new Tuple2SerializerA, B(aSer, bSer)
[error] ^
[error] one error found
error Compilation failed
[error] Total time: 5 s, completed 22-09-2015 15:53:52

All classes seems to be invisible even that the jar(spark-indexedrdd) is on the classpath

I created a fork and removed the sources and instead inserted a dependency on the spark-indexedrdd jar, so basically you should be able to recreate the error by running "sbt test" on the following #10

Can IndexedRDD work with Spark Streaming

Hi Ankur,

We hope to improve the join operations in Spark Streaming. Do you think IndexedRDD can work with Spark Streaming and may reduce the latency?

Thanks,
Rong

Contributions

Hi, I'm just wondering whether this repo is dead or whether you are taking any contributions.

it doesn't work on spark 2.2.0 with scala 2.11.8

Hi,
I just simply tried it in spark-shell 2.2.0 with scala 2.11.8. the shell was started with involving two jars of spark-indexedrdd-0.3.jar and part_2.10-0.1.jar. And I pasted your example in it and got some error messages as following:

====================================================================
scala> import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD

scala> import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

scala>

scala> // Create an RDD of key-value pairs with Long keys.

scala> val rdd = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0)))
rdd: org.apache.spark.rdd.RDD[(Long, Int)] = ParallelCollectionRDD[0] at parallelize at :28

scala> // Construct an IndexedRDD from the pairs, hash-partitioning and indexing

scala> // the entries.

scala> val indexed = IndexedRDD(rdd).cache()
indexed: edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD[Long,Int] = IndexedRDD[3] at RDD at IndexedRDD.scala:37

scala>

scala> // Perform a point update.

scala> val indexed2 = indexed.put(1234L, 10873).cache()
17/07/16 16:51:26 WARN util.ClosureCleaner: Expected a closure; got edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$MultiputZipper
indexed2: edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD[Long,Int] = IndexedRDD[7] at RDD at IndexedRDD.scala:37

scala> // Perform a point lookup. Note that the original IndexedRDD remains

scala> // unmodified.

scala> indexed2.get(1234L) // => Some(10873)
java.lang.NoSuchMethodError: org.apache.spark.SparkContext.runJob(Lorg/apache/spark/rdd/RDD;Lscala/Function2;Lscala/collection/Seq;ZLscala/reflect/ClassTag;)Ljava/lang/Object;
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.multiget(IndexedRDD.scala:83)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.get(IndexedRDD.scala:76)
... 50 elided

scala> indexed.get(1234L) // => Some(0)
java.lang.NoSuchMethodError: org.apache.spark.SparkContext.runJob(Lorg/apache/spark/rdd/RDD;Lscala/Function2;Lscala/collection/Seq;ZLscala/reflect/ClassTag;)Ljava/lang/Object;
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.multiget(IndexedRDD.scala:83)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.get(IndexedRDD.scala:76)
... 50 elided

scala>

scala> // Efficiently join derived IndexedRDD with original.

scala> val indexed3 = indexed.innerJoin(indexed2) { (id, a, b) => b }.filter(_._2 != 0)
17/07/16 16:51:28 WARN util.ClosureCleaner: Expected a closure; got edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$InnerJoinZipper
indexed3: edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD[Long,Int] = IndexedRDD[11] at RDD at IndexedRDD.scala:37

scala> indexed3.collect // => Array((1234L, 10873))
[Stage 1:> (0 + 0) / 2]17/07/16 16:51:29 WARN scheduler.TaskSetManager: Stage 0 contains a task of very large size (9770 KB). The maximum recommended task size is 100 KB.
[Stage 0:=========> (1 + 1) / 2][Stage 1:> (0 + 2) / 2]17/07/16 16:51:32 WARN storage.BlockManager: Putting block rdd_2_0 failed due to an exception
17/07/16 16:51:32 WARN storage.BlockManager: Putting block rdd_2_1 failed due to an exception
17/07/16 16:51:32 WARN storage.BlockManager: Block rdd_2_0 could not be removed as it was not found on disk or in memory
17/07/16 16:51:32 WARN storage.BlockManager: Block rdd_2_1 could not be removed as it was not found on disk or in memory
17/07/16 16:51:32 ERROR executor.Executor: Exception in task 1.0 in stage 2.0 (TID 5)
java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.compute(IndexedRDD.scala:72)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 41 more
17/07/16 16:51:32 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 4)
java.lang.NoClassDefFoundError: edu/berkeley/cs/amplab/spark/indexedrdd/impl/PARTPartition
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.compute(IndexedRDD.scala:72)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/07/16 16:51:32 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, localhost, executor driver): java.lang.NoClassDefFoundError: edu/berkeley/cs/amplab/spark/indexedrdd/impl/PARTPartition
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.compute(IndexedRDD.scala:72)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

17/07/16 16:51:32 ERROR scheduler.TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
17/07/16 16:51:32 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 5, localhost, executor driver): java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.compute(IndexedRDD.scala:72)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 41 more

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 4, localhost, executor driver): java.lang.NoClassDefFoundError: edu/berkeley/cs/amplab/spark/indexedrdd/impl/PARTPartition
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.compute(IndexedRDD.scala:72)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
... 50 elided
Caused by: java.lang.NoClassDefFoundError: edu/berkeley/cs/amplab/spark/indexedrdd/impl/PARTPartition
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$$anonfun$5.apply(IndexedRDD.scala:422)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.compute(IndexedRDD.scala:72)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

scala>

scala> // Perform insertions and deletions.

scala> val indexed4 = indexed2.put(-100L, 111).delete(Array(998L, 999L)).cache()
17/07/16 16:51:33 WARN util.ClosureCleaner: Expected a closure; got edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$MultiputZipper
17/07/16 16:51:33 WARN util.ClosureCleaner: Expected a closure; got edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$DeleteZipper
indexed4: edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD[Long,Int] = IndexedRDD[19] at RDD at IndexedRDD.scala:37

scala> indexed2.get(-100L) // => None
java.lang.NoSuchMethodError: org.apache.spark.SparkContext.runJob(Lorg/apache/spark/rdd/RDD;Lscala/Function2;Lscala/collection/Seq;ZLscala/reflect/ClassTag;)Ljava/lang/Object;
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.multiget(IndexedRDD.scala:83)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.get(IndexedRDD.scala:76)
... 50 elided

scala> indexed4.get(-100L) // => Some(111)
java.lang.NoSuchMethodError: org.apache.spark.SparkContext.runJob(Lorg/apache/spark/rdd/RDD;Lscala/Function2;Lscala/collection/Seq;ZLscala/reflect/ClassTag;)Ljava/lang/Object;
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.multiget(IndexedRDD.scala:83)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.get(IndexedRDD.scala:76)
... 50 elided

scala> indexed2.get(999L) // => Some(0)
java.lang.NoSuchMethodError: org.apache.spark.SparkContext.runJob(Lorg/apache/spark/rdd/RDD;Lscala/Function2;Lscala/collection/Seq;ZLscala/reflect/ClassTag;)Ljava/lang/Object;
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.multiget(IndexedRDD.scala:83)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.get(IndexedRDD.scala:76)
... 50 elided

scala> indexed4.get(999L) // => None
java.lang.NoSuchMethodError: org.apache.spark.SparkContext.runJob(Lorg/apache/spark/rdd/RDD;Lscala/Function2;Lscala/collection/Seq;ZLscala/reflect/ClassTag;)Ljava/lang/Object;
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.multiget(IndexedRDD.scala:83)
at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.get(IndexedRDD.scala:76)
... 50 elided

should I use earlier version or do something else?
thanks

Henry

Expected a closure; got ... IndexedRDD$MultiputZipper

While running your example from the README on a recent Spark (2.0-SNAPSHOT) compiled for 2.11.7, I'm seeing the following exception:

16/03/01 19:07:45 WARN ClosureCleaner: Expected a closure; got edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD$MultiputZipper
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.SparkContext.runJob(Lorg/apache/spark/rdd/RDD;Lscala/Function2;Lscala/collection/Seq;ZLscala/reflect/ClassTag;)Ljava/lang/Object;
    at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.multiget(IndexedRDD.scala:83)
    at edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD.get(IndexedRDD.scala:76)

I've compiled your package and its dependency (PART) setting scalaVersion to 2.11.7 and published it to the local Ivy repo. Do you think the problem I'm experiencing is related to Scala or Spark version I'm using (I'd like to stay on Scala 2.11 for my project)? If so, what would your roadmap look like for releasing the Scala 2.11 version of your package (see issue #12)? Thanks

Multiget operation is even slower than multiput opertation by IndexeddRDD

We have done some testings with IndexeddRDD. It performs well when updating or deleting. However, when we looked up one key-value, it was even slower than updating this key-value. Finally, we addressed this problem on function "runJob" in "multiget". Now we are struggling to fix this problem by zipPartitions.

Is there a python interface for indexedrdd?

Hi there, I tried searching around but couldn't find anything. Just wanted to double check with you in case I overlooked something ... do you guys have a pyspark interface for this indexedrdd project?

One-to-multiple lookup

Hi, Ankur,

IndexedRDD works prefect for one-to-one Key-Value pairs. But for the case that there are multiple values to a key, it only returns one value, which is not as the Seq(Value) returned by lookup method of RDD.

Since IndexedRDD extends from RDD, I guess perhaps the lookup method is worth overriding? Or its one-to-multiple lookup performance will be actually the same with the one for RDD partitioned by HashPartitioner? Thanks for your patience.

Best regards,
Li

dependency com.ankurdave#part_2.10;0.1:

Hi All,

When I added indexedrdd dependency in the sbt configuration file and compiled my scala, it keeps giving me this error:

error sbt.ResolveException: unresolved dependency: com.ankurdave#part_2.10;0.1: not found
[error] Total time: 6 s, completed Sep 9, 2015 11:01:37 PM

What does this mean? Any ideas?
Thank you!
Hong

Publish to repo1?

Could you publish this to the Maven Central Repository? My gradle file looks like:

repositories {
    mavenCentral()
    maven {
        url "http://dl.bintray.com/spark-packages/maven"
    }
    maven {
        url "https://raw.githubusercontent.com/ankurdave/maven-repo/master"
    }
}

Comparisons with KV stores (Redis and Memcached)

Hi Ankur,

We are hitting runtime issues in using SparkSQL as a distributed kv store through jdbc API. From our schema we can define a composite key and would like to run multiget on key which can be exposed through akka based REST API.

IndexedRDD was written for such use-cases and I would like to use it because our models are ML based.

It will be great if some comparisons with IndexedRDD with popular options like Redis/Memcached can be shown. I saw in your summit talk you showed cassandra comparisons but in our use-case the persistance is guaranteed by batch spark jobs in hive metastore and we need a KV store with fast read speed and modest write speed.

Something like Redis is a good comparison to see since it supports fast read and modest writes. I would like to know your feedbacks before I start setting up the Spark SQL as KV store, IndexedRDD and Redis comparisons.

Thanks.
Deb

NoClassDefFoundError: org/apache/spark/Logging

After packaging to .jar file, I add it to intelliJ IDEA as library. And I get this error.
"java.lang.NoClassDefFoundError: org/apache/spark/Logging", because of the "Logging" class is "private[spark]".

How could I use IndexedRDD in ide?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.