spark-alchemy's Introduction

spark-alchemy

Spark Alchemy is a collection of open-source Spark tools & frameworks that have made the data engineering and data science teams at Swoop highly productive in our demanding petabyte-scale environment with rich data (thousands of columns).

Supported languages

While spark-alchemy, like Spark itself, is written in Scala, much of its functionality, such as interoperable HyperLogLog functions, can be used from other Spark-supported languages such as SparkSQL and Python.

Installation

Add the following to your libraryDependencies in SBT:

libraryDependencies += "com.swoop" %% "spark-alchemy" % "1.0.1"

You can find all released versions here.

Some use cases, such as interoperability with PySpark, may require assembling a fat JAR of spark-alchemy. To assemble, run sbt assembly. To skip tests during assembly, run sbt 'set sbt.Keys.test in assembly := {}' assembly instead.
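
For PySpark, a common pattern is to attach spark-alchemy when the session is created, either by Maven coordinates or by pointing at the assembled fat JAR. The sketch below is illustrative only: the coordinates mirror the SBT example above, and the JAR path is a placeholder to replace with your own.

from pyspark.sql import SparkSession

# Minimal sketch: pull spark-alchemy from Maven Central (adjust the version
# to the release that matches your Spark/Scala build).
spark = (
    SparkSession.builder
    .appName("spark-alchemy-example")
    .config("spark.jars.packages", "com.swoop:spark-alchemy_2.12:1.0.1")
    # Alternatively, if you assembled a fat JAR with `sbt assembly`, point at
    # it directly (the path below is hypothetical):
    # .config("spark.jars", "/path/to/spark-alchemy-assembly.jar")
    .getOrCreate()
)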

For Spark users

  • Native HyperLogLog functions that offer reaggregatable, fast approximate distinct counting capabilities far beyond those in OSS Spark, with interoperability with Postgres and even JavaScript. Just like Spark's own native functions, once registered with Spark they can be used from SparkSQL, Python, etc. (see the sketch below).
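
As a rough illustration of what reaggregation looks like in practice (not taken verbatim from the project docs), the sketch below assumes the HLL functions have already been registered with the active SparkSession and uses a hypothetical visits table with date and user_id columns: one sketch is pre-aggregated per day, and sketches are later merged to answer distinct-count questions over arbitrary date ranges.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Assumes the HLL functions are already registered (see the PySpark
# registration sketch below) and that a hypothetical `visits` table with
# `date` and `user_id` columns exists.

# Pre-aggregate one HLL sketch per day (compact to store, reaggregatable later).
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW daily_sketches AS
    SELECT date, hll_init_agg(user_id) AS users_hll
    FROM visits
    GROUP BY date
""")

# Later, merge the daily sketches to approximate distinct users over any period.
spark.sql("""
    SELECT hll_cardinality(hll_merge(users_hll)) AS approx_distinct_users
    FROM daily_sketches
""").show()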

For Spark framework developers

For Python developers

  • See HyperLogLog functions for an example of how spark-alchemy's HLL functions can be registered for use through PySpark; a minimal registration sketch also follows below.
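
For reference, here is a minimal sketch of that registration pattern, based on the JVM-gateway approach that also appears in the issues below; it assumes the spark-alchemy JAR is already on the classpath.

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# Register the native HLL functions through the JVM gateway.
spark.sparkContext._jvm.com.swoop.alchemy.spark.expressions.hll \
    .HLLFunctionRegistration.registerFunctions(spark._jsparkSession)

# The functions are now available to SQL and expr().
spark.range(100).select(expr("hll_cardinality(hll_init_agg(id)) as approx_count")).show()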

What we hope to open source in the future, if we have the bandwidth

  • Configuration Addressable Production (CAP), Automatic Lifecycle Management (ALM) and Just-in-time Dependency Resolution (JDR) as outlined in our Spark+AI Summit talk Unafraid of Change: Optimizing ETL, ML, and AI in Fast-Paced Environments.

  • Utilities that make Delta Lake development substantially more productive.

  • Hundreds of productivity-enhancing extensions to the core user-level data types: Column, Dataset, SparkSession, etc.

  • Data discovery and cleansing tools we use to ingest and clean up large amounts of dirty data from third parties.

  • Cross-cluster named lock manager, which simplifies data production by removing the need for workflow servers much of the time.

  • case class code generation from Spark schema, with easy implementation customization.

  • Tools for deploying Spark ML pipelines to production.

Development

Build docs microsite

sbt "project docs" makeMicrosite

Run the docs microsite locally (from the docs/target/site folder)

jekyll serve -b /spark-alchemy

More details

More from Swoop

  • spark-records: bulletproof Spark jobs with fast root-cause analysis in case of failures

Community & contributing

Contributions and feedback of any kind are welcome. Please create an issue and/or a pull request.

Spark Alchemy is maintained by the team at Swoop. If you'd like to contribute to our open-source efforts, whether by joining our team or from your own company, let us know at spark-interest at swoop dot com.

License

spark-alchemy is Copyright © 2018-2020 Swoop, Inc. It is free software, and may be redistributed under the terms of the LICENSE.

spark-alchemy's People

Contributors

adityachaganti1, greglu, j03wang, mrpowers, pidge, ssimeonov, sunpj

spark-alchemy's Issues

Error while using HLL Functions in Spark: net/agkn/hll/serialization/IHLLMetadata

Hello Developer Community

We are using EMR 6.1, which comes with Spark 3.0 by default. We then installed spark-alchemy (Scala 2.12 build, version 1.0.1) using the command below:
wget -O /home/hadoop/spark-alchemy_2.12-1.0.1.jar https://repo1.maven.org/maven2/com/swoop/spark-alchemy_2.12/1.0.1/spark-alchemy_2.12-1.0.1.jar

After doing this, we registered the HLL functions in a Zeppelin notebook without error using the command below:
%spark com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark)

However, when we ran the query below to process our data column containing HLL sketches, we received an error:
%sql select hll_cardinality(hll_merge(exposure_hll)) from table1

ERROR:
Error happens in sql: select hll_cardinality(hll_merge(exposure_hll)) from table1
net/agkn/hll/serialization/IHLLMetadata; line 1 pos 23
set zeppelin.spark.sql.stacktrace = true to see full stacktrace

I am not sure what is going wrong; I would be really grateful if you could provide some guidance on this.

Thanks!

Python or Java support?

Hello,

I was wondering whether the HyperLogLog functions can be used from Java or Python (through PySpark). I would assume so for Java. If they are supported, could examples be posted on the wiki?

Thank you!

Registering functions for use with SparkSQL in pyspark

I'm interested in using your HLL implementation in a PySpark project, but I'm having trouble figuring out how to properly register the functions for use in SQL execution. I'm unsure how I would execute com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark) (Scala) from PySpark (from the documentation under "Using HLL functions" > "From SparkSQL"). I've tried the following without any luck.

sql_context = SQLContext(spark_context)

sql_context.registerJavaFunction('hll_init', 'com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration')
sql_context.registerJavaFunction('hll_init', 'com.swoop.alchemy.spark.expressions.hll.functions.hll_init')
sql_context.registerJavaFunction('hll_init', 'com.swoop.alchemy.spark.expressions.hll.functions.HyperLogLogInitSimple')

spark_session.sql("create temporary function hll_init as 'com.swoop.alchemy.spark.expressions.hll'")
spark_session.sql("create temporary function hll_init as 'com.swoop.alchemy.spark.expressions.hll.functions.HyperLogLogInitSimple'")

Error while registering HLL Functions: "object swoop is not a member of package com"

Hello Developers !
I need to use the HLL functions in my SQL code. I am using Zeppelin/Jupyter notebooks and running my SQL code in a PySpark setting. To register the HLL functions in the Spark environment, I had been using the command below:

com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark)

Everything was working fine until April 2021, but starting in May this command gives the following error:

:24: error: object swoop is not a member of package com
com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark)

Please help me figure out how to register the HLL functions if this command is giving this error. This is the standard command to register HLL functions in Spark and it's what I see on the web. I have no idea why it suddenly started failing.

Error using HLL Functions in Spark

Hello Devs

We are using EMR 6.1 with Spark 3.0 and spark-alchemy_2.12-1.0.1.jar.
The HLL functions were successfully registered in a Zeppelin notebook with the command below:
%spark com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark)

When trying to process our data column (exposure_hll) containing HLL sketches, we received an error:
%sql
select hll_cardinality(hll_merge(exposure_hll)) from table1

ERROR:
Error happens in sql: select hll_cardinality(hll_merge(exposure_hll)) from table1
net/agkn/hll/serialization/IHLLMetadata; line 1 pos 23

Attaching the full error log; below are a few lines from it:

Caused by: org.apache.spark.sql.AnalysisException: net/agkn/hll/serialization/IHLLMetadata; line 1 pos 23
at org.apache.spark.sql.EncapsulationViolator$.createAnalysisException(EncapsulationViolator.scala:11)
at com.swoop.alchemy.spark.expressions.NativeFunctionRegistration.$anonfun$expression$4(NativeFunctionRegistration.scala:64)
at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:121)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1439)
at org.apache.spark.sql.hive.HiveSessionCatalog.super$lookupFunction(HiveSessionCatalog.scala:135)
at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$lookupFunction0$2(HiveSessionCatalog.scala:135)

spark HLL Function error log .txt

Any help would be HUGELY appreciated! Thanks!!

Installing spark-alchemy on spark 3.0 breaks reading dataframes in

I recently tried installing spark-alchemy on Spark 3.0 using the following:

spark-shell --repositories https://dl.bintray.com/swoop-inc/maven/ --packages com.swoop:spark-alchemy_2.12:1.0.0

However, once in the shell, I can't read in any files. The following code produces the error below:

val df = spark.read.parquet("path/to/file")

Note: with a regular spark-shell I can read in the data fine.

20/09/21 13:16:17 ERROR Utils: Aborting task                        (0 + 1) / 1]

java.io.IOException: Failed to connect to m298188dljg5j.symc.symantec.com/172.19.129.194:49801

       at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:253)

       at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:195)

       at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:392)

       at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$openChannel$4(NettyRpcEnv.scala:360)

       at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

       at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)

       at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:359)

       at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:719)

       at org.apache.spark.util.Utils$.fetchFile(Utils.scala:535)

       at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:869)

       at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:860)

       at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)

       at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)

       at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)

       at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)

       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)

       at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)

       at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)

       at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:860)

       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:404)

       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

       at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Operation timed out: m298188dljg5j.symc.symantec.com/172.19.129.194:49801

Caused by: java.net.ConnectException: Operation timed out

       at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)

       at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)

       at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)

       at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)

       at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)

       at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)

       at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)

       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)

       at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)

       at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)

       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

       at java.base/java.lang.Thread.run(Thread.java:834)

20/09/21 13:16:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)

java.io.IOException: Failed to connect to m298188dljg5j.symc.symantec.com/172.19.129.194:49801
       [same stack trace as above]

20/09/21 13:16:17 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, m298188dljg5j.symc.symantec.com, executor driver): java.io.IOException: Failed to connect to m298188dljg5j.symc.symantec.com/172.19.129.194:49801
       [same stack trace as above]

 

20/09/21 13:16:17 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, m298188dljg5j.symc.symantec.com, executor driver): java.io.IOException: Failed to connect to m298188dljg5j.symc.symantec.com/172.19.129.194:49801
       [same stack trace as above]

 

Driver stacktrace:

  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)

  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)

  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)

  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)

  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)

  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)

  at scala.Option.foreach(Option.scala:407)

  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)

  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)

  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)

  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)

  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)

  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)

  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)

  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)

  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)

  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)

  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)

  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)

  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)

  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)

  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)

  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)

  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)

  at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)

  at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)

  at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:114)

  at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:67)

  at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:62)

  at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:193)

  at scala.Option.orElse(Option.scala:447)

  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:190)

  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:401)

  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)

  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)

  at scala.Option.getOrElse(Option.scala:189)

  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:535)

  ... 47 elided

Caused by: java.io.IOException: Failed to connect to m298188dljg5j.symc.symantec.com/172.19.129.194:49801
  [same stack trace as above]

Outdated documentation

Hi,

I'm just looking for some clarification regarding Postgres interoperability:

Native HyperLogLog functions that offer reaggregatable fast approximate distinct counting capabilities far beyond those in OSS Spark with interoperability to Postgres and even JavaScript.
https://github.com/swoop-inc/spark-alchemy#for-spark-users

They enable interoperability at the HLL sketch level with other data processing systems. We use an open-source HLL library with an independent storage specification and built-in support for Postgres-compatible databases and even JavaScript. This allows Spark to serve as a universal data (pre-)processing platform for systems that require fast query turnaround times, e.g., portals & dashboards.
https://github.com/swoop-inc/spark-alchemy/wiki/Spark-HyperLogLog-Functions#spark-alchemys-approach-to-approximate-counting

It doesn't look like you are still using aggregateknowledge/java-hll, but rather have migrated to com.clearspring.analytics:stream (Spark's dependency). As far as I can see, that one doesn't use the same storage spec and isn't compatible for that reason. Right?

I quite liked the idea of quickly reaggregating data in a more performant data store.
It sounds like you've moved on to a different approach?

Cheers,
Moritz

AbstractMethodError with Spark Scala integration

I'm getting the error below:

Exception in thread "main" java.lang.AbstractMethodError: com.swoop.alchemy.spark.expressions.hll.HyperLogLogInitSimpleAgg.inputTypes()Lscala/collection/Seq;
at org.apache.spark.sql.catalyst.expressions.ExpectsInputTypes$class.checkInputDataTypes(ExpectsInputTypes.scala:44)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.checkInputDataTypes(interfaces.scala:158)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:132)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:132)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:144)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:144)
at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:69)
at scala.collection.immutable.List.forall(List.scala:83)
at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:144)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:132)
at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:132)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:144)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:144)
at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:69)
at scala.collection.immutable.List.forall(List.scala:83)
at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:144)
at org.apache.spark.sql.catalyst.expressions.Alias.resolved$lzycompute(namedExpressions.scala:142)
at org.apache.spark.sql.catalyst.expressions.Alias.resolved(namedExpressions.scala:142)
at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$resolved$4.apply(basicLogicalOperators.scala:502)
at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$resolved$4.apply(basicLogicalOperators.scala:502)
at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:79)
at scala.collection.immutable.Stream.exists(Stream.scala:188)

Code:
import org.apache.spark.sql.SparkSession
import com.swoop.alchemy.spark.expressions.hll.functions._

/**
 * Hello world!
 */
object App {

  def main(args: Array[String]): Unit = {
    val path1 = new java.io.File("C:\\mohit\\views\\hlldemo\\user.csv").getCanonicalPath
    val spark = SparkSession.builder().appName("demo").master("local").getOrCreate()

    val df1 = spark
      .read
      .option("header", "true")
      .option("charset", "UTF8")
      .csv(path1)

    val df2 = df1
      .groupBy("first_name")
      .agg(hll_init_agg("user_id"))

    df2.show()
  }
}

Specifying custom precision

Hi,

I am not able to specify a custom precision with the code below; it errors out. Can someone please let me know the right way to pass a custom precision? Please note that I am using Databricks for the Spark runtime.

import com.swoop.alchemy.spark.expressions.hll.functions._

val df1 = spark.table("hive_metastore.rwd_databricks.table_test")

df1.select("PATIENT_ID", "CLAIM_ID", "CODE")
  .withColumn("patient_id_hll", hll_init("PATIENT_ID", 0.02))
  .select(hll_merge("patient_id_hll", 0.02).as("patient_id_hll_m"))
  .write.mode("overwrite").format("delta").saveAsTable("patient_id_hll_merge")

I get the error below when trying it in Spark SQL and Scala:

java.lang.NoClassDefFoundError: Could not initialize class com.swoop.alchemy.spark.expressions.hll.AgKn$
at com.swoop.alchemy.spark.expressions.hll.HyperLogLogBase$.nameToImpl(HLLFunctions.scala:56)
at com.swoop.alchemy.spark.expressions.hll.HyperLogLogBase$.$anonfun$resolveImplementation$2(HLLFunctions.scala:39)
at scala.Option.map(Option.scala:146)
at com.swoop.alchemy.spark.expressions.hll.HyperLogLogBase$.resolveImplementation(HLLFunctions.scala:39)
at com.swoop.alchemy.spark.expressions.hll.HLLFunctions.hll_init_agg(HLLFunctions.scala:706)
at com.swoop.alchemy.spark.expressions.hll.HLLFunctions.hll_init_agg$(HLLFunctions.scala:705)
at com.swoop.alchemy.spark.expressions.hll.functions$.hll_init_agg(HLLFunctions.scala:653)
at com.swoop.alchemy.spark.expressions.hll.HLLFunctions.hll_init_agg(HLLFunctions.scala:710)
at com.swoop.alchemy.spark.expressions.hll.HLLFunctions.hll_init_agg$(HLLFunctions.scala:709)
at com.swoop.alchemy.spark.expressions.hll.functions$.hll_init_agg(HLLFunctions.scala:653)
... 58 elided

dependencies include a source jar, causing IHLLMetadata class not found error

I tried the Scala example on a fresh EMR cluster and am getting this error:

java.lang.NoClassDefFoundError: net/agkn/hll/serialization/IHLLMetadata
  at com.swoop.alchemy.spark.expressions.hll.HyperLogLogBase$.$anonfun$resolveImplementation$1(HLLFunctions.scala:39)
  at scala.Option.flatMap(Option.scala:271)
  at com.swoop.alchemy.spark.expressions.hll.HyperLogLogBase$.resolveImplementation(HLLFunctions.scala:39)
  at com.swoop.alchemy.spark.expressions.hll.HLLFunctions.hll_init_agg(HLLFunctions.scala:725)
  at com.swoop.alchemy.spark.expressions.hll.HLLFunctions.hll_init_agg$(HLLFunctions.scala:724)
  at com.swoop.alchemy.spark.expressions.hll.functions$.hll_init_agg(HLLFunctions.scala:672)
  ... 66 elided
Caused by: java.lang.ClassNotFoundException: net.agkn.hll.serialization.IHLLMetadata

Steps to reproduce (note the sources JAR, hll-1.6.0-sources.jar, among the downloaded jars):

[hadoop@ip-10-0-65-130 ~]$ spark-shell --packages com.swoop:spark-alchemy_2.12:1.2.0
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
com.swoop#spark-alchemy_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-903c632d-c058-4f0e-86bd-73a28cf3e3dd;1.0
        confs: [default]
        found com.swoop#spark-alchemy_2.12;1.2.0 in central
        found net.agkn#hll;1.6.0 in central
        found it.unimi.dsi#fastutil;6.5.11 in central
downloading https://repo1.maven.org/maven2/com/swoop/spark-alchemy_2.12/1.2.0/spark-alchemy_2.12-1.2.0.jar ...
        [SUCCESSFUL ] com.swoop#spark-alchemy_2.12;1.2.0!spark-alchemy_2.12.jar (19ms)
downloading https://repo1.maven.org/maven2/net/agkn/hll/1.6.0/hll-1.6.0-sources.jar ...
        [SUCCESSFUL ] net.agkn#hll;1.6.0!hll.jar (6ms)
downloading https://repo1.maven.org/maven2/it/unimi/dsi/fastutil/6.5.11/fastutil-6.5.11.jar ...
        [SUCCESSFUL ] it.unimi.dsi#fastutil;6.5.11!fastutil.jar (571ms)

hll_cardinality always returns 1 when running with pyspark

When I run the hll_cardinality function with PySpark, the library always returns 1.

Code example:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr


def test_process():
    spark = SparkSession.builder.master('local[1]') \
        .config('spark.jars.packages',
                'com.swoop:spark-alchemy_2.12:1.1.0') \
        .getOrCreate()

    sc = SparkContext.getOrCreate()
    sc._jvm.com.swoop.alchemy.spark.expressions.hll.HLLFunctionRegistration.registerFunctions(spark._jsparkSession)

    df = spark.range(5).toDF("id").select(expr("hll_cardinality(hll_init(id)) as cd"))
    df.show(truncate=False)

will display:

+---+
|cd |
+---+
|1  |
|1  |
|1  |
|1  |
|1  |
+---+

Env:

spark: 3.1.2
pyspark: 3.1.2
scala: 2.12.10
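
For context, this output is what you would expect if hll_init builds one sketch per row (each holding a single value), so hll_cardinality of every row-level sketch is 1. To approximate the distinct count of the whole column, aggregate first, for example with hll_init_agg. A minimal sketch under the same setup as above (functions already registered):

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()
# Assumes the HLL functions are registered as in the snippet above.

# Aggregate the whole column into a single sketch, then take its cardinality.
spark.range(5).toDF("id") \
    .select(expr("hll_cardinality(hll_init_agg(id)) as cd")) \
    .show(truncate=False)
# Expected: a single row with cd = 5 (approximate for larger cardinalities).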
