
spark-mongodb's Introduction

Spark-Mongodb

Join the chat at https://gitter.im/Stratio/Spark-MongoDB

Spark-Mongodb is a library that allows the user to read/write data with Spark SQL from/into MongoDB collections.

If you are using this Data Source, feel free to briefly share your experience by submitting a pull request to this file.

Requirements

This library requires Apache Spark, Scala 2.10 or Scala 2.11, and Casbah 2.8.X.

Latest compatible versions

| spark-MongoDB | Apache Spark | MongoDB |
|---|---|---|
| 0.12.x | 2.0.0 | 3.0.x |
| 0.10.x - 0.11.x | 1.5.x | 3.0.x |
| 0.8.2 - 0.9.2 | 1.4.0 | 3.0.x |
| 0.8.1 | 1.3.0 | 3.0.x |
| 0.8.0 | 1.2.1 | 3.0.x |

How to use Spark-MongoDB

There is also a [First Steps](https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/First_Steps.rst) document where we show some simple examples.

License

Licensed to STRATIO (C) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The STRATIO (C) licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

spark-mongodb's People

Contributors

abandin-stratio, albertoperezsanz, ccaballe, compae, darroyo-stratio, giaosudau, gitter-badger, hdominguez-stratio, hrodriguez-stratio, jsantosp, kanielc, koertkuipers, pfcoperez, pmadrigal, robertomorandeira, sfines, socialpercon, stratioadmin, witokondoria, wuciawe, yucheng1992


spark-mongodb's Issues

artifact name should include scala binary version postfix

i tried to use the new maven build with -Pscala-2.11 and it works well. nice!

however the produced artifact does not include the scala version postfix. artifacts should be spark-mongodb-core_2.10 and spark-mongodb-core_2.11, so that people can use it safely with different scala versions.
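
The naming convention requested above can be illustrated with a small sketch. It assumes Scala 2.x, where the binary version is the first two components of the full version; `scala_binary_version` and `artifact_id` are hypothetical helper names and are not part of the project's build.

```python
def scala_binary_version(scala_version):
    """Reduce a full Scala 2.x version such as '2.10.4' to its binary version '2.10'."""
    parts = scala_version.split(".")
    if len(parts) < 2:
        raise ValueError("expected at least major.minor, got %r" % scala_version)
    return ".".join(parts[:2])


def artifact_id(base_name, scala_version):
    """Append the Scala binary version suffix to an artifact name."""
    return "%s_%s" % (base_name, scala_binary_version(scala_version))


print(artifact_id("spark-mongodb-core", "2.10.4"))
print(artifact_id("spark-mongodb-core", "2.11"))
```

With the suffix in place, builds for different Scala versions can be published side by side without overwriting each other.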

Schema discovery created unpersistable schema for empty arrays

If all observed values of a document field are [] the generated schema is for ArrayType[NullType] which cannot be persisted or used in any meaningful way.

In the absence of evidence of the type of array elements a more logical behavior would be to allow for overrides of the schema of a subset of fields, e.g., as a JSON string (schema.json in Spark) or, if a default behavior is needed, map to ArrayType[StringType] as opposed to ArrayType[NullType]. The benefits are that this mapping can be persisted and it can represent any Mongo arrays, including heterogeneous ones.
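
The two ideas in this report (per-field overrides and a string fallback for arrays whose element type is unknown) can be sketched in plain Python. This is only an illustration of the rule, not the connector's schema code: `resolve_array_types`, the `overrides` argument, and the use of Python type names in place of Spark types are all assumptions made for the example.

```python
def resolve_array_types(sampled, overrides=None, default="string"):
    """Pick an element type name for each array field.

    sampled:   field name -> list of arrays observed for that field
    overrides: field name -> element type name supplied by the user
    default:   element type used when the samples give no single answer
    """
    overrides = overrides or {}
    resolved = {}
    for field, arrays in sampled.items():
        if field in overrides:
            # A user-supplied type always wins over inference.
            resolved[field] = overrides[field]
            continue
        # Collect the distinct element types seen across all sampled arrays.
        names = {type(x).__name__ for arr in arrays for x in arr if x is not None}
        # Exactly one type seen: use it. None or several: fall back to the default.
        resolved[field] = names.pop() if len(names) == 1 else default
    return resolved


sampled = {"tags": [[], []], "scores": [[], [1, 2]]}
print(resolve_array_types(sampled))
print(resolve_array_types(sampled, overrides={"tags": "float"}))
```

A field that only ever held `[]` resolves to the default instead of a null element type, so the resulting schema stays persistable.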

Job does not end automatically after execution (MongoDB)

mgDF = sqlContext.read.format("com.stratio.datasource.mongodb").options(
credentials="xxxxxxxxx",
collection="xxxxx",
host="xxxxxxx",
database="xxxxxxx").load()
mgDF.registerTempTable("mgDF")

After execution the job does not end: the data has already been printed, but the job does not terminate on its own.

version 0.8.7, load from mongodb is not working with spark 1.5.2 and 1.5.1

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 131.0 failed 1 times, most recent failure: Lost task 0.0 in stage 131.0 (TID 281, localhost): java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$.findTightestCommonType()Lscala/Function2;
[info] at com.stratio.provider.mongodb.schema.MongodbSchema.com$stratio$provider$mongodb$schema$MongodbSchema$$compatibleType(MongodbSchema.scala:85)
[info] at com.stratio.provider.mongodb.schema.MongodbSchema$$anonfun$4.apply(MongodbSchema.scala:49)
[info] at com.stratio.provider.mongodb.schema.MongodbSchema$$anonfun$4.apply(MongodbSchema.scala:49)
[info] at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:207)
[info] at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:206)
[info] at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
[info] at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
[info] at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:212)
[info] at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
[info] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
[info] at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
[info] at org.apache.spark.scheduler.Task.run(Task.scala:88)
[info] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info] at java.lang.Thread.run(Thread.java:745)

Possibility to run geo queries and/or setting a query built externally

I am in a situation where I have to run a million different geo queries ($near:{$geometry:{type:'Geometry', coordinates[lon,lat]}, $maxDistance:...} etc.), changing the coordinates for each query.

I am not using DataFrames but RDDs directly, with com.stratio.datasource.mongodb.rdd.MongodbRDD.

I assume that in order to run geo queries for both DataFrames and RDDs, it is necessary to change the method queryPartition in the class com.stratio.datasource.mongodb.reader.MongodbReader, right?

Config parameters reading from Mongodb

Hi team,

I am using Java + Spark JobServer + EMR.

I have tried both ways, regular code and SQL, to read from a collection in MongoDB.

Part 1:
DataFrame dataFrame = sqlContext.sql("CREATE TEMPORARY TABLE " + tableName +
" (emailId STRING, contactId INT, deleted BOOLEAN)" +
" USING com.stratio.datasource.mongodb OPTIONS " +
" (" +
" host x.x.x.x:27017, " +
" database xdb, " +
" credentials user,database,password;xuser,xdb,xpassword, " +
" collection theCollectionBeingRead)");

This throws an exception:

java.lang.RuntimeException: [1.1] failure: ``with'' expected but identifier CREATE found

Part 2:

    DataFrame dataFrame = sqlContext.read()
            .format("com.stratio.datasource.mongodb")
            .option("host", "x.x.x.x:27017")
            .option("database", "xdb")
            .option("credentials", "user,database,password;xuser,xdb,xpassword")
            .option("collection", inputSource.getTableName())
            .load()
            .persist();

This is getting stuck at

[2015-12-27 13:42:59,926] INFO .apache.spark.SparkContext [] [akka://JobServer/user/context-supervisor/mongoContext] - Starting job: aggregate at MongodbSchema.scala:46

I have used the spark-redshift connector, which worked with no issues.

Please point out what I am doing wrong.

Provide some simple Java examples

I assume because you support scala, interfacing with your library via Java should be possible. Would be nice to get some examples off the bat to make this more obvious.

Powered By?

It would be really nice to have a "powered by" section in the docs.

For instance, sparkta will use this package in the next release

Query times out on long running applications

My network is sometimes less reliable than I would wish, which leads to the following exception:

2015-08-27 12:20:48.381|Executor task launch worker-0|ERROR|org.apache.spark.executor.Executor|||||||Exception in task 0.0 in stage 1.0 (TID 1)
com.mongodb.MongoTimeoutException: Timed out after 10000 ms while waiting for a server that matches com.mongodb.ServerAddressSelector@28a60f2f. Client view of cluster state is {type=ReplicaSet, servers=[{address=drtqn1pdmongo01.infosolco.net:27017, type=ReplicaSetPrimary, averageLatency=41.5 ms, state=Connected}, {address=drtqn1pdmongo02.infosolco.net:27017, type=ReplicaSetSecondary, averageLatency=41.2 ms, state=Connected}, {address=drtqn1pdmongo03.infosolco.net:27017, type=ReplicaSetSecondary, averageLatency=41.3 ms, state=Connected}]
at com.mongodb.BaseCluster.getServer(BaseCluster.java:82) ~[mongo-java-driver-2.13.1.jar:na]
at com.mongodb.DBTCPConnector.getServer(DBTCPConnector.java:656) ~[mongo-java-driver-2.13.1.jar:na]

I would like to be able to set the driver timeout from the MongodbConfig to manage this issue.

Class Not Found Exception

I am trying to use the connector to get data from MongoDB inside a Java Spark client. I have the Stratio dependency in my pom; is there anything else I should add in order to run a basic query?

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: com.stratio.datasource.mongodb. Please find packages at http://spark-packages.org at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:77)

spark-mongodb (0.8.0) connector is not compatible with spark-sql 1.3

spark-mongodb connector is not compatible with spark-sql 1.3.0.
Gives me the following error:
java.lang.IncompatibleClassChangeError: class com.stratio.deep.mongodb.MongodbRelation has interface org.apache.spark.sql.sources.PrunedFilteredScan as super class.

It works with spark-sql 1.2.

Is there a fix available for the same?

is it possible to filter fields other than _id like ObjectId

For a collection like this:

> db.test.findOne()

{
    "_id" : ObjectId("53478e5b0cf223985e8450bd"),
    "account_id" : ObjectId("53479df455b86e5abcee5171"),
}

I can query with the _id like

"select * from test where _id = '56683613d4c6f325c9fe375c'"

but the following query does not seem to work:

"select * from test where account_id = '53479df455b86e5abcee5171' "

In MongoDB, it's natural to query as db.test.find({"account_id": ObjectId("53479df455b86e5abcee5171")})

I have read the source code.

  private def checkObjectID(attribute: String, value: Any)(implicit config: Config) : Any
     = attribute  match {
       case "_id" if idAsObjectId => new ObjectId(value.toString)
       case _ => value
  }

So what if a field falls into the wildcard pattern but happens to be of ObjectId type? How am I going to match that field?

So, am I missing something, or do you just not support that kind of match at the moment? I am using 0.11.1.
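
One way to think about the missing case is a conversion rule driven by a configurable list of ObjectId fields instead of the hard-coded `_id`. The sketch below is plain Python and does not touch the connector: `looks_like_object_id`, `to_filter_value`, and the `object_id_fields` parameter are hypothetical, and the tuple tags merely stand in for building a real ObjectId.

```python
import re

# An ObjectId is written as 24 hexadecimal characters.
_OBJECT_ID_HEX = re.compile(r"[0-9a-fA-F]{24}")


def looks_like_object_id(value):
    """True when value is a string of exactly 24 hex characters."""
    return isinstance(value, str) and _OBJECT_ID_HEX.fullmatch(value) is not None


def to_filter_value(attribute, value, object_id_fields=("_id",)):
    """Tag a filter value as ObjectId when its field is declared as such."""
    if attribute in object_id_fields and looks_like_object_id(value):
        return ("ObjectId", value)
    return ("raw", value)


print(to_filter_value("account_id", "53479df455b86e5abcee5171"))
print(to_filter_value("account_id", "53479df455b86e5abcee5171",
                      object_id_fields=("_id", "account_id")))
```

Requiring the field to be declared, rather than converting every 24-character hex string, avoids misreading ordinary strings that happen to have that shape.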

user & pwd

Hi,

when I connect to mongodb from terminal I pass User, Password and AuthenticationDatabase

how can i pass these arguments through this connector?

thanks
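
Other reports on this page pass a `credentials` option of the form `user,database,password`. Assuming that format, a small helper can assemble the option map from the three values used on the terminal. `credentials_option` and `reader_options` are hypothetical names, and the option keys are taken from the examples elsewhere on this page rather than from a verified reference.

```python
def credentials_option(user, auth_database, password):
    """Join user, authentication database and password as 'user,database,password'."""
    for part in (user, auth_database, password):
        # ',' and ';' act as separators in the option string, so reject them here.
        if "," in part or ";" in part:
            raise ValueError("credential parts must not contain ',' or ';'")
    return ",".join((user, auth_database, password))


def reader_options(host, database, collection, user, auth_database, password):
    """Build the options dictionary to hand to the data source."""
    return {
        "host": host,
        "database": database,
        "collection": collection,
        "credentials": credentials_option(user, auth_database, password),
    }


opts = reader_options("localhost:27017", "xdb", "xcollection", "xuser", "admin", "xpassword")
print(opts["credentials"])
```

In PySpark the dictionary could then be passed as `sqlContext.read.format("com.stratio.datasource.mongodb").options(**opts).load()`, the same call shape used in other issues here.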

Error: not found: value Host

Hi,

I tried reading data from mongo DB using the code from
https://github.com/Stratio/spark-mongodb/blob/master/doc/src/site/sphinx/First_Steps.rst#scala-api

import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.datasource._
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.schema._
import com.stratio.datasource.mongodb.writer._
import com.stratio.datasource.mongodb.config._
import org.apache.spark.sql.SQLContext
import com.stratio.datasource.util.Config._

val builder = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "highschool", Collection ->"students", SamplingRatio -> 1.0, WriteConcern -> "normal"))
val readConfig = builder.build()
val mongoRDD = sqlContext.fromMongoDB(readConfig)
mongoRDD.registerTempTable("students")
sqlContext.sql("SELECT name, age FROM students")

It is failing on this line
val builder = MongodbConfigBuilder(Map(Host -> List("xx.xx.xx.xx:27017"), Database -> "xxx", Collection ->"xxx", SamplingRatio -> 1.0, WriteConcern -> "normal"))

with this error

<console>:39: error: not found: value Host

Any idea how to get around this issue? I built the JAR from the latest code.
I use Spark 1.5.1 and Scala 2.10.4.

git clone https://github.com/Stratio/spark-mongodb.git
mvn clean install

Thanks

Integration with Spark 1.4

Now I can create a temporary table and insert values with the following code:

val saveConfig = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> EsIndex,
  Collection -> EsMapping, SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Unacknowledged,
  SplitSize -> 8, SplitKey -> "_id"))

dfMongo.saveToMongodb(saveConfig.build)

val readConfig = saveConfig.build()
val mongoRDD = hContext.fromMongoDB(readConfig)
mongoRDD.registerTempTable("testMongo")

The new write mode for data sources generates errors:

dfMongo.write.format("com.stratio.provider.mongodb").mode(SaveMode.Append).options(
      Map("host" -> "localhost:27017", "database" -> "testmongo", "collection" -> "realtime")).save()

Exception in thread "main" java.lang.RuntimeException: com.stratio.provider.mongodb.DefaultSource does not allow create table as select.
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:335)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
    at com.utad.url.HttpStreamProcessing$.main(HttpStreamProcessing.scala:120)
    at com.utad.url.HttpStreamProcessing.main(HttpStreamProcessing.scala)

The new createExternalTable functions generate an error during table creation:

    hContext.createExternalTable("tmongo", "com.stratio.provider.mongodb", schemaMongo, Map("host" ->
      "localhost:27017",
      "database" -> EsIndex, "collection" -> EsMapping))

Exception in thread "main" org.spark-project.guava.util.concurrent.UncheckedExecutionException: org.apache.spark.sql.AnalysisException: com.stratio.provider.mongodb.DefaultSource does not allow user-specified schemas.;
    at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4882)
    at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4898)
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:231)

The SQL statement for creating tables in the Hive context fails when executing any query:

    hContext.sql(
      s"""CREATE TABLE IF NOT EXISTS testMongo(
         |  id STRING)
         |USING com.stratio.provider.mongodb
         |OPTIONS (
         |  host 'localhost:27017', database '${EsIndex}', collection '${EsMapping}'
         | )
      """.stripMargin)

    val queryMongo = hContext.sql(s"SELECT id FROM testMongo")
    queryMongo.collect().foreach(r => print(r.toString()))
    println("Mongo Count: " + queryMongo.count())

Exception in thread "main" org.spark-project.guava.util.concurrent.UncheckedExecutionException: org.apache.spark.sql.AnalysisException: com.stratio.provider.mongodb.DefaultSource does not allow user-specified schemas.;
    at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4882)
    at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4898)
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:231)

Broken schema discovery for heterogeneous arrays

It seems that schema discovery for documents with heterogeneous arrays just picks the type of the first element or, perhaps, maps everything to string. For example, a field value such as ["props", {a: 1, b: 2}] is mapped to array<string>.
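
A quick way to confirm that a field really holds mixed element types is to list the distinct types per array before blaming the inference. The sketch below is plain Python with hypothetical helper names (`element_types`, `is_heterogeneous`); a Python dict stands in for an embedded document.

```python
def element_types(array):
    """Return the sorted names of the distinct element types in an array."""
    return sorted({type(item).__name__ for item in array})


def is_heterogeneous(array):
    """True when the array mixes more than one element type."""
    return len(element_types(array)) > 1


value = ["props", {"a": 1, "b": 2}]
print(element_types(value))
print(is_heterogeneous(value))
```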

Please guide how to store a dataframe to MongoDB from Python (PySpark)

Hello

Many thanks for the spark-MongoDB module.

Currently I am able to do:

sqlCtx.sql("CREATE TEMPORARY TABLE mongoTable(Timestamp STRING, Volume INTEGER) USING com.stratio.datasource.mongodb OPTIONS (host '127.0.0.1:27017', database 'marketdata', collection 'minbars')")
df = sqlCtx.sql("SELECT * FROM mongoTable ")
df.show()

Could you please guide me on how to save a dataframe (returned from the Spark sqlContext) from Python to MongoDB?

with regards
Raghav

Relation Equality

Spark does some optimizations based on whether two relations are the same. In the Spark-Avro reference implementation, we can see the equals method implemented.
https://github.com/databricks/spark-avro/blob/01ae707bf3dbe3e74cfc8b04e892eb34defd28fe/src/main/scala/com/databricks/spark/avro/AvroRelation.scala#L189

I'm thinking this should be implemented as well for MongodbRelation to reduce the number of trips to Mongo. I can implement this, what I'm wondering is: under what conditions do we consider two Mongo Relations the same? Do we want to go to Mongo each time if 2 dataframes are created with the same specs?

My instinct for equality would be same config and same schema.
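
The "same config and same schema" rule can be expressed as a value object whose equality and hash are derived from both parts. This is a Python sketch of the idea only, not the Scala `equals` a relation would need; `RelationKey` and `relation_key` are hypothetical names, and configuration values are assumed to be plain strings.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class RelationKey:
    """Identity of a relation: its configuration plus its schema."""
    config: Tuple[Tuple[str, str], ...]
    schema: Tuple[Tuple[str, str], ...]


def relation_key(config, schema):
    """Build a key; config entries are sorted so that option order does not matter."""
    return RelationKey(tuple(sorted(config.items())), tuple(schema))


a = relation_key({"host": "localhost:27017", "database": "db1"}, [("name", "string")])
b = relation_key({"database": "db1", "host": "localhost:27017"}, [("name", "string")])
c = relation_key({"database": "db1", "host": "localhost:27017"}, [("name", "long")])
print(a == b, a == c)
```

Field order in the schema is kept significant here, since two schemas listing the same columns in a different order produce differently shaped rows.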

not supported for mongoDB 3.0 and not supported for spark 1.5

casbah.version is 2.8, and this version does not support MongoDB 3.0.
When I change the Casbah version to 3.0, running DataFrameAPIExample fails with
"java.lang.NoClassDefFoundError: org/apache/spark/sql/types/UTF8String".
I found that there is no such class as UTF8String in Spark SQL 1.5,
but MongodbReader uses it at line 146.

Mongo Date type

When a timestamp is written to Mongo, it's stored as its Date type.
Unfortunately, when reading that back in a dataframe, it's put into a string type.

The expected result would be that it's converted into a java.sql.Timestamp. The cost is an expensive UDF to convert it to a proper timestamp.

Here is a demonstration of it failing (I have a local fix in the works, just wanted someone to validate it as an issue or expected behaviour)

it should "properly read java.util.Date (mongodb Date) type as Timestamp" in {
    val dfunc = (s: String) => new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy", Locale.ENGLISH).parse(s)
    import com.mongodb.casbah.Imports.DBObject
    val stringAndDate = List(DBObject("string" -> "this is a simple string.", "date" -> dfunc("Mon Aug 10 07:52:49 EDT 2015")))

    withEmbedMongoFixture(stringAndDate) { mongodbProc =>
      val back = TestSQLContext.fromMongoDB(testConfig)
      back.printSchema()
      assert(back.schema.fields.filter(_.name == "date").head.dataType == TimestampType)
      val timestamp = back.first().get(2).asInstanceOf[Timestamp]
      val origTimestamp = new Timestamp(stringAndDate.head.get("date").asInstanceOf[java.util.Date].getTime)
      timestamp should equal(origTimestamp)
    }
  }

Use the same ObjectId as '_id' of two different collections. What is the purpose of the "idasobjectid" parameter?

Hi,

I would like to use the same ObjectId as '_id' of two different
collections. Is it possible? I have tried using the idasobjectid parameter
but it does not work. What is the purpose of this parameter?

Here is my example. I have two different collections in my MongoDB:

> db.createCollection("prueba1")
{ "ok" : 1 }
> db.createCollection("prueba2")
{ "ok" : 1 }

Let us suppose that we have the following dataframe (I am using pyspark):

sentenceData = sqlContext.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
    ], ["label", "sentence"])

If I store it at the "prueba1" collection:
sentenceData.write.format("com.stratio.datasource.mongodb").mode('append').options(host="localhost:27017", database="db1", collection="prueba1", idField='_id', splitKey='_id', splitSize='10', idasobjectid="true").save()

In MongoDB the data is stored correctly:

> db.prueba1.findOne();
{
    "_id" : ObjectId("56b32f8a41b6b0acfcd070ea"),
    "label" : NumberLong(0),
    "sentence" : "Hi I heard about Spark"
}
> db.prueba1.count();
3

Now I read it to another dataframe:

sentences = sqlContext.read.format("com.stratio.datasource.mongodb").options(host="localhost:27017", database="db1", collection="prueba1", idField='_id', splitKey='_id', splitSize='10', idasobjectid="true").load()
sentences.count()
3
sentences.first()
Row(label=0, _id=u'56b32f8a41b6b0acfcd070ea', sentence=u'Hi I heard about Spark')

And now I store it in the prueba2 collection:
sentences.write.format("com.stratio.datasource.mongodb").mode('append').options(host="localhost:27017", database="db1", collection="prueba2", idField='_id', splitKey='_id', splitSize='10', idasobjectid="true").save()

The results in the Mongodb are:

> db.prueba2.findOne();
{
    "_id" : "56b32f8a41b6b0acfcd070ea",
    "label" : NumberLong(0),
    "sentence" : "Hi I heard about Spark"
}
> db.prueba2.count();
3

The "_id" is the same but in string format. What can I do to store it as an ObjectId?

How to configure the connector for a Mongo replica set?

For example, I have a replica set named 'giaosudau_db' with three Mongo servers: host1:27017, host2:27017 and host3:27017.
How do I configure this in code?
MongodbConfigBuilder(Map(Host -> List("host1:27017","host2:27017", "host3:27017"), Database -> rawDB, Collection -> pageViewColl))
I ask because if I put only host1:27017 for the connection,
MongodbConfigBuilder(Map(Host -> List("host1:27017"), Database -> rawDB, Collection -> pageViewColl))
it throws a GC overhead error.

Performance testing

Do you guys use anything specifically to test the performance of the library?

I'm thinking something along the lines of:

  1. Take a large json file (maybe 100 MB)
  2. Write it to the embedded test mongo
  3. Do some operations involving the data (self-join, queries, counts, etc). Enough things to force at least 1 read of the full collection.
  4. Output back to a json file (perhaps a copy of the original json, easy to validate then).

A real world application with a very deterministic flow would work as well.
Scala is notorious for having fairly innocuous things that are detrimental to performance. With a performance test of sorts, some of these could be identified in the code and fixed.

Just an idea if something doesn't already exist.
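
A benchmark along these lines mostly needs a way to time each stage separately. The harness below is a minimal Python sketch; `time_steps` is a hypothetical helper, and the lambdas are placeholders for the real write, query, and export stages.

```python
import time


def time_steps(steps):
    """Run (name, callable) pairs in order and return {name: elapsed_seconds}."""
    timings = {}
    for name, fn in steps:
        start = time.perf_counter()
        fn()
        timings[name] = time.perf_counter() - start
    return timings


# Placeholder stages; a real run would write to Mongo, query, and export.
timings = time_steps([
    ("write", lambda: None),
    ("query", lambda: sum(range(1000))),
    ("export", lambda: None),
])
for name, seconds in timings.items():
    print("%s: %.6f s" % (name, seconds))
```

Recording the stages separately makes it easier to see whether a regression comes from reading, writing, or the Spark operations in between.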

Support for splitKeyMin and splitKeyMax

Right now, when a dataframe is created, the splits are generated using min/max. The splitVector command allows you to customize min/max so that you get a narrower set of splits.
This in turn would lead to far fewer partitions to create and read (many of which are currently empty with time series data).
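
The effect of narrowing the bounds can be shown with integer keys standing in for the split key. The sketch below does not call MongoDB's splitVector; `split_ranges` is a hypothetical helper that simply cuts a key interval into fixed-size chunks.

```python
def split_ranges(key_min, key_max, chunk):
    """Cut [key_min, key_max) into consecutive half-open ranges of at most `chunk` keys."""
    if chunk <= 0:
        raise ValueError("chunk must be positive")
    ranges = []
    lower = key_min
    while lower < key_max:
        upper = min(lower + chunk, key_max)
        ranges.append((lower, upper))
        lower = upper
    return ranges


# Whole key space versus only the most recent slice of it.
print(len(split_ranges(0, 1000, 100)))
print(split_ranges(900, 1000, 100))
```

With time series data, restricting the lower bound to the start of the window of interest removes the partitions that would otherwise be created and read only to turn out empty.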

java.lang.NoSuchMethodError: findTightestCommonType

I got the following exceptions when I ran the code in the "First Steps" section.
5/10/17 03:42:43 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$.findTightestCommonType()Lscala/Function2;
at com.stratio.datasource.mongodb.schema.MongodbSchema.com$stratio$datasource$mongodb$schema$MongodbSchema$$compatibleType(MongodbSchema.scala:85)
at com.stratio.datasource.mongodb.schema.MongodbSchema$$anonfun$4.apply(MongodbSchema.scala:46)

incremental data

Hi, do you know a way to load INCREMENTAL data from MongoDB?
Here is the problem:
I have a document in MongoDB that grows every minute, and I need to get the new data
within Spark to work with the fresh data.
So I'm looking for something like this:

Step 1: set up the connector for MongoDB with an option to reload data every 60 seconds (or in real time?)
Step 2: on a signal (after 80 sec, or when the refresh is done), run my_new_stat()
Where my_new_stat() is a function in which I use Spark on the refreshed data, merged with or added to the older data.

Any ideas?
Thanks
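
A common pattern for this kind of polling is a high-water mark: remember the largest key seen so far and ask only for newer records on each refresh. The sketch below uses an in-memory list in place of a Mongo collection; `fetch_new` and the `ts` field are assumptions made for the example, not connector features.

```python
def fetch_new(documents, last_seen, key="ts"):
    """Return the documents whose `key` exceeds last_seen, plus the new high-water mark."""
    fresh = sorted((d for d in documents if d[key] > last_seen), key=lambda d: d[key])
    new_mark = fresh[-1][key] if fresh else last_seen
    return fresh, new_mark


docs = [{"ts": 1, "v": "a"}, {"ts": 2, "v": "b"}, {"ts": 3, "v": "c"}]

# First refresh: everything after ts=1 is new.
fresh, mark = fetch_new(docs, last_seen=1)
print([d["v"] for d in fresh], mark)

# Second refresh with nothing added: no new rows, the mark is unchanged.
fresh, mark = fetch_new(docs, last_seen=mark)
print(fresh, mark)
```

In a Spark job the same idea would translate to a filter on the timestamp column for each refresh, with the results appended to the data already loaded.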

Is there a Java Example for creating a DataFrame using MongodbConfigBuilder.

Hi all,

I was looking for a Java example of creating a DataFrame from MongodbConfigBuilder. I only found a Scala example; the documentation describes the Scala example:

val mongoRDD = sqlContext.fromMongoDB(readConfig)

sqlContext.fromMongoDB is not available in Java and gives a compile-time error. Can anyone point me to a document or an example?

Thanks in advance.

README.md --jars include casbah-core

FYI in order to get the Scala API example in the README.md working I needed to include casbah-core in the --jars list in addition to casbah-common & casbah-query which are currently listed in the example.

Connection not getting closed

I used spark-mongodb_2.10 version 0.10.0 with Spark 1.5.0 and Mongo 3.2. Today we upgraded Spark to the latest version, 1.6.1, so we changed spark-mongodb_2.10 to version 0.11.0.

However, opened connections are not getting closed and the MongodbConfigBuilder instance is not getting disconnected, so the application keeps running until we stop it manually.

Has anyone faced a similar issue? Let me know if any configuration changes are needed.

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.1</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.stratio.datasource</groupId>
        <artifactId>spark-mongodb_2.10</artifactId>
        <version>0.11.0</version>
    </dependency>
</dependencies>

val mcInputBuilder = MongodbConfigBuilder(Map(MongodbConfig.Host -> List(MongoHost + ":" + MongoPort), MongodbConfig.Database -> databaseName, MongodbConfig.Collection -> tableName,
  MongodbConfig.SamplingRatio -> 1.0, MongodbConfig.WriteConcern -> MongodbWriteConcern.Normal, MongodbConfig.SplitSize -> 10000))
val mongoRDD = sqlContext.fromMongoDB(mcInputBuilder.build())
mongoRDD.registerTempTable(tableName)

can't serialize class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

when I call in pySpark:

df = sql_context.jsonRDD(json_rdd)
df.write.format("com.stratio.datasource.mongodb") \
    .mode("append").options(**options).save()

I get

File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 330, in save
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in call
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o70.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5.0 (TID 12, ip-10-235-22-147.us-west-1.compute.internal): java.lang.IllegalArgumentException: can't serialize class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
at org.bson.BasicBSONEncoder._putObjectField(BasicBSONEncoder.java:299)
at org.bson.BasicBSONEncoder.putIterable(BasicBSONEncoder.java:324)
at org.bson.BasicBSONEncoder._putObjectField(BasicBSONEncoder.java:263)
at org.bson.BasicBSONEncoder.putObject(BasicBSONEncoder.java:194)
at org.bson.BasicBSONEncoder._putObjectField(BasicBSONEncoder.java:255)
at org.bson.BasicBSONEncoder.putObject(BasicBSONEncoder.java:194)
at org.bson.BasicBSONEncoder.putObject(BasicBSONEncoder.java:136)
at com.mongodb.DefaultDBEncoder.writeObject(DefaultDBEncoder.java:36)
at com.mongodb.BSONBinaryWriter.encodeDocument(BSONBinaryWriter.java:339)
at com.mongodb.InsertCommandMessage.writeTheWrites(InsertCommandMessage.java:45)
at com.mongodb.InsertCommandMessage.writeTheWrites(InsertCommandMessage.java:23)
at com.mongodb.BaseWriteCommandMessage.encodeMessageBody(BaseWriteCommandMessage.java:69)
at com.mongodb.BaseWriteCommandMessage.encodeMessageBody(BaseWriteCommandMessage.java:23)
at com.mongodb.RequestMessage.encode(RequestMessage.java:66)
at com.mongodb.BaseWriteCommandMessage.encode(BaseWriteCommandMessage.java:53)
at com.mongodb.DBCollectionImpl.sendWriteCommandMessage(DBCollectionImpl.java:520)
at com.mongodb.DBCollectionImpl.access$200(DBCollectionImpl.java:48)
at com.mongodb.DBCollectionImpl$2.execute(DBCollectionImpl.java:470)
at com.mongodb.DBCollectionImpl$2.execute(DBCollectionImpl.java:461)
at com.mongodb.DBPort.doOperation(DBPort.java:187)
at com.mongodb.DBTCPConnector.doOperation(DBTCPConnector.java:208)
at com.mongodb.DBCollectionImpl.writeWithCommandProtocol(DBCollectionImpl.java:461)
at com.mongodb.DBCollectionImpl.insertWithCommandProtocol(DBCollectionImpl.java:426)
at com.mongodb.DBCollectionImpl.access$2000(DBCollectionImpl.java:48)
at com.mongodb.DBCollectionImpl$Run$4.executeWriteCommandProtocol(DBCollectionImpl.java:849)
at com.mongodb.DBCollectionImpl$Run$RunExecutor.execute(DBCollectionImpl.java:888)
at com.mongodb.DBCollectionImpl$Run.executeInserts(DBCollectionImpl.java:862)
at com.mongodb.DBCollectionImpl$Run.execute(DBCollectionImpl.java:738)
at com.mongodb.DBCollectionImpl.executeBulkWriteOperation(DBCollectionImpl.java:149)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1737)
at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:109)
at com.mongodb.casbah.BulkWriteOperation$$anonfun$2.apply(BulkWriteOperation.scala:88)
at com.mongodb.casbah.BulkWriteOperation$$anonfun$2.apply(BulkWriteOperation.scala:88)
at scala.util.Try$.apply(Try.scala:161)
at com.mongodb.casbah.BulkWriteOperation.execute(BulkWriteOperation.scala:88)
at com.stratio.datasource.mongodb.writer.MongodbBatchWriter$$anonfun$save$1.apply(MongodbBatchWriter.scala:44)
at com.stratio.datasource.mongodb.writer.MongodbBatchWriter$$anonfun$save$1.apply(MongodbBatchWriter.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at com.stratio.datasource.mongodb.writer.MongodbBatchWriter.save(MongodbBatchWriter.scala:37)
at com.stratio.datasource.mongodb.writer.MongodbWriter.saveWithPk(MongodbWriter.scala:82)
at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(mongodbFunctions.scala:60)
at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(mongodbFunctions.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Spark version: 1.5.1
Run with spark-submit --packages com.stratio.datasource:spark-mongodb_2.10:0.10.1 test.py

Please support the read preference option of MongoDB

Spark-mongodb just works with the spark-sql shell with minimal effort, while the MongoDB Hadoop connector does not seem to be spark-sql friendly. Very impressed with your hard work.

However, I could not find any connection option for read preference. Can we have one?
It would also be awesome if I could specify connection options using the MongoClientURI format.

How to save Date type field with 3.x driver

When I save a java.util.Date to MongoDB, it says the Spark SQL schema for type java.util.Date is not supported. When I change to java.sql.Date, it says org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class java.sql.Date.

I tried asking on Stack Overflow but nobody answered, so I am asking for help here.
Here's my code:

case class AppDailyStatistic(date: Date, newUser: Long, launchCount: Long)
val dailyData = new AppDailyStatistic(new Date(DateUtils.getStartOfToday.getTime), newUserCount, launchCount)
val dataFrame: DataFrame = sqlContext.createDataFrame(sc.parallelize(List[AppDailyStatistic](dailyData)))
val saveConfig = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> "test",
Collection -> "app_daily_statistic", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal,
SplitSize -> 8, SplitKey -> "_id"))
dataFrame.saveToMongodb(saveConfig.build)
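
The first error is consistent with how Spark SQL infers schemas from classes: it recognises java.sql.Timestamp and java.sql.Date for temporal fields, but not java.util.Date. As a minimal sketch (the class and method names below are hypothetical, and Java is used only to keep the example self-contained), a java.util.Date can be converted before it is placed in the case class. Whether the connector and the 3.x driver can then encode the resulting value is not confirmed anywhere in this thread, so treat this as something to try rather than a known fix.

```java
import java.sql.Timestamp;
import java.util.Date;

// Hypothetical helper, not part of spark-mongodb or Spark.
final class DateConversionSketch {

    // java.sql.Timestamp keeps the full instant (date and time of day).
    static Timestamp toSqlTimestamp(Date date) {
        return new Timestamp(date.getTime());
    }

    // java.sql.Date wraps the same epoch milliseconds; the JDBC convention
    // is to treat it as a calendar date without a time of day.
    static java.sql.Date toSqlDate(Date date) {
        return new java.sql.Date(date.getTime());
    }

    public static void main(String[] args) {
        Date now = new Date();
        System.out.println("as timestamp: " + toSqlTimestamp(now));
        System.out.println("as sql date:  " + toSqlDate(now));
    }
}
```

In the code above, the date field of AppDailyStatistic would then be declared with one of these two types instead of java.util.Date.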

Error when using packages option - Unresolved Dependency

I am trying to submit my task using the --packages option:

bin\spark-submit --packages com.stratio.datasource:spark-mongodb:0.8.0 --class classname job.jar

Should I use another name for the package or am I doing something else wrong?

I get the following message about an unresolved dependency:

:: problems summary ::
:::: WARNINGS

    ==== local-m2-cache: tried

[...]

    ==== central: tried

      https://repo1.maven.org/maven2/com/stratio/datasource/spark-mongodb/0.8.0/spark-mongodb-0.8.0.pom

      -- artifact com.stratio.datasource#spark-mongodb;0.8.0!spark-mongodb.jar:

      https://repo1.maven.org/maven2/com/stratio/datasource/spark-mongodb/0.8.0/spark-mongodb-0.8.0.jar

    ==== spark-packages: tried

      http://dl.bintray.com/spark-packages/maven/com/stratio/datasource/spark-mongodb/0.8.0/spark-mongodb-0.8.0.pom

      -- artifact com.stratio.datasource#spark-mongodb;0.8.0!spark-mongodb.jar:

      http://dl.bintray.com/spark-packages/maven/com/stratio/datasource/spark-mongodb/0.8.0/spark-mongodb-0.8.0.jar

ssl options somewhat confusing

i find the way ssl is handled somewhat confusing. it seems turning ssl on/off and setting the trust/keystore are conflated.

i would like to have a switch to turn on/off ssl for the mongo client without having to provide the keystore/truststore details. we set these system properties somewhere else in a more central manner.

if it wasn't for backwards compatibility i would suggest removing keystore and truststore system properties entirely from the scope of this library. that seems a cleaner design to me.

best, koert

credentials conf not working

Credentials -> "anand-singh,spark-mongo-1,anand-singh"
[error] - com.rklick.engine.contexts.MongoContext$ - java.lang.String cannot be cast to scala.collection.immutable.List
java.lang.ClassCastException: java.lang.String cannot be cast to scala.collection.immutable.List
    at com.stratio.datasource.mongodb.partitioner.MongodbPartitioner.<init>(MongodbPartitioner.scala:41) ~[spark-mongodb_2.11-0.10.1.jar:na]
    at com.stratio.datasource.mongodb.MongodbRelation.<init>(MongodbRelation.scala:48) ~[spark-mongodb_2.11-0.10.1.jar:na]
    at com.stratio.datasource.mongodb.MongodbContext.fromMongoDB(mongodbFunctions.scala:40) ~[spark-mongodb_2.11-0.10.1.jar:na]
    at com.rklick.engine.contexts.MongoContext$class.toDataFrame(MongoContext.scala:56) ~[classes/:na]
    at com.rklick.engine.contexts.MongoContext$.toDataFrame(MongoContext.scala:69) [classes/:na]
    at com.rklick.engine.contexts.MongoTestContext$.main(MongoContext.scala:78) [classes/:na]
    at com.rklick.engine.contexts.MongoTestContext.main(MongoContext.scala) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_66]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_66]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_66]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_66]
    at sbt.Run.invokeMain(Run.scala:67) [run-0.13.9.jar:0.13.9]
    at sbt.Run.run0(Run.scala:61) [run-0.13.9.jar:0.13.9]
    at sbt.Run.sbt$Run$$execute$1(Run.scala:51) [run-0.13.9.jar:0.13.9]
    at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55) [run-0.13.9.jar:0.13.9]
    at sbt.Run$$anonfun$run$1.apply(Run.scala:55) [run-0.13.9.jar:0.13.9]
    at sbt.Run$$anonfun$run$1.apply(Run.scala:55) [run-0.13.9.jar:0.13.9]
    at sbt.Logger$$anon$4.apply(Logger.scala:85) [logging-0.13.9.jar:0.13.9]
    at sbt.TrapExit$App.run(TrapExit.scala:248) [run-0.13.9.jar:0.13.9]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
com.rklick.utils.exception.DataFrameLoadException: java.lang.String cannot be cast to scala.collection.immutable.List

Cannot supply samplingRatio when creating a DataFrame

Thanks very much for the spark-mongodb connector, much appreciated.

I'm having an issue when creating a DataFrame from a MongoDB collection.

The elapsed time for creating the DataFrame is 2-3 seconds, in this scenario:

  • I am connecting to a remote MongoDB cluster
  • I am creating the DataFrame from a collection with 250 small documents (average document size is 71 bytes)
  • I am not supplying a schema, so I am expecting Stratio's MongoDB's DefaultSource to infer the schema
  • I am supplying a sample ratio to (I think!) limit the amount of data which must be read in order to infer the schema

Walking the code I can see that:

  • DataFrameReader.load() creates an org.apache.spark.sql.execution.datasources.ResolvedDataSource which, in the RelationProvider case, creates a CaseInsensitiveMap from the given options and then invokes com.stratio.datasource.mongodb.DefaultSource
  • The DefaultSource creates a new MongodbRelation but provides no schema which (see MongodbRelation:58) results in the use of a lazy schema like so:
MongodbSchema(new MongodbRDD(sqlContext, config, rddPartitioner), config.get[Any](MongodbConfig.SamplingRatio).fold(MongodbConfig.DefaultSamplingRatio)(_.toString.toDouble)).schema()  
  • The key item here is this: 'config.get[Any](MongodbConfig.SamplingRatio).fold(MongodbConfig.DefaultSamplingRatio)', which uses or overrides the caller-supplied samplingRatio. It expects the sampling ratio under the key "schema_samplingRatio", but because the ResolvedDataSource has already lower-cased the caller-supplied option keys, our sampling ratio is actually under the key "schema_samplingratio". As a result, MongodbSchema always uses the default sampling ratio: 1.0
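
The suspected key mismatch can be reproduced without Spark or MongoDB. The sketch below is only an illustration under stated assumptions: lowerCaseKeys is a hypothetical stand-in for whatever the case-insensitive option map does to the keys, and the two lookup methods are hypothetical, not the connector's code.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical demo class; none of these names come from Spark or spark-mongodb.
final class SamplingRatioKeyDemo {

    // Mixed-case key, as quoted in the diagnosis above.
    static final String EXPECTED_KEY = "schema_samplingRatio";
    static final double DEFAULT_RATIO = 1.0;

    // Stand-in for the lower-casing applied to caller-supplied option keys.
    static Map<String, String> lowerCaseKeys(Map<String, String> options) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            result.put(e.getKey().toLowerCase(Locale.ROOT), e.getValue());
        }
        return result;
    }

    // Exact-match lookup: misses the lower-cased key and falls back to the default.
    static double exactLookup(Map<String, String> options) {
        String raw = options.get(EXPECTED_KEY);
        return raw == null ? DEFAULT_RATIO : Double.parseDouble(raw);
    }

    // Case-insensitive lookup: finds the value whatever the key's casing.
    static double tolerantLookup(Map<String, String> options) {
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (e.getKey().equalsIgnoreCase(EXPECTED_KEY)) {
                return Double.parseDouble(e.getValue());
            }
        }
        return DEFAULT_RATIO;
    }

    public static void main(String[] args) {
        Map<String, String> callerOptions = new HashMap<>();
        callerOptions.put("schema_samplingRatio", "0.1");

        Map<String, String> lowered = lowerCaseKeys(callerOptions);
        System.out.println("exact lookup:    " + exactLookup(lowered));
        System.out.println("tolerant lookup: " + tolerantLookup(lowered));
    }
}
```

If the diagnosis is right, the exact lookup corresponds to the behaviour observed here (the default ratio is always used), and a case-insensitive comparison on the connector side would be one way to honour the caller's value.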

Am I correct in the above diagnosis? If so, what can be done about it? If not, how can I reliably provide my own sampling ratio?

Any help gratefully accepted.

Version details etc:

  • org.scala-lang::scala-library::2.11.6
  • com.stratio.datasource::spark-mongodb_2.11::0.11.1
  • com.stratio::spark-mongodb::0.8.7
  • org.apache.spark::spark-sql_2.11::1.6.1

Here's a test case showing the behaviour in action:

  @Test
  public void canReproduceCallerSuppliedSamplingRatioIssue() {
    SparkConf sparkConf = new SparkConf()
        .setAppName("ProvidingASamplingRatio")
        .setMaster("local[*]");

    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    SQLContext sqlContext = new SQLContext(sc);

    Double samplingRatio = 0.1;
    long start = System.currentTimeMillis();
    DataFrame countriesSlow = sqlContext
        .read()
        .format("com.stratio.datasource.mongodb")
        .options(getOptions("Scratch", "countries", samplingRatio))
        .load();
    // the preceding call takes 2 - 3 *seconds*
    System.out.println(String.format("Loaded countries without a schema in: %sms using caller supplied sampling ratio: %s",
        (System.currentTimeMillis() - start), samplingRatio));
    countriesSlow.show(5);

    StructType countriesSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("code", DataTypes.StringType, true),
        DataTypes.createStructField("region", DataTypes.StringType, true),
        DataTypes.createStructField("name", DataTypes.StringType, true)
    ));

    start = System.currentTimeMillis();
    DataFrame countriesFast = sqlContext
        .read()
        .format("com.stratio.datasource.mongodb")
        .options(getOptions("Scratch", "countries", null))
        .schema(countriesSchema)
        .load();
    // the preceding call takes 2 - 3 *millis*
    System.out.println(String.format("Loaded countries with a schema in: %sms using default sampling ratio",
        (System.currentTimeMillis() - start)));
    countriesFast.show(5);
  }

  private Map getOptions(String databaseName, String collectionName, Double samplingRatio) {
    Map options = new HashMap();
    // see MongodbConfig
    options.put("host", "...");
    options.put("credentials", "...");
    options.put("database", databaseName);
    options.put("collection", collectionName);
    if (samplingRatio != null) {
      options.put("schema_samplingRatio", samplingRatio);
    }
    return options;
  }

Performance issue in reading data from MongoDB using MongodbConfigBuilder

We are trying to use MongodbConfigBuilder to read MongoDB data into mongoRDD as shown below. But when our tables have millions of records, loading multiple tables slows down overall performance. Each read takes a long time.

Can we apply a select or filter condition so that only a few rows of a Mongo table are loaded, to improve performance?

  val mcInputBuilder = MongodbConfigBuilder(Map(MongodbConfig.Host -> List(MongoHost + ":" + MongoPort), MongodbConfig.Database -> Database, MongodbConfig.Collection -> tableName,
    MongodbConfig.SamplingRatio -> 1.0, MongodbConfig.WriteConcern -> MongodbWriteConcern.Normal, MongodbConfig.SplitSize -> 1000, MongodbConfig.ConnectionsPerHost -> "100"))
  val mongoRDD = sqlContext.fromMongoDB(mcInputBuilder.build())
  mongoRDD.registerTempTable(tableName)

Job stuck with the message.

[2016-01-17 17:22:45,069] INFO .apache.spark.SparkContext [] [akka://JobServer/user/context-supervisor/mongo] - Starting job: aggregate at MongodbSchema.scala:46

Hi,

I am seeing this whenever I run a job, whatever the size of the collection. I am using a MongoDB instance on EC2, which I am able to log in to and browse using Robomongo.

I am using Java and following the approach provided in the Java example.

DataFrame df = sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();

The options are set as follows:

Map options = new HashMap();
options.put("host", "10.1.1.0:27017");
options.put("database", "xdb");
options.put("collection", "mcollection");
options.put("credentials", "user,db,password");

Can anyone point me in the right direction?

saveToMongoDB doesn't accept deepConfig

def saveToMongodb(config : com.stratio.datasource.util.Config, batch : scala.Boolean = { /* compiled code */ }) : scala.Unit = { /* compiled code */ }

What it accepts is a Config and not a deepConfig. Is there an override somewhere?

How to update data in collection with query, data, upsert, multi to update?

Hi,
I read the code but still can't find a way to update the collection the way I want.
I saw there is an updatedFields option, but it only covers the query part.
I want to know about the update_data part: using $set to update only the fields I specify and keep the others unchanged.
The Mongo update command has these parameters, which I use most:
query -> finds the matching records to update
update_data -> which fields to update, as $set: {field: value}
upsert
multi

Please provide an example of this.
Thanks.

spark-sql fails to run query with exception 'can't serialize class org.apache.spark.sql.types.UTF8String'

It fails when a string predicate is used in the WHERE clause.

$ bin/spark-sql --jars lib/spark-mongodb-core-0.8.4.jar,lib/casbah-core_2.10-2.8.0.jar,lib/casbah-commons_2.10-2.8.0.jar,lib/casbah-query_2.10-2.8.0.jar,lib/mongo-java-driver-2.13.0.jar

SET spark.sql.hive.version=0.13.1
spark-sql> create temporary table test using com.stratio.provider.mongodb options (host 'localhost', database 'test_log', collection 'logs');
Time taken: 4.119 seconds
spark-sql> select distinct user, type from test;
GOOGLE_PLAY_STORE       1600
GOOGLE_PLAY_STORE       12000
...
Time taken: 1.589 seconds, Fetched 59 row(s)
spark-sql> select aid, user, type from test where user = 'GOOGLE_PLAY_STORE' limit 2;
15/06/24 20:15:31 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 205)
java.lang.IllegalArgumentException: can't serialize class org.apache.spark.sql.types.UTF8String
        at org.bson.BasicBSONEncoder._putObjectField(BasicBSONEncoder.java:299)
        at org.bson.BasicBSONEncoder.putObject(BasicBSONEncoder.java:194)
        at org.bson.BasicBSONEncoder.putObject(BasicBSONEncoder.java:136)
        at com.mongodb.DefaultDBEncoder.writeObject(DefaultDBEncoder.java:36)
        at com.mongodb.OutMessage.putObject(OutMessage.java:289)
        at com.mongodb.OutMessage.writeQuery(OutMessage.java:211)
        at com.mongodb.OutMessage.query(OutMessage.java:86)
        at com.mongodb.DBCollectionImpl.find(DBCollectionImpl.java:81)
        at com.mongodb.DBCollectionImpl.find(DBCollectionImpl.java:66)
        at com.mongodb.DBCursor._check(DBCursor.java:498)
        at com.mongodb.DBCursor._hasNext(DBCursor.java:621)
        at com.mongodb.DBCursor.hasNext(DBCursor.java:657)
        at com.mongodb.casbah.MongoCursorBase$class.hasNext(MongoCursor.scala:76)
        at com.mongodb.casbah.MongoCursor.hasNext(MongoCursor.scala:517)
        at com.stratio.provider.mongodb.reader.MongodbReader$$anonfun$hasNext$2.apply(MongodbReader.scala:58)
        at com.stratio.provider.mongodb.reader.MongodbReader$$anonfun$hasNext$2.apply(MongodbReader.scala:58)
        at scala.Option.fold(Option.scala:157)
        at com.stratio.provider.mongodb.reader.MongodbReader.hasNext(MongodbReader.scala:58)
        at com.stratio.provider.mongodb.rdd.MongodbRDDIterator.hasNext(MongodbRDDIterator.scala:57)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
...

The query finished successfully when I used a number in the WHERE clause.

spark-sql> select aid, user, type from test where type = 1600 limit 2;
54dca26084a92c48da000679        GOOGLE_PLAY_STORE       1600
546c2cc17275bca41c000017        NULL    1600
Time taken: 0.287 seconds, Fetched 2 row(s)
spark-sql>

I'm testing with connector version 0.8.4, spark 1.4.0 (prebuilt for hadoop 2.4 and above)

How to use a MongoDB index directly for querying

Hi there,

I have a MongoDB collection that contains time series data. It contains several million entries. There is an index on the "createdAt" field. I want to select a subset of the data (e.g. one month) and do some computation/transformation with Spark on it.

I have written the following code to select the data:

val builder = MongodbConfigBuilder(Map(Host -> List("<host>:<port>"), Database -> "<database>", Collection -> "<collection>", Credentials -> List(MongodbCredentials("<username>","<database>","<password>".toCharArray))))
val config = builder.build()

val schema = StructType(Seq(
        StructField("createdAt",TimestampType,true),
        // some more fields
))

val mongoRDD = sqlContext.fromMongoDB(config, Some(schema))
mongoRDD.registerTempTable("mytable")
val resultFromMongo = sqlContext.sql("SELECT * FROM mytable WHERE createdAt >= '2015-12-01' AND createdAt < '2016-01-01'")

The select is quite slow even if I change the code to select only one day. It seems to me (I am quite new to Spark) that the whole collection is loaded and the filtering is done by Spark in memory. I would like to select and fetch only the data for the given period directly from MongoDB.

Is it possible to achieve this with your connector?

BTW: Your connector already helped me a lot. Thank you very much for your effort!

ClassCast Exception with double nested document arrays

Hi,

I don't know if I am using the library incorrectly, but for a document like:

{
"a": "x",
"b": [
  {
    "c": "y",
     "d": [{
              "e": "z"  
           }, {....}]
   }, {....}
]
}

The code

 val mongoRDD = originalDataFrame.map[DBObject]((row: Row) => {
        val dbObject = MongodbRowConverter.rowAsDBObject(row, row.schema)
        dbObject.put("consultant", "John Macclane")
        dbObject
      })

      val newRDD = MongodbRowConverter.asRow(originalDataFrame.schema, mongoRDD)
      sqlContext.createDataFrame(newRDD, originalDataFrame.schema)

fails at the line val newRDD = MongodbRowConverter.asRow(originalDataFrame.schema, mongoRDD) with something along the lines of

ClassCastException. Can't cast "::" to BasicDBList

I "fixed it" locally by changing the method toSQL in the class MongodbRowConverter to

  def toSQL(value: Any, dataType: DataType): Any = {
    Option(value).map{value =>
      dataType match {
        case ArrayType(elementType, _) =>
          var basicDBList: BasicDBList = null
          if(value.isInstanceOf[List[_]]) {
            basicDBList = new BasicDBList
            basicDBList.addAll(new java.util.ArrayList[AnyRef](value.asInstanceOf[List[AnyRef]].asJava))
          }else {
            basicDBList = value.asInstanceOf[BasicDBList]
          }
          basicDBList.map(toSQL(_, elementType))
        case struct: StructType =>
          recordAsRow(dbObjectToMap(value.asInstanceOf[DBObject]), struct)
        case _ =>
          //Assure value is mapped to schema constrained type.
          enforceCorrectType(value, dataType)
      }
    }.orNull
  }

Please let me know if this is a known bug, or if it is just me not using something correctly.

Problem with dependencies

Hey guys, I created a simple program with SBT and Scala. So I cloned this project and published it locally.

Since this is a maven project I added this line to my SBT build file

resolvers += Resolver.mavenLocal

Although I already have Casbah resolved as an Ivy dependency, I'm getting this error.

Error:Error while importing SBT project:
...
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:235)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[error] sbt.ResolveException: unresolved dependency: org.mongodb#casbah-commons_${scala.binary.version};2.8.0: not found
[error] unresolved dependency: org.mongodb#casbah-query_${scala.binary.version};2.8.0: not found
[error] unresolved dependency: org.mongodb#casbah-core_${scala.binary.version};2.8.0: not found
[error] Use 'last' for the full log.

Any idea?

Sbt

Is it possible to use this library with sbt dependency management?
