Comments (33)
@findchris You're building the `spark-dynamodb` project, which is built with Maven:

```
git clone git@github.com:traviscrawford/spark-dynamodb.git
cd spark-dynamodb
mvn install -DskipTests -Dgpg.skip=True
```

Then copy `spark-dynamodb-0.0.4-SNAPSHOT.jar` into `lib/` of your sbt project. You can comment out your `spark-dynamodb` dependency in sbt.
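For reference, a minimal sketch of the corresponding `build.sbt` change (illustration only; the commented-out coordinate is the published artifact mentioned later in this thread):

```scala
// sbt treats jars under lib/ as unmanaged dependencies, so the locally built
// snapshot is picked up automatically once the managed dependency is removed.
unmanagedBase := baseDirectory.value / "lib"  // sbt's default; shown for clarity

// libraryDependencies += "com.github.traviscrawford" % "spark-dynamodb" % "0.0.2"
```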
---
Thanks for testing this! It's been published to maven central.
---
Latest version is `0.0.5` in Maven Central. You can see the release history at:
http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22com.github.traviscrawford%22%20AND%20a%3A%22spark-dynamodb%22
---
Also, since it is related: how does this option interact with the `segments` option? Thanks!
---
Hey @traviscrawford, are you sure your `read_capacity_pct` is being honored correctly in this version of `spark-dynamodb`? I can't see how it would work correctly.
---
I did a bit more digging and didn't see anything provided by AWS for explicitly throttling consumption of read units. I did stumble across an article discussing a technique for doing this at the application level, but I don't see anything like it in `spark-dynamodb`.
I assume the `ScanSpec`'s inclusion of `withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)` is meant to facilitate this work, but I don't see any further usage of the returned capacity.
Anyway, thanks for looking at this with me. This is blocking a production deploy.
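For what it's worth, here's a sketch of how I imagine the application-level technique from that article could look (my own guess, not code from `spark-dynamodb`): use `ReturnConsumedCapacity.TOTAL` to learn what each scan page actually cost, and acquire that many permits from a rate limiter before fetching the next page.

```scala
import com.amazonaws.services.dynamodbv2.document.Table
import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec
import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity
import com.google.common.util.concurrent.RateLimiter

// Scan `table`, consuming at most `readUnitsPerSecond` on average.
def throttledScan(table: Table, readUnitsPerSecond: Double): Unit = {
  val limiter = RateLimiter.create(readUnitsPerSecond)
  val spec = new ScanSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
  val pages = table.scan(spec).pages().iterator()
  while (pages.hasNext) {
    val page = pages.next()
    // ... process page.iterator() here ...
    // Block until enough permits accumulate to cover what this page consumed.
    val consumed = page.getLowLevelResult.getScanResult.getConsumedCapacity
    if (consumed != null) {
      limiter.acquire(math.max(1, math.ceil(consumed.getCapacityUnits).toInt))
    }
  }
}
```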
---
Only other thought I had: Is this throttling enforced per Spark executor? If not, how is it enforced? If it's per executor, obviously a distributed Spark job of varying size would have different capacity consumption based on the cluster size.
---
Hi @findchris, thanks for reporting this. You're correct: `readCapacityPct` is not used at this time. The background here is that I'm porting our internal DynamoDB scanner, which does have a rate limit, over to this native data source, and I have not ported the rate limiter yet. I'll take a look at this now.
Great to hear you find this useful.
---
Hi @findchris - what do you think about the approach in https://github.com/traviscrawford/spark-dynamodb/compare/travis/ratelimit ? Are you able to try this in your environment prior to publishing a new release?
---
Thanks for chiming in here @traviscrawford. The pull request looks solid and straightforward. 👍
Can you help me understand the semantics of `segments`, and how it interacts with `rate_limit_per_segment`, given the distributed environment of Spark?
I'd be happy to test this in QA, but I'm still relatively new to the Scala world. I'm currently using this project like so:
`"com.github.traviscrawford" % "spark-dynamodb" % "0.0.2"`
Besides a version bump (your call), got an easy way for me to test this out?
Out of curiosity, how do you implement your internal DynamoDB scanner's rate limiting?
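To make my question concrete, here's how I currently picture the knobs fitting together (a sketch; the `format`/`option` names here are my guesses from this thread, not checked against the README):

```scala
// Guessed option names - an illustration of the semantics I'm asking about.
val df = sqlContext.read
  .format("com.github.traviscrawford.spark.dynamodb")  // assumed data source name
  .option("table", "MyTable")               // assumed option name
  .option("segments", "8")                  // parallel scan segments -> Spark tasks?
  .option("rate_limit_per_segment", "100")  // read units/sec, enforced per segment?
  .load()
// If the limit is per segment, table-wide consumption would be roughly
// segments * rate_limit_per_segment read units per second.
```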
---
Take a look at http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/QueryAndScanGuidelines.html for an overview of how best to scan DynamoDB tables.
When tuning scans, there are a number of variables:
- Table provisioned read capacity units
- Number of records
- Size of table
- Scan segments
- Page size
- Scanner rate limit
I generally scan tables as part of an ETL process, and start with a single segment that uses about 20% of the provisioned read capacity units. If that's fast enough I don't look much further. Sometimes the scan is slow because a small table has very low provisioned read capacity units, so I increase that. Sometimes the table is large and adding scan segments greatly speeds up the scan.
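As a rough worked example (numbers assumed for illustration):

```scala
// A table provisioned at 500 read capacity units, scanned at ~20%:
val provisionedRcu = 500.0
val targetRate = provisionedRcu * 0.20          // 100 read units/sec
// Scans use eventually consistent reads by default, and one read unit covers
// two 4 KB eventually-consistent reads, so scan throughput is roughly:
val bytesPerSecond = targetRate * 2 * 4 * 1024  // ~800 KiB/s per segment
// At that rate a single segment takes many hours on a multi-GB table, which
// is why adding parallel scan segments speeds things up so dramatically.
```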
Given the above variables that affect table scans, what do your tables look like?
---
Thanks for the link and thoughts @traviscrawford. I haven't had to do much `Scan`ning in the past (so I hadn't yet read about `TotalSegments`), but like you, I'll be using this for an ETL process. I have a pretty big table (~150GB) with a fair number of attributes (although I'll only be using a handful of them in my ETL).
Like you mention, I'll just have to experiment with different values for the `segments` parameter.
I noticed that you just added `0.0.4-SNAPSHOT`. Is that ready for me to test out?
---
I just published version `0.0.3` with the rate limiting change to Maven Central - it might take a while to propagate, but once it does you can try that out.
---
👍 I'll keep you posted on my testing.
---
@traviscrawford - I just tried this out (sorry for the delay), but it looks like there might be a `guava` version conflict or something (based on the Stack Overflow threads here and here). The exception:

```
java.lang.NoSuchMethodError: com.google.common.base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;
  at com.google.common.util.concurrent.RateLimiter$SleepingStopwatch$1.<init>(RateLimiter.java:417)
  at com.google.common.util.concurrent.RateLimiter$SleepingStopwatch.createFromSystemTimer(RateLimiter.java:416)
  at com.google.common.util.concurrent.RateLimiter.create(RateLimiter.java:130)
  at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$7.apply(DynamoDBRelation.scala:147)
  at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$7.apply(DynamoDBRelation.scala:145)
  at scala.Option.map(Option.scala:145)
  at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$.scan(DynamoDBRelation.scala:145)
```
---
@traviscrawford - I'm still trying to resolve this. It appears that Spark depends on `guava` `14.0`, and that `spark-dynamodb` depends on `guava` `18.0`. The `Stopwatch.createStarted` call doesn't exist in `14.0` from what I can tell, and that is the source of the problem.
I'm looking into "shading" the dependency, but don't have much experience there.
---
Still no luck. I've tried:

```scala
// in build.sbt
dependencyOverrides += "com.google.guava" % "guava" % "18.0"

// direction taken from: http://arjon.es/2015/10/12/making-hadoop-2-dot-6-plus-spark-cassandra-driver-play-nice-together/
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
)
```

This StackOverflow answer would seem to point to the solution, but I've had no luck using `--conf spark.{driver,executor}.userClassPathFirst=true`.
To reiterate: `guava 14.0`, which Spark 1.6.1 depends on, lacks the method `Stopwatch.createStarted`. `guava 18.0`, however, has this method, which is used by the `RateLimiter`.
I think the answer lies with shading, but I have no experience with this technique, and my attempts above failed.
@traviscrawford, can you offer any thoughts?
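For reference, here's the shape I think a working rule would take (untested; unlike my attempt above, it keeps the remainder of the package path intact via `@1`):

```scala
// Untested sketch: relocate guava's packages inside the assembled jar so the
// rewritten references can't collide with the guava 14.0 that Spark provides.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)
```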
---
@findchris are you marking your Spark dependencies as `"provided"`?
---
Thanks for chiming in @timchan-lumoslabs. Here is what I have in my build.sbt's `libraryDependencies` section:

```scala
"org.apache.spark" %% "spark-core" % "1.6.1" % "provided" exclude("org.slf4j", "jcl-over-slf4j") exclude("org.scala-lang", "scala-library") exclude("org.scala-lang", "scala-reflect") exclude("com.google.guava", "guava"),
"org.apache.spark" %% "spark-sql" % "1.6.1" % "provided" exclude("org.scala-lang", "scala-library") exclude("org.scala-lang", "scala-reflect") exclude("com.google.guava", "guava"),
"org.apache.spark" %% "spark-hive" % "1.6.1" % "provided" exclude("stax", "stax-api") exclude("org.scala-lang", "scala-library") exclude("org.scala-lang", "scala-reflect") exclude("com.google.guava", "guava"),
"org.apache.spark" %% "spark-mllib" % "1.6.1" % "provided"
```

See anything obvious for me to change?
---
@traviscrawford / @timchan-lumoslabs - Any insights as to what might be going on? I just need to make sure `spark-dynamodb` is making calls to `guava 18.0` and not `14.0`, and sadly I don't know how to do it.
---
Hi @findchris - my hunch is we could downgrade to the version of Guava used by Spark and make this issue go away for you. Will take a look...
---
@traviscrawford - That's always an option. I have to imagine there is a solution to this problem, and I believe it involves shading - I'm just not sure how to do it. Maybe you can shade `guava` and still use `18.0` (or higher) via the `maven-shade-plugin`?
Either way, I appreciate the help!
---
@findchris @timchan-lumoslabs Question for you - internally at Medium we have tried a few different approaches to integrating DynamoDB with Spark, and the approach we're planning on using going forward is:
1. Use a Spark job to back up the DynamoDB table as JSON on S3. We chose JSON over a binary format such as Parquet because DynamoDB tables do not have a schema, so we can avoid the schema issue during the backup phase.
2. Read the DynamoDB backups like you normally would read JSON from a Spark job. At this point DynamoDB is not in the picture at all.
Would the above approach work for you? If so, I could see if we can open source the Spark-based DynamoDB scanner. If we're all using the same approach and code in production it would be easier to make sure it handles all the corner cases.
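A rough sketch of the shape of it (paths are placeholders, and `itemsRdd` stands in for however the scan results arrive as an RDD):

```scala
// Phase 1: backup job - dump one JSON document per item to S3.
itemsRdd
  .map(_.toJSON)  // Item#toJSON from the AWS SDK document API
  .saveAsTextFile("s3://bucket/backups/mytable/2016-10-01/")

// Phase 2: ETL job - DynamoDB is no longer in the picture.
val items = sqlContext.read.json("s3://bucket/backups/mytable/2016-10-01/")
items.select("id", "some_attribute")  // then transform and write wherever
```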
---
@traviscrawford - I appreciate the collaboration and interest in sharing more useful code.
Your suggestion sounds interesting. Let me share my use case and we can see if they're compatible: I need to scan an entire DynamoDB table (optimally I'd use a filter expression to eliminate some records lacking certain attributes), project out a subset of the returned attributes, do some light transformation of the data, and then write out the results to S3 as CSV.
So for my use case, I'm not doing a straight backup to S3. I suppose what you describe would work, with the disadvantage that I'd need two Spark jobs: one to back up the table to S3 as JSON, and another to read the JSON and operate on it.
Does that help to shed light on my usage?
---
@traviscrawford Our use case is somewhat similar to yours. We are basically forklifting the data in a DynamoDB table attribute to Redshift. Item attribute values are JSON.
---
@findchris We have some use cases like yours too, where we need to filter & project some records in the DynamoDB table before processing. DynamoDB filters are interesting: the full table is still scanned, and the filter is applied before rows are returned to the client. Since behind the scenes we're scanning the full table anyway, we simply write the whole thing out to S3. Then a separate job processes the backups.
Would y'all find it useful if we published the backup job we're using?
---
> Would y'all find it useful if we published the backup job we're using?

Yes. AWS has their Data Pipeline stuff, but it feels more consistent to have it all in Spark. Does this backup job happen to have a throttling mechanism built in, @traviscrawford? ;-)
---
@findchris I just switched to using the guava dependency that Spark provides, so you shouldn't have the issue anymore.
Can you `mvn install -DskipTests` on your local machine to build from source, then depend on the snapshot version you installed locally to test this? If it works I'll publish a new release to Maven Central.
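On the sbt side, something like this should pick up the locally installed snapshot (a sketch; the snapshot version comes from the pom):

```scala
// `mvn install` puts the artifact in ~/.m2/repository, which sbt can resolve
// through its local Maven repository resolver.
resolvers += Resolver.mavenLocal
libraryDependencies += "com.github.traviscrawford" % "spark-dynamodb" % "0.0.4-SNAPSHOT"
```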
---
@traviscrawford - A bit embarrassed to ask, but I'll need more hand-holding to do what you suggest. I use `sbt`, and haven't done any building from source for my dependencies. (I think `publishTo` is what I need; I'm just totally unfamiliar with it.)
Regardless, I appreciate the work. Looking forward to testing this all out.
---
@timchan-lumoslabs I appreciate the tips! I did what you said, and it compiled OK as `sbt assembly` ran without issue.
However, @traviscrawford, when I run my job I'm seeing a new stacktrace:
```
java.lang.NoClassDefFoundError: com/google/common/util/concurrent/RateLimiter
  at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$7.apply(DynamoDBRelation.scala:147)
  at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$7.apply(DynamoDBRelation.scala:145)
  at scala.Option.map(Option.scala:145)
  at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$.scan(DynamoDBRelation.scala:145)
  at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$buildScan$3.apply(DynamoDBRelation.scala:97)
  at com.github.traviscrawford.spark.dynamodb.DynamoDBRelation$$anonfun$buildScan$3.apply(DynamoDBRelation.scala:97)
  ...
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.google.common.util.concurrent.RateLimiter
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  ... 25 more
```
I was concerned `RateLimiter` wasn't around in guava `14.0`, but it was introduced in `13.0`.
Any idea what's up? I hope to investigate more tomorrow.
---
A follow-up:
When I tried to import the actual `RateLimiter` class (based on the above path), I got an error:

```
scala> import com.google.common.util.concurrent.RateLimiter
<console>:51: error: object util is not a member of package com.google.common
       import com.google.common.util.concurrent.RateLimiter
```

However, when I explicitly depend on guava 14.0.1 in my sbt file (`"com.google.guava" % "guava" % "14.0.1"`), I finally got it working with rate limiting!
This was using the manual steps provided by @timchan-lumoslabs to build `spark-dynamodb-0.0.4-SNAPSHOT.jar`.
@traviscrawford, not sure why I explicitly had to specify the guava dependency, but seems good now. Merge?
---
Is the current version then `0.0.6-SNAPSHOT`?
---
Thanks for all the correspondence! Closing this out now.