nicta / scoobi
A Scala productivity framework for Hadoop.
Home Page: http://nicta.github.com/scoobi/
The DList method size requires a map-reduce job to be run to compute the number of elements in the DList. The implementation is effectively:
map(_ => 1).groupBy(_ => 0).combine(_+_).map(_._2)
Alternatively, Hadoop counters could be used to keep track of a DList's size on the fly as it is being generated. Using such an approach, the size of a DList would always be immediately available.
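To make the contrast concrete, here is a minimal sketch on ordinary Scala collections rather than on DLists. `sizeViaJob` mirrors the map/group/sum pipeline, and `Counted` is a hypothetical wrapper that counts elements as they stream past, which is roughly what a counter-based size would do. Neither name exists in Scoobi; both are assumptions for illustration only.

```scala
object SizeSketch {
  // Local model of the job-based size: map every element to 1,
  // group everything under a single key, then sum the ones.
  def sizeViaJob[A](xs: Seq[A]): Long =
    xs.map(_ => 1L)
      .groupBy(_ => 0)
      .map { case (_, ones) => ones.sum }
      .headOption
      .getOrElse(0L)

  // Counter-based alternative (hypothetical): the count is maintained
  // while the data is being produced, so no extra pass is needed.
  final class Counted[A](underlying: Iterator[A]) extends Iterator[A] {
    private var seen = 0L
    def seenSoFar: Long = seen
    def hasNext: Boolean = underlying.hasNext
    def next(): A = { val a = underlying.next(); seen += 1; a }
  }
}
```

With the wrapper, the size is known as soon as the data has been consumed once, whereas the pipeline version needs a separate pass over the data.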
Hi. For some reason Scoobi seems to require a LOT of memory when compiling it. My 'sbt' script set the maximum heap to 512 MB and this led to heap-exhausted errors compiling Scoobi, even though I've compiled larger Scala apps without problem. Increasing to 1024 MB didn't help, nor did simply leaving off the -Xmx option and letting the JVM pick its own heap size. I jacked it all the way up to 4096 MB -- somewhat dicey since my laptop only has 4GB total of memory -- and this worked; the compiler seemed to top out at a bit under 2 GB.
You might want to document this since I expect others will run into the same problem. The 512 MB max heap size is the default recommended on the SBT installation page:
If a given MapReduce job output is only intermediate data (i.e. a BridgeStore) it should be compressed. This is possible given it is currently persisted as Sequence files.
All other outputs should be stored as specified by their DataOutput/Persister object (i.e. they could be compressed if the Persister specifies so).
Implement methods along the lines of fromAvroFile and toAvroFile. It would be good to implicitly figure out the Avro schema based on the type of the DList.
I know it is in the roadmap, but I am really interested in trying out scoobi and this is the major blocker since most of our data is in Sequence files. I'd like to try to help if needed.
I'm not too sure why, but the reducer seems to be loading all the values into memory. The following code works with plain Java/Hadoop, but causes out-of-memory errors with Scoobi.
Expected O(1) memory use, observed O(n)
Program expects two arguments "input file" and "output directory". All the lines from the input file get sent to the same reducer, which will run out of memory if the file is large
import com.nicta.scoobi._
import com.nicta.scoobi.Scoobi._
import com.nicta.scoobi.io.text._
object Main {
  def main(a: Array[String]) = withHadoopArgs(a) { args =>
    val grouped: DList[(Int, Iterable[String])] = TextInput.fromTextFile(args(0)).map((1, _)).groupByKey
    val r: DList[Long] = grouped.parallelDo(new DoFn[(Int, Iterable[String]), Long]() {
      override def setup() {}
      override def cleanup(emitter: Emitter[Long]) {}
      override def process(input: (Int, Iterable[String]), emitter: Emitter[Long]) {
        var count = 0L
        for (c <- input._2) {
          count = count + c.length
        }
        emitter.emit(count);
      }
    })
    DList.persist(TextOutput.toTextFile(r, args(1)))
  }
}
From the description in the docs, it appears that DList#materialize should trigger compilation of the underlying mapreduce jobs, and persist the results as a collection accessible to the rest of the program.
But that doesn't seem to be supported yet, which is consistent with what's in README.md:
Scoobi currently only provides one mechanism for specifying how a DList is to be persisted. It is toTextFile and is implemented in the object com.nicta.scoobi.io.text.TextOutput
Which is fine, this is still only version 0.3 or 0.4 or so :)
While it makes sense to have DList#materialize in the API for future development, it'd be nice if it were currently tagged with an annotation or something to indicate that it has yet to be implemented. As it is, it just returns a DObject wrapping a DList that yields nothing when iterated over, which leads to weird bugs.
Thanks!
Scoobi currently builds against Hadoop 0.20.xxx which includes both the "old" and "new" Hadoop APIs. Scoobi, however, uses the "old" APIs. Given the recent release of 0.23.xxx, it makes sense to move to the "new" APIs.
Extend the existing API's support for Sequence files to include automatic conversion of DLists of Scala types (e.g. Int, String) to/from the Writable equivalent (e.g. IntWritable, Text).
Below is an example of when DList.apply doesn't work correctly.
val as: DList[(String, Int)] = DList(
"a" -> 1,
"a" -> 2,
"a" -> 3,
"a" -> 4,
"b" -> 5,
"b" -> 6,
"c" -> 7,
"c" -> 8,
"c" -> 9,
"d" -> 10,
"d" -> 11,
"d" -> 12,
"d" -> 13,
"d" -> 14)
persist(toTextFile(as.map(identity), "out"))
Only a fraction of the inputs are actually output:
(a,1)
(a,2)
(a,3)
(a,4)
(b,5)
(b,6)
(c,7)
Note that in this particular example, it works in local mode but fails in cluster mode.
extractFromDelimitedTextFile got changed to fromDelimitedTextFile and its arguments got swapped as well. Update the README to reflect this change.
It feels a little weird communicating through an issue tracker. I found the ScalaSyd Google group but since I'm pretty much exactly on the other side of the world, it might not be the right group to bother with purely Scoobi questions.
So maybe it is time for a Scoobi-specific mailing list or a Google group? That way I can bother you guys (some more) without filling your issue tracker with questions ;)
Regards,
Age
For people relying on maven/ivy/sbt, it would be great to have a compiled scoobi available in the Sonatype Maven repository.
Unit tests should be in their own separate sbt project, to make running tests easier.
This is taken from: http://groups.google.com/group/scoobi-dev/browse_thread/thread/65a06d668ff3df39
All unit tests will be in one sbt project 'tests', which will take command line arguments if only a certain unit test is to be run.
Client-side computations that can be dependent on and/or create dependencies on DList computations
Due to the use of writeUTF, writing an intermediate string longer than 64k throws a UTFDataFormatException and fails hard. This is documented in the Java API:
http://docs.oracle.com/javase/1.4.2/docs/api/java/io/DataOutput.html#writeUTF(java.lang.String)
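The limit can be reproduced with the JDK alone. The sketch below first shows writeUTF rejecting a string whose encoded form exceeds 65535 bytes, then shows one possible alternative encoding: a 4-byte length prefix followed by the raw UTF-8 bytes. The helper names (`writeString`, `readString`, `roundTrip`) are made up for this example and are not part of Scoobi; whether Scoobi should adopt this layout is an open design question.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, UTFDataFormatException}
import java.nio.charset.StandardCharsets.UTF_8

object LongStringSketch {
  // True if writeUTF rejects the string because its encoded form is too long.
  def writeUtfFails(s: String): Boolean = {
    val out = new DataOutputStream(new ByteArrayOutputStream())
    try { out.writeUTF(s); false }
    catch { case _: UTFDataFormatException => true }
  }

  // Alternative layout: an Int length prefix followed by the UTF-8 bytes.
  def writeString(out: DataOutputStream, s: String): Unit = {
    val bytes = s.getBytes(UTF_8)
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  // Reads back a string written by writeString.
  def readString(in: DataInputStream): String = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    new String(bytes, UTF_8)
  }

  // Serialise and deserialise through an in-memory buffer.
  def roundTrip(s: String): String = {
    val buf = new ByteArrayOutputStream()
    writeString(new DataOutputStream(buf), s)
    readString(new DataInputStream(new ByteArrayInputStream(buf.toByteArray)))
  }
}
```

The Int prefix costs two more bytes per string than writeUTF's 2-byte prefix, but raises the ceiling to roughly 2 GB per string.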
In large map-reduce jobs it is useful to tweak the number of reducers used, either to control the number of split files produced or to ensure that enough reducers are available to complete the step efficiently.
Even better would be for the optimizer to figure out the optimal number automatically, but providing a hint to the optimizer is a good short-term solution.
It would be nice to not have to include a bunch of separate imports in order to do anything with Scoobi. Having a single import along the lines of Scalaz's would be nice, e.g. just requiring import com.nicta.scoobi.Scoobi._.
Add an extractor for Floats:
val xs: DList[Float] = fromDelimitedTextFile("in") { case _ :: Float(f) :: _ => f }
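For readers unfamiliar with how such an extractor works, here is a sketch in plain Scala with no Scoobi dependency. The extractor is called `AFloat` purely for this example (a hypothetical name, chosen so it does not clash with scala.Float), and `secondField` stands in for the pattern-matching function passed to fromDelimitedTextFile.

```scala
object FloatExtractorSketch {
  // Hypothetical extractor: succeeds only when the field parses as a Float.
  object AFloat {
    def unapply(s: String): Option[Float] =
      try Some(s.trim.toFloat)
      catch { case _: NumberFormatException => None }
  }

  // Picks the second field of a delimited line, if it is a valid Float.
  def secondField(line: String, sep: String = ","): Option[Float] =
    line.split(sep).toList match {
      case _ :: AFloat(f) :: _ => Some(f)
      case _                   => None
    }
}
```

Lines whose second field is missing or not numeric simply fail to match, which is the behaviour one would want from a partial function over input records.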
I found the package name scoobi is mistyped as scoobij, and when I import the project into Eclipse there are lots of errors.
Add constructs (primitive or library based) that allow for iterative and conditional behaviour in algorithms.
Scoobi needs to support having empty DLists. Currently an empty DList will probably result in an error like:
coobi-20120309-112117/bridges/3/ch* matches 0 files
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern file:/home/eric/.scoobi/scoobi-20120309-112117/bridges/3/ch* matches 0 files
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)
A Scoobi program will crash if a file is loaded in then persisted directly without any transformation. For example:
val a = fromTextFile("hdfs://input.txt")
persist(toTextFile(a, "hdfs://output"))
This is an edge case as there are no map, groupByKey, etc. transformations in between the load and persist.
The cause is unknown; the jar seems fine and has scala.math (and runs on Linux). The workaround is to:
export HADOOP_CLASSPATH=/location/to/jar.jar
hadoop NameOfObjectToRun
Investigate the cause and a possible fix.
The scala.Int object is hidden by the Scoobi.Int (com.nicta.scoobi.TextInput.Int). This means we can't use values such as Int.MaxValue.
The same will apply for Float, Double and Long.
Need a way around this - easiest would be to rename the Scoobi versions.
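The shadowing and two ways around it can be reproduced without Scoobi. In this sketch `TextExtractors` is a hypothetical stand-in for the object that defines the extractors; the first client object qualifies scala.Int explicitly, the second renames the extractor on import so that plain `Int` means scala.Int again.

```scala
object TextExtractors {
  // Stand-in for an extractor that shares its name with scala.Int.
  object Int {
    def unapply(s: String): Option[scala.Int] =
      try Some(s.trim.toInt)
      catch { case _: NumberFormatException => None }
  }
}

object ShadowingSketch {
  import TextExtractors._

  // Plain `Int.MaxValue` would not compile here: the term `Int` now names
  // the extractor object, so the companion must be fully qualified.
  val largest: scala.Int = scala.Int.MaxValue

  def parse(s: String): Option[scala.Int] = s match {
    case Int(i) => Some(i)
    case _      => None
  }
}

object RenamedSketch {
  // Renaming on import keeps the extractor usable and leaves scala.Int visible.
  import TextExtractors.{Int => IntField}

  val largest: Int = Int.MaxValue

  def parse(s: String): Option[Int] = s match {
    case IntField(i) => Some(i)
    case _           => None
  }
}
```

Both are client-side workarounds; renaming the extractors in the library itself would spare every user from needing either.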
Expose the functionality of Hadoop's counters at the "language" level - e.g. a distributed "counter" or "accumulator" class type, DAcc. Such an abstraction would be particularly useful for implementing the terminating condition of iterative algorithms.
e.g. reduce, product, sum, min, max, length, count
Currently we only work with Cloudera's Hadoop, but we should aim to support all the different Hadoop distributions. It would seem that our Cloudera dependency comes from the use of "mapreduce.lib.output.MultipleOutputs". This exists in Apache's 0.23.0 hadoop-core, so once that version becomes more prevalent, it should be relatively easy.
It would make it slightly more convenient if DList was covariant (like a scala list)
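As an illustration of the convenience being asked for, the sketch below uses a hypothetical `CovariantBox[+A]` wrapper (not a Scoobi class) to show a collection of a subtype being passed where a collection of the supertype is expected, exactly as with scala.List.

```scala
object VarianceSketch {
  sealed trait Animal { def name: String }
  final case class Dog(name: String) extends Animal

  // Hypothetical covariant container standing in for a covariant DList.
  final class CovariantBox[+A](val items: List[A])

  // Accepts a box of any Animal subtype thanks to the +A annotation.
  def names(box: CovariantBox[Animal]): List[String] = box.items.map(_.name)

  val dogs = new CovariantBox[Dog](List(Dog("Rex"), Dog("Fido")))

  // Compiles because CovariantBox[Dog] is a subtype of CovariantBox[Animal].
  // Without the + on A, this call would be a type error.
  val dogNames: List[String] = names(dogs)
}
```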
The methods convertKeyFromSequenceFile and convertValueFromSequenceFile assume the Sequence File's value and key, respectively, is NullWritable. If it's not, and you just want to load the key or value, it's a two-step process. For example:
// Get the key from a (Text, Text) sequence file
val x2: DList[(String, String)] = SequenceInput.convertFromSequenceFile(inputFile)
val x: DList[String] = x2 map (_._1)
// In 1-line
val x: DList[String] = SequenceInput.convertFromSequenceFile[(String, String)](inputFile) map (_._1)
Instead, is there a way to simply write:
// Still (Text, Text) sequence file input
val x: DList[String] = SequenceInput.convertKeyFromSequenceFile(inputFile)
It might be possible to just assume Writable as opposed to NullWritable...
Similar to sequence files, scoobi should support avro files
When multiple DLists generated by apply are used in the same mapreduce job, they read from the wrong data source.
val a = DList(1, 2, 3)
val b = DList(4, 5, 6)
val c = a ++ b
c should contain the values 1 to 6, but instead is the values of a, twice.
Just like we generate a wireformat, we need a convenient way to generate total ordering for a scala case class. Unless there's some existing scala solution, I'll implement it like mkCaseWireFormat
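One possible shape for such a helper, sketched on top of the standard library's Ordering.by and its built-in tuple orderings. The name `mkCaseOrdering` is hypothetical (chosen by analogy with mkCaseWireFormat) and the two-field arity is just for the example; a real version would presumably be generated for each arity.

```scala
object CaseOrderingSketch {
  // Hypothetical helper: builds a total ordering for T from a projection
  // onto a pair of fields, comparing the first field and then the second.
  def mkCaseOrdering[T, A: Ordering, B: Ordering](fields: T => (A, B)): Ordering[T] =
    Ordering.by(fields)

  final case class Visit(userId: Long, url: String)

  implicit val visitOrdering: Ordering[Visit] =
    mkCaseOrdering((v: Visit) => (v.userId, v.url))

  val sorted: List[Visit] =
    List(Visit(2L, "a"), Visit(1L, "b"), Visit(1L, "a")).sorted
}
```

Because the ordering is lexicographic over the projected fields, the order in which the fields are listed determines the sort priority.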
Today I tried applying the new Join operations on my data but I ran into StackOverflowErrors every time.
I have two DLists. The first contains a mapping of low-level id to higher-level id, resulting from an earlier processing step. The second one contains a mapping of low-level id to a PageView instance, which is my raw data. I would like to create a join to get a DList containing (higher-level id, PageView) pairs so I can later group by higher-level ids, combine the pageviews, and do my final analysis. See the code below for an impression.
When I run this (with standalone hadoop), I get a stack overflow (see stacktrace below). I looked at the code for both Join and CoGroup and I narrowed it down to the following code:
val joined = (d1s ++ d2s).groupByKey ...
The groupByKey causes the stack overflow, although I have no idea why. Since this code is shared between Join and CoGroup, both operations result in the stack overflow.
Unfortunately, this is where my insight in how Scoobi works ends (at the moment). I hope you guys can help because I really don't want to go back to plain Hadoop or Cascading.
Regards,
Age
...
val visitorIdToTrueVisitorId: DList[(Long, Long)] = ...
val visitorIdToPageView: DList[(Long, PageView)] = ...
val joined = join(visitorIdToTrueVisitorId, visitorIdToPageView)
...
Exception in thread "main" java.lang.StackOverflowError
at com.nicta.scoobi.impl.plan.AST$GroupByKey.gd7$1(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$GroupByKey.equals(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$Combiner.gd3$1(AST.scala:92)
at com.nicta.scoobi.impl.plan.AST$Combiner.equals(AST.scala:92)
at com.nicta.scoobi.impl.plan.AST$GbkMapper.gd2$1(AST.scala:73)
at com.nicta.scoobi.impl.plan.AST$GbkMapper.equals(AST.scala:73)
at com.nicta.scoobi.impl.plan.AST$GroupByKey.gd7$1(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$GroupByKey.equals(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$Combiner.gd3$1(AST.scala:92)
at com.nicta.scoobi.impl.plan.AST$Combiner.equals(AST.scala:92)
at com.nicta.scoobi.impl.plan.AST$GbkMapper.gd2$1(AST.scala:73)
at com.nicta.scoobi.impl.plan.AST$GbkMapper.equals(AST.scala:73)
at com.nicta.scoobi.impl.plan.AST$GroupByKey.gd7$1(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$GroupByKey.equals(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$Combiner.gd3$1(AST.scala:92)
at com.nicta.scoobi.impl.plan.AST$Combiner.equals(AST.scala:92)
at com.nicta.scoobi.impl.plan.AST$GbkMapper.gd2$1(AST.scala:73)
at com.nicta.scoobi.impl.plan.AST$GbkMapper.equals(AST.scala:73)
at com.nicta.scoobi.impl.plan.AST$GroupByKey.gd7$1(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$GroupByKey.equals(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$Combiner.gd3$1(AST.scala:92)
at com.nicta.scoobi.impl.plan.AST$Combiner.equals(AST.scala:92)
at com.nicta.scoobi.impl.plan.AST$GbkMapper.gd2$1(AST.scala:73)
at com.nicta.scoobi.impl.plan.AST$GbkMapper.equals(AST.scala:73)
at com.nicta.scoobi.impl.plan.AST$GroupByKey.gd7$1(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$GroupByKey.equals(AST.scala:175)
at com.nicta.scoobi.impl.plan.AST$Combiner.gd3$1(AST.scala:92)
at com.nicta.scoobi.impl.plan.AST$Combiner.equals(AST.scala:92)
at scala.collection.immutable.Set$Set1.contains(Set.scala:76)
at com.nicta.scoobi.impl.plan.MSCR$$anonfun$containingOutput$1.apply(MSCR.scala:90)
at com.nicta.scoobi.impl.plan.MSCR$$anonfun$containingOutput$1.apply(MSCR.scala:90)
at scala.collection.Iterator$class.find(Iterator.scala:709)
at scala.collection.immutable.TrieIterator.find(TrieIterator.scala:20)
at scala.collection.IterableLike$class.find(IterableLike.scala:80)
at scala.collection.immutable.HashSet.find(HashSet.scala:33)
at com.nicta.scoobi.impl.plan.MSCR$.containingOutput(MSCR.scala:90)
at com.nicta.scoobi.impl.exec.Executor$$anonfun$com$nicta$scoobi$impl$exec$Executor$$executeMSCR$1.apply(Executor.scala:112)
at com.nicta.scoobi.impl.exec.Executor$$anonfun$com$nicta$scoobi$impl$exec$Executor$$executeMSCR$1.apply(Executor.scala:110)
at scala.collection.immutable.Set$Set2.foreach(Set.scala:106)
at com.nicta.scoobi.impl.exec.Executor$.com$nicta$scoobi$impl$exec$Executor$$executeMSCR(Executor.scala:110)
at com.nicta.scoobi.impl.exec.Executor$$anonfun$com$nicta$scoobi$impl$exec$Executor$$executeMSCR$1.apply(Executor.scala:112)
at com.nicta.scoobi.impl.exec.Executor$$anonfun$com$nicta$scoobi$impl$exec$Executor$$executeMSCR$1.apply(Executor.scala:110)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
at com.nicta.scoobi.impl.exec.Executor$.com$nicta$scoobi$impl$exec$Executor$$executeMSCR(Executor.scala:110)
at com.nicta.scoobi.impl.exec.Executor$$anonfun$com$nicta$scoobi$impl$exec$Executor$$executeMSCR$1.apply(Executor.scala:112)
at com.nicta.scoobi.impl.exec.Executor$$anonfun$com$nicta$scoobi$impl$exec$Executor$$executeMSCR$1.apply(Executor.scala:110)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
at com.nicta.scoobi.impl.exec.Executor$.com$nicta$scoobi$impl$exec$Executor$$executeMSCR(Executor.scala:110)
...
Those last four lines repeat until the stack overflows.
Currently in impl/ClassBuilder.scala there are a lot of calls to StringBuilder::append which looks really ugly and makes it harder to see what's happening. This can almost certainly be improved in Scala, perhaps by importing append as an operator-like function
Based on the initial comment: #4 (comment)
Marking as Minor, as it's an aesthetic implementation detail
Using ScalaCheck to build and test random ASTs
Scoobi has Combine as a primitive node type in its logical graphs/plans. The advantage of "combining" is to increase local aggregation so as to reduce disk and network I/O. It also helps solve the reduce straggler problem - that is, reducers that receive a heavily skewed share of the values for a particular key (e.g. "the" in word-count). Scoobi implements the Combine node as a combination of the Hadoop Combiner and Hadoop Reducer classes. The disadvantage of Hadoop's "combiner" feature is that it is provided only as a hint to the framework and may not necessarily run.
Instead, combining can be moved inside the "mapper" by using a Map container per map task. In-mapper combining can also further improve on Hadoop combining - the output of the map doesn't need to be written to the local disk, serialised, etc. Experiment with different techniques for determining when the Map becomes too large and needs to be flushed.
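A minimal sketch of in-mapper combining, written against plain Scala collections rather than Hadoop's Mapper API. `CombiningBuffer` and `wordCountMapper` are hypothetical names, and the flush policy used here (flush once the number of distinct keys reaches a threshold) is just one of the techniques that could be experimented with.

```scala
import scala.collection.mutable

object InMapperCombining {
  // Aggregates values per key in memory and emits the partial aggregates
  // whenever the number of buffered keys reaches maxKeys.
  final class CombiningBuffer[K, V](maxKeys: Int, combine: (V, V) => V, emit: (K, V) => Unit) {
    private val buffer = mutable.HashMap.empty[K, V]

    def add(key: K, value: V): Unit = {
      buffer.get(key) match {
        case Some(old) => buffer.update(key, combine(old, value))
        case None      => buffer.update(key, value)
      }
      if (buffer.size >= maxKeys) flush()
    }

    // Emits everything buffered so far; must also be called once at the end
    // of the map task so that no partial aggregates are lost.
    def flush(): Unit = {
      buffer.foreach { case (k, v) => emit(k, v) }
      buffer.clear()
    }
  }

  // Word count for one simulated map task: returns the (word, partial count)
  // pairs that would be sent on to the reducers.
  def wordCountMapper(lines: Iterator[String], maxKeys: Int): Vector[(String, Int)] = {
    val out = Vector.newBuilder[(String, Int)]
    val buf = new CombiningBuffer[String, Int](maxKeys, _ + _, (k, v) => { out += ((k, v)); () })
    for (line <- lines; word <- line.split("\\s+") if word.nonEmpty) buf.add(word, 1)
    buf.flush()
    out.result()
  }
}
```

A small threshold bounds memory but emits more partial aggregates; a large one does the opposite. The reducer has to sum the partial counts in either case, so correctness does not depend on when the flushes happen.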
I cloned master today (currently at 0407fe2) and tried to compile it, but after chugging away for about 10 minutes, sbt 0.11.0 failed with an OutOfMemoryError.
Here is my sbt command:
java -Xmx1024m -XX:MaxPermSize=256m -jar `dirname $0`/sbt-launch-0.11.0.jar "$@"
As far as I know I don't have any suspicious sbt plugins that would cause this.
Command line transcript is here: https://gist.github.com/1872753
Full sbt log is here: https://gist.github.com/1872775
Scoobi does not currently employ RawComparators for the K2 type (TaggedKey). Doing so has the advantage of much faster key comparisons as data does not need to be first deserialised.
Investigate and implement a mechanism to provide a RawComparator for the K2 type generated for each MapReduce job that fits within the existing type class approach (e.g. WireFormat and Grouping). Aim to achieve close to hand-coded performance for the majority of cases and fall back to the compareTo approach otherwise.
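To illustrate what comparing keys without deserialising them means, here is a sketch for Int keys using only the JDK. It does not implement Hadoop's RawComparator interface; `serialise`, `compareDeserialised` and `compareRaw` are hypothetical names, and the byte layout assumed is the 4-byte big-endian form that DataOutput.writeInt produces.

```scala
import java.nio.ByteBuffer

object RawCompareSketch {
  // Serialise an Int as 4 big-endian bytes (ByteBuffer's default byte order).
  def serialise(i: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(i).array()

  // Slow path: rebuild both Ints from their bytes, then compare them.
  def compareDeserialised(a: Array[Byte], b: Array[Byte]): Int =
    Integer.compare(ByteBuffer.wrap(a).getInt, ByteBuffer.wrap(b).getInt)

  // Raw path: compare the serialised bytes in place. The first byte carries
  // the sign, so it is compared as signed; the remaining three as unsigned.
  def compareRaw(a: Array[Byte], b: Array[Byte]): Int = {
    var result = java.lang.Byte.compare(a(0), b(0))
    var i = 1
    while (result == 0 && i < 4) {
      result = Integer.compare(a(i) & 0xff, b(i) & 0xff)
      i += 1
    }
    result
  }
}
```

For composite keys the same idea applies field by field, which is where a type-class-derived comparator would have to know each field's serialised width and layout.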
Haven't spent much time on it, so I don't have a minimal example -- but I do have a reproducible one:
(Run it with no args in local mode)
If line 54 is commented out, it works fine. If line 55 is commented out, it works fine. But with both lines, it fails.
DList[String]() ++ fromTextFile(args(0))
Always results in a failure. I suspect this is quite related to Issue #60
It would be nice to easily include other computations within a Scoobi pipeline. For example:
The key ideas are:
DList interface;
This would in general be a useful feature for pulling in legacy or 3rd-party code - Hadoop or otherwise. Key tasks are:
DList interface:
DLists
DList and DObject inputs and outputs;
Scoobi sends temp output to the working directory, then copies it over to the final destination.
This works fine if you're outputting to hdfs (even if your input is from a non-hdfs:// source).
If you attempt to write to s3, you get this error:
java.lang.IllegalArgumentException: Wrong FS: s3n://mr-foursquarelogs/schemas/raw/dt=2012-01-01/sanity, expected: hdfs://datamining-master-1
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:410)
at org.apache.hadoop.hdfs.DistributedFileSystem.checkPath(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:162)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:358)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1275)
at com.nicta.scoobi.impl.exec.MapReduceJob$$anonfun$run$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$8.apply(MapReduceJob.scala:260)
at com.nicta.scoobi.impl.exec.MapReduceJob$$anonfun$run$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$8.apply(MapReduceJob.scala:259)
I'm not sure the output path is available at the point where the working directory is set. (Conf.scala, line 70)
Setting scoobi.workdir to an s3 path doesn't fix the issue due to the way the FileSystem is instantiated in Scoobi.scala (without the URI). Setting this would be the simple fix, but would require that you always set scoobi.workdir.
Ideally the temp output directory would be in a subdirectory of the output, maybe output/_tmp/output.
I'll play with this over the next couple of days and come up with something, thoughts?
It would be useful to be able to concatenate several different hdfs directories into the same DList
Your WordCount example is broken because it attempts to create a file on the local filesystem and then pass it to Scoobi, which will naturally use Hadoop's filesystem API (which may or may not refer to the local filesystem). This makes the example break by default on the TACC cluster I'm using, since its default Hadoop FS is HDFS. Now, it happens on my cluster that my home directory is mounted on all the machines, so if I use the option '-fs file:///', everything works. But that's not a general solution for multiple reasons:
(1) There's no guarantee all clusters will be set up with the local filesystem visible on all machines.
(2) There might be good reasons for using HDFS -- e.g. say we were trying to do something like what WordCount does, but with really large files.
Although I haven't looked too closely at the guts of Scoobi yet, I don't see any way to do the "right thing" here -- which is to use the Hadoop API for creating files. The problem is that Scoobi doesn't seem to provide any way of getting a Configuration object so that a Hadoop FileSystem object can be retrieved. Either withHadoopArgs() needs to be changed so that it passes both a list of arguments and a Configuration object (or similar), or another, related function needs to be created, e.g. withHadoopArgsAndConf(). I'd vote for the former for simplicity.
ben
Hi
I've tried to run both my own sample code and the WordCount example against a running Hadoop cluster (a pseudo distributed one running on my local machine) but I keep running into a NullPointerException during the shuffle phase.
Setup:
Cloudera CDH3u2 (hadoop-0.20.2-cdh3u2) running in local pseudo-distributed mode on OSX 10.7.2 using jdk 1.6.0_29
I ran into some issues with the default arguments to WordCount being interpreted as hdfs paths during persist so I uploaded the generated all-words.txt to hdfs and ran it with both arguments as hdfs paths
Output:
$>: hadoop com.nicta.scoobi.examples.WordCount /in/all-words.txt /tmp/out
2011-11-12 13:46:59.740 java[15347:1903] Unable to load realm info from SCDynamicStore
11/11/12 13:47:04 INFO mapred.FileInputFormat: Total input paths to process : 1
11/11/12 13:47:05 INFO mapred.JobClient: Running job: job_201111121331_0001
11/11/12 13:47:06 INFO mapred.JobClient: map 0% reduce 0%
11/11/12 13:47:13 INFO mapred.JobClient: map 100% reduce 0%
11/11/12 13:47:21 INFO mapred.JobClient: Task Id : attempt_201111121331_0001_r_000000_0, Status : FAILED
Error: java.lang.NullPointerException
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1603)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1468)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1317)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1249)
attempt_201111121331_0001_r_000000_0: 2011-11-12 13:47:12.549 java[15432:1903] Unable to load realm info from SCDynamicStore
11/11/12 13:47:29 INFO mapred.JobClient: Task Id : attempt_201111121331_0001_r_000000_1, Status : FAILED
Error: java.lang.NullPointerException
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1603)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1468)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1317)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1249)
attempt_201111121331_0001_r_000000_1: 2011-11-12 13:47:21.072 java[15460:1903] Unable to load realm info from SCDynamicStore
11/11/12 13:47:36 INFO mapred.JobClient: Task Id : attempt_201111121331_0001_r_000000_2, Status : FAILED
Error: java.lang.NullPointerException
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1603)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1468)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1317)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1249)
attempt_201111121331_0001_r_000000_2: 2011-11-12 13:47:29.041 java[15489:1903] Unable to load realm info from SCDynamicStore
11/11/12 13:47:46 INFO mapred.JobClient: Job complete: job_201111121331_0001
11/11/12 13:47:46 INFO mapred.JobClient: Counters: 18
11/11/12 13:47:46 INFO mapred.JobClient: Job Counters
11/11/12 13:47:46 INFO mapred.JobClient: Launched reduce tasks=4
11/11/12 13:47:46 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=9815
11/11/12 13:47:46 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
11/11/12 13:47:46 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
11/11/12 13:47:46 INFO mapred.JobClient: Launched map tasks=2
11/11/12 13:47:46 INFO mapred.JobClient: Data-local map tasks=2
11/11/12 13:47:46 INFO mapred.JobClient: Failed reduce tasks=1
11/11/12 13:47:46 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=4946
11/11/12 13:47:46 INFO mapred.JobClient: FileSystemCounters
11/11/12 13:47:46 INFO mapred.JobClient: HDFS_BYTES_READ=58793
11/11/12 13:47:46 INFO mapred.JobClient: FILE_BYTES_WRITTEN=125291
11/11/12 13:47:46 INFO mapred.JobClient: Map-Reduce Framework
11/11/12 13:47:46 INFO mapred.JobClient: Combine output records=995
11/11/12 13:47:46 INFO mapred.JobClient: Map input records=5000
11/11/12 13:47:46 INFO mapred.JobClient: Spilled Records=995
11/11/12 13:47:46 INFO mapred.JobClient: Map output bytes=95000
11/11/12 13:47:46 INFO mapred.JobClient: Map input bytes=30000
11/11/12 13:47:46 INFO mapred.JobClient: Combine input records=5000
11/11/12 13:47:46 INFO mapred.JobClient: Map output records=5000
11/11/12 13:47:46 INFO mapred.JobClient: SPLIT_RAW_BYTES=394
11/11/12 13:47:46 INFO mapred.JobClient: Job Failed: NA
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1246)
at com.nicta.scoobi.impl.exec.MapReduceJob.run(MapReduceJob.scala:191)
at com.nicta.scoobi.impl.exec.Executor$.com$nicta$scoobi$impl$exec$Executor$$executeMSCR(Executor.scala:117)
at com.nicta.scoobi.impl.exec.Executor$$anonfun$executePlan$10.apply(Executor.scala:101)
at com.nicta.scoobi.impl.exec.Executor$$anonfun$executePlan$10.apply(Executor.scala:99)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
at com.nicta.scoobi.impl.exec.Executor$.executePlan(Executor.scala:99)
at com.nicta.scoobi.DList$.persist(DList.scala:133)
at com.nicta.scoobi.examples.WordCount$$anonfun$main$1.apply(WordCount.scala:62)
at com.nicta.scoobi.examples.WordCount$$anonfun$main$1.apply(WordCount.scala:26)
at com.nicta.scoobi.Scoobi$.withHadoopArgs(Scoobi.scala:44)
at com.nicta.scoobi.examples.WordCount$.main(WordCount.scala:26)
at com.nicta.scoobi.examples.WordCount.main(WordCount.scala)
Scoobi 0.4 will need to depend on a non-snapshot version of specs2 to avoid users having to add/have specs2. When compiling a project that depends on scoobi, I'm getting org.specs2#specs2_2.9.1;1.10-SNAPSHOT: not found
If specs2 isn't ready in time, we could publish a pseudo release (1.09.1 ?) like we did for scalaz
I ran into some exceptions while running the word count example. After some headdesking, I figured out that I needed the Cloudera distribution (I never clicked the link in the readme).
A minor change to the readme might save others from confusion.
Cheers.
It would be nice to have something like Scala's App trait for Scoobi. This would make it even easier to write small applications.
TaggedKey and TaggedValue are used to wrap multiple key and value types within a single type in order to support MSCR fusion. However, their implementations still incur the overhead of tagging when only a single type is being wrapped.
Because TaggedKey and TaggedValue sub-classes are generated at run time, it should be possible to modify the code generation to include a special case for the situation when only a single type is being wrapped.