
singlestore-spark-connector's Issues

Failure for writing large dataframe to memsql from spark

I am using the memsql-spark connector to write dataframes from Spark to MemSQL. The library works fine when the dataframe is not large: with fewer than a few million rows, the write succeeds. However, with more than 10 million rows the program may fail, and with billions of rows it always fails.

I am running Spark and MemSQL on a single large server, so the MemSQL master node and the Spark driver node are the same machine.

The main error in the log is : java.sql.SQLException: Unable to connect to leaf @10.1.10.40:3307 with user root, using password NO: [2005] Timed out trying to connect to '10.1.10.40':3307 after 30 seconds.

This issue has bothered me for a long time; I have updated both MemSQL and this connector library, yet the problem is still not solved.

Exception caught when connecting to the memsql cluster

Hi,

First of all, I'd like to say this project is really interesting. Unfortunately, I haven't found consolidated documentation for the project, nor many examples or reported use cases.

I'm facing what I believe is a trivial problem: connecting to the MemSQL cluster I set up and then loading the data I want into my Spark app. I asked about this scenario on Stack Overflow, and I thought it would be useful to share the link here so we can discuss where exactly the potential point of error is: http://stackoverflow.com/questions/34972034/error-using-memsql-spark-connector

I'd appreciate hearing from the people who are actively working on the project.
Thank you.

Single RDD partition

Although I'm configuring the database name when loading, I keep getting a single RDD partition:
ss.read.format("com.memsql.spark.connector").options(Map("path" -> ("table"),"database" -> ("metrics"))).load()

The database is configured with 32 partitions:
memsql> show partitions on metrics;
+---------+----------------+------+--------+--------+
| Ordinal | Host           | Port | Role   | Locked |
+---------+----------------+------+--------+--------+
|       0 | 10.125.134.164 | 3307 | Master |      0 |
|       1 | 10.125.134.164 | 3307 | Master |      0 |
|       2 | 10.125.134.164 | 3307 | Master |      0 |
|       3 | 10.125.134.164 | 3307 | Master |      0 |
|       4 | 10.125.134.164 | 3307 | Master |      0 |
|       5 | 10.125.134.164 | 3307 | Master |      0 |
|       6 | 10.125.134.164 | 3307 | Master |      0 |
|       7 | 10.125.134.164 | 3307 | Master |      0 |
|       8 | 10.125.134.164 | 3307 | Master |      0 |
|       9 | 10.125.134.164 | 3307 | Master |      0 |
|      10 | 10.125.134.164 | 3307 | Master |      0 |
|      11 | 10.125.134.164 | 3307 | Master |      0 |
|      12 | 10.125.134.164 | 3307 | Master |      0 |
|      13 | 10.125.134.164 | 3307 | Master |      0 |
|      14 | 10.125.134.164 | 3307 | Master |      0 |
|      15 | 10.125.134.164 | 3307 | Master |      0 |
|      16 | 10.125.134.164 | 3307 | Master |      0 |
|      17 | 10.125.134.164 | 3307 | Master |      0 |
|      18 | 10.125.134.164 | 3307 | Master |      0 |
|      19 | 10.125.134.164 | 3307 | Master |      0 |
|      20 | 10.125.134.164 | 3307 | Master |      0 |
|      21 | 10.125.134.164 | 3307 | Master |      0 |
|      22 | 10.125.134.164 | 3307 | Master |      0 |
|      23 | 10.125.134.164 | 3307 | Master |      0 |
|      24 | 10.125.134.164 | 3307 | Master |      0 |
|      25 | 10.125.134.164 | 3307 | Master |      0 |
|      26 | 10.125.134.164 | 3307 | Master |      0 |
|      27 | 10.125.134.164 | 3307 | Master |      0 |
|      28 | 10.125.134.164 | 3307 | Master |      0 |
|      29 | 10.125.134.164 | 3307 | Master |      0 |
|      30 | 10.125.134.164 | 3307 | Master |      0 |
|      31 | 10.125.134.164 | 3307 | Master |      0 |
+---------+----------------+------+--------+--------+
32 rows in set (0.00 sec)
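
The loaded DataFrame still ends up with a single partition. A minimal way to see it (standard Spark API, nothing connector-specific):

val df = ss.read
  .format("com.memsql.spark.connector")
  .options(Map("path" -> "table", "database" -> "metrics"))
  .load()
println(df.rdd.getNumPartitions)   // prints 1 here, even though the database has 32 partitions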

load data in upsert mode

When looking at the code here,

I saw that upsert mode is not supported yet.
Is there a plan to support it at some point?
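
The closest thing I have found so far is the onDuplicateKeySQL parameter used with saveToMemSQL in another issue in this list; a sketch of that approach (borrowed from that issue, so treat the exact call shape and the column/table names as placeholders):

// Emulate an upsert: onDuplicateKeySQL turns the generated INSERT into
// INSERT ... ON DUPLICATE KEY UPDATE.
val saveConf = SaveToMemSQLConf(
  memsqlConf = memSQLConf,
  params = Map("onDuplicateKeySQL" -> "col_a = VALUES(col_a), col_b = VALUES(col_b)"))
df.saveToMemSQL(TableIdentifier("mydb", "my_table"), saveConf)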

Thanks

Getting MySQL "Multiple statements detected in a single query." error.

It seems the MySQL driver is on the classpath of the provided Spark SQL package on our cluster. That driver takes priority, and the error below is the result. Technically not a bug in this connector, but I'm not sure how best to work around it (the workaround I'm going to try is sketched after the stack trace).

2020-02-28 18:39:55.460 ERROR [Driver] util.SimpleWorkHourExporter$: Failed to run  job:
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Multiple statements detected in a single query. SET SESSION collation_server=utf8_general_ci;sql_select_limit=18446744073709551615;compile_only=false;sql_mode='STRICT_ALL_TABLES,ONLY_FULL_GROUP_BY'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
	at com.mysql.jdbc.Util.getInstance(Util.java:387)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2547)
	at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1541)
	at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2605)
	at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1469)
	at com.mysql.jdbc.ConnectionImpl.setSessionVariables(ConnectionImpl.java:5092)
	at com.mysql.jdbc.ConnectionImpl.initializePropsFromServer(ConnectionImpl.java:3262)
	at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2299)
	at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2085)
	at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:795)
	at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
	at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:400)
	at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:327)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
	at com.memsql.spark.JdbcHelpers$.prepareTableForWrite(JdbcHelpers.scala:189)
	at com.memsql.spark.DefaultSource.createRelation(DefaultSource.scala:64)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
	at com.hulu.bda.memsql.util.HiveMemsqlUtil$.saveToTable(HiveMemsqlUtil.scala:79)
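
The workaround I am going to try (not a confirmed fix; the jar names below are placeholders) is giving the application's jars priority over the cluster-provided MySQL driver via Spark's userClassPathFirst settings:

spark-submit \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  --jars memsql-spark-connector.jar,mariadb-java-client.jar \
  my-job.jar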

MemSQL Interface on CDH Spark over Yarn

Hi,

I'm trying to run memsql-spark-interface-1.3.0 on a CDH 5.1 Spark cluster, in yarn (client) mode.

For context, I can run spark-shell, include the MemSQL interface jar, and play with it. That works fine.

spark-shell --master yarn --jars memsql-spark-interface-1.3.0.jar

If I try to run the application, though, I get this error:

spark-submit --master yarn memsql-spark-interface-1.3.0.jar --port 10009 --dataDir /tmp/memsql/ --dbHost $MASTER_IP

16/03/07 21:49:07 INFO Submitted application application_1456615046564_0024
16/03/07 21:49:15 INFO Connected to a Spark master on yarn-client
Uncaught error from thread [spark-interface-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark-interface]
java.lang.AbstractMethodError
    at akka.actor.ActorLogging$class.$init$(Actor.scala:335)
    at spray.can.HttpManager.<init>(HttpManager.scala:29)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
    at akka.actor.Props.newActor(Props.scala:339)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[ERROR] [03/07/2016 21:49:15.509] [spark-interface-akka.actor.default-dispatcher-4] [ActorSystem(spark-interface)] Uncaught error from thread [spark-interface-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.AbstractMethodError
    at akka.actor.ActorLogging$class.$init$(Actor.scala:335)
    at spray.can.HttpManager.<init>(HttpManager.scala:29)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
    at akka.actor.Props.newActor(Props.scala:339)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [03/07/2016 21:49:15.557] [spark-interface-akka.actor.default-dispatcher-3] [akka://spark-interface/user/IO-HTTP] head of empty list
java.util.NoSuchElementException: head of empty list
    at scala.collection.immutable.Nil$.head(List.scala:337)
    at scala.collection.immutable.Nil$.head(List.scala:334)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [03/07/2016 21:49:15.561] [spark-interface-akka.actor.default-dispatcher-3] [akka://spark-interface/user/IO-HTTP] changing Recreate into Create after java.util.NoSuchElementException: head of empty list
Uncaught error from thread [spark-interface-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark-interface]
java.lang.AbstractMethodError
    at akka.actor.ActorLogging$class.$init$(Actor.scala:335)
    at spray.can.HttpManager.<init>(HttpManager.scala:29)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
    at akka.actor.Props.newActor(Props.scala:339)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    at akka.actor.dungeon.FaultHandling$class.finishCreate(FaultHandling.scala:135)
    at akka.actor.dungeon.FaultHandling$class.faultCreate(FaultHandling.scala:129)
    at akka.actor.ActorCell.faultCreate(ActorCell.scala:338)
    at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:58)
[ERROR] [03/07/2016 21:49:15.564] [spark-interface-akka.actor.default-dispatcher-3] [ActorSystem(spark-interface)] Uncaught error from thread [spark-interface-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.AbstractMethodError
    at akka.actor.ActorLogging$class.$init$(Actor.scala:335)
    at spray.can.HttpManager.<init>(HttpManager.scala:29)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
    at akka.actor.Props.newActor(Props.scala:339)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    at akka.actor.dungeon.FaultHandling$class.finishCreate(FaultHandling.scala:135)
    at akka.actor.dungeon.FaultHandling$class.faultCreate(FaultHandling.scala:129)
    at akka.actor.ActorCell.faultCreate(ActorCell.scala:338)
    at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:58)
    at akka.actor.ActorCell.faultRecreate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:428)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at akka.actor.ActorCell.faultRecreate(ActorCell.scala:338)

    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:428)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have the feeling this is due to CDH, but I wanted to double-check whether the MemSQL interface can work in yarn mode.

Googling around, I've found this similar issue, in a totally unrelated project:
FRosner/spawncamping-dds#47

Thanks, E.

MemSQLConnectionPool connect(info: MemSQLConnectionInfo) always invokes a connection

In testing, I see that connect() always creates a new connection because it is not properly checking on the key; the connection is not added to the pool, though, since there is a pools.putIfAbsent() check.

In MemSQLConnectionPool.scala
def connect(info: MemSQLConnectionInfo): Connection = {
  if (!pools.contains(info)) {

should be changed to

def connect(info: MemSQLConnectionInfo): Connection = {
  if (!pools.containsKey(info)) {
Let me know if you want me to submit a PR, thanks.

load balancing for data ingestion is not working with multi aggregators

Even after setting the right config as per the docs, the ingestion load from the Spark connector only goes to one aggregator.

config set:
spark.conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal")
spark.conf.set("spark.datasource.memsql.dmlEndpoints", "memsql-master.cluster.internal,memsql-child-1.cluster.internal:3307")

I ran the job multiple times, and all the LOAD DATA INFILE commands go to the same aggregator.
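
One thing I have not verified yet is whether every DML endpoint needs an explicit port; the variant I still plan to try (an assumption on my part, not a confirmed fix):

spark.conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal:3306")
spark.conf.set("spark.datasource.memsql.dmlEndpoints", "memsql-master.cluster.internal:3306,memsql-child-1.cluster.internal:3307")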

While executing the Spark job on EMR, running into a "No suitable driver found" issue

Below is my code snippet:

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object SSPInventoryLoaderJob {

  val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]) {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count").setMaster("local[*]"))
    val sql = new SQLContext(sc)
    val host = "xx.xx.xx.xx"
    val port = 3306
    val dbName = "xxxx"
    val user = "xxxx"
    val password = "xxxx"
    val tableName = "xxx"

    val conf = ConfigFactory.load
    sql.sparkContext.hadoopConfiguration.set(Constants.S3_ACCESSKEY, conf.getString(Constants.SPARK_ETL_S3_ACCESSKEY))
    sql.sparkContext.hadoopConfiguration.set(Constants.S3_SECRETKEY, conf.getString(Constants.SPARK_ETL_S3_SECRETKEY))
    val start: Long = System.currentTimeMillis
    val dfSSPInventoryList: DataFrame = sql.read
      .format(conf.getString(Constants.REDSHIFT_DRIVER))
      .option(Constants.URL, conf.getString(Constants.JDBC_URL))
      .option(Constants.QUERY, conf.getString(Constants.SSP_INVENTORY_LOADER_JOB_QUERY))
      .option(Constants.TEMP_DIR, conf.getString(Constants.S3_TMP_DIRECTORY) + "/ssp_inventory/")
      .load()
    val func_rt_acc = new com.memsql.spark.connector.DataFrameFunctions(dfSSPInventoryList)
    func_rt_acc.saveToMemSQL(dbName, tableName, host, port, user, password)
  }
}

Error:
Exception in thread "main" java.sql.SQLException: No suitable driver found for jdbc:mysql://xx.xx.xx.xxxx:3306
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at com.memsql.spark.context.MemSQLContext$.getMemSQLConnection(MemSQLContext.scala:41)
at com.memsql.spark.context.MemSQLContext$.getMemSQLChildAggregators(MemSQLContext.scala:49)
at com.memsql.spark.context.MemSQLContext$.getMemSQLNodesAvailableForIngest(MemSQLContext.scala:101)
at com.memsql.spark.connector.RDDFunctions.saveToMemSQL(RDDFunctions.scala:86)
at com.memsql.spark.connector.DataFrameFunctions.saveToMemSQL(DataFrameFunctions.scala:65)
at com.cadreon.atv.ranker.SSPInventoryLoaderJob$.main(SSPInventoryLoaderJob.scala:39)
at com.cadreon.atv.ranker.SSPInventoryLoaderJob.main(SSPInventoryLoaderJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
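
Update: I think the root cause may simply be that the MySQL JDBC driver jar is not on the classpath when saveToMemSQL opens its connection. A sketch of what I am going to try (the driver class name is an assumption, and the jar still has to be shipped with the job, e.g. via --jars or a fat jar):

// Register the JDBC driver explicitly before the MemSQL save is attempted.
Class.forName("com.mysql.jdbc.Driver")   // Connector/J 5.x driver class (assumption)
func_rt_acc.saveToMemSQL(dbName, tableName, host, port, user, password)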

saveToMemSQLApp.scala question

Hi,

I'm trying to run the example code in WriteToMemSQLApp.scala.
I'm running it through spark-shell, as a yarn-client (my Spark version is 1.3.1).

When I run:
val rdd = sc.parallelize(values)
rdd.saveToMemSQL(dbName, outputTableName, host, port, user, password)
I get an error because saveToMemSQL is not a method of rdd.

It seems to me that sc is an org.apache.spark.SparkContext object and that sc.parallelize() returns an org.apache.spark.rdd.RDD object, which has no saveToMemSQL method.

Where am I going wrong?

Thanks a lot for any hint,
Kimchitsigai
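
Update: I suspect saveToMemSQL is added to RDDs through an implicit conversion, so the example probably needs the connector package import in scope (an assumption based on the package names in the connector's stack traces, not on documentation):

import com.memsql.spark.connector._   // should bring the implicit saveToMemSQL extensions into scope

val rdd = sc.parallelize(values)
rdd.saveToMemSQL(dbName, outputTableName, host, port, user, password)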

SaveMode configuration not honoured while saving data using DataFrameReader API

All of the following code appends data to the table; it doesn't seem to honour the specified save mode:

dataframe.write.format("com.memsql.spark.connector").mode(SaveMode.ErrorIfExists).save("test.tablename")
dataframe.write.format("com.memsql.spark.connector").mode(SaveMode.Overwrite).save("test.tablename")
dataframe.write.format("com.memsql.spark.connector").mode(SaveMode.Append).save("test.tablename")

load data to memsql table from Hadoop (HDFS) ORC files

Hello Team,
I have an external Hive table with billions of records, and the underlying files for this table are in ORC format. Is there any way, using MemSQL pipelines or any other MemSQL approach, to read the data from the HDFS ORC files and load it into a MemSQL table?

We have MemSQL and Hadoop on two different machines in the same network.

We tried using the memsql-spark connector, but the data loads very slowly, so we are looking for other options to load the data quickly.
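
For reference, a minimal sketch of the Spark route we tried (the path, database, and table names are placeholders; the write format string is the one used elsewhere in these issues):

// Read the ORC files straight from HDFS and push them through the connector.
val orcDF = spark.read.orc("hdfs:///warehouse/my_hive_table")
orcDF.write
  .format("com.memsql.spark.connector")
  .mode("append")
  .save("mydb.my_memsql_table")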

getting error while loading table into spark dataframe

I am using Spark 2.1.0 on Linux CentOS
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)

val movies = spark.read.format("com.memsql.spark.connector").options(Map("query" -> ("select * from movies limit 10"),"database" -> "movielens")).load()

java.lang.ClassNotFoundException: org.apache.spark.Logging was removed in Spark 2.0. Please check if your library is compatible with Spark 2.0
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:580)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
... 48 elided
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.access$100(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.access$100(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:554)
... 53 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 86 more

The movies table in the movielens database:
CREATE TABLE `movies` (
  `id` int(11) DEFAULT NULL,
  `title` varchar(300) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `genres` varchar(300) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL
  /*!90618 , SHARD KEY () */
)
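
Update: org.apache.spark.Logging was removed in Spark 2.0, so I suspect I need a connector build that targets Spark 2.x (an assumption based on the versions mentioned in other issues here), e.g.:

<dependency>
    <groupId>com.memsql</groupId>
    <artifactId>memsql-connector_2.11</artifactId>
    <version>2.0.6</version>
</dependency>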

Does SingleStore DB support Array<struct> column types?

Hello Team,
Like Hive, does MemSQL support a table column datatype of array<struct>, as shown below?
If yes, how do we extract the inner columns of the array? In Hive we have explode / lateral view; is there anything similar?

column name | Datatype

diagnoses | array<struct<column1 string, column2 double>>

avro Date format exception

When I switch to loadDataFormat=Avro

I get an error when the DataFrame has a Date column:

org.apache.avro.AvroRuntimeException: Unknown datum type java.sql.Date:
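
A workaround I am considering (just a sketch; the column name is a placeholder) is casting the Date column to a string before the save:

import org.apache.spark.sql.functions.col

// Avro can't encode java.sql.Date directly here, so ship the column as a string instead.
val fixedDF = df.withColumn("event_date", col("event_date").cast("string"))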

thanks,
Aleks

V3 is too slow as compared to V2

We have been using the Spark connector V2 for quite some time and had it benchmarked. We recently tried the V3 connector, and surprisingly it is 2x-3x slower than V2:

V2:

<dependency>
    <groupId>com.memsql</groupId>
    <artifactId>memsql-connector_2.11</artifactId>
    <version>2.0.6</version>
</dependency>

V3:

<dependency>
    <groupId>com.memsql</groupId>
    <artifactId>memsql-spark-connector_2.11</artifactId>
    <version>3.0.4-spark-2.3.4</version>
</dependency>

Some stats:
Data Ingestion Size 300GB / 2.2 B records

| RDD Partitions | MemSQL Connector V2 total ingestion time (minutes) | MemSQL Connector V3 total ingestion time (minutes) | DB Partitions | Comment |
| --- | --- | --- | --- | --- |
| 32 | 135 | 190 | total 32 | |
| 32 | 65 | 175 | total 32 | |

The Spark config used was the same for both connectors:

spark.speculation=false
spark.driver.memory=30G
spark.executor.memory=25G
spark.executor.instances=32
spark.executor.cores=8
spark.yarn.maxAppAttempts=1
spark.memory.storageFraction=0.2
spark.memory.fraction=0.85

java.sql.SQLException: Communications link failure when trying to load data

Hello,

I run Spark 2.2.0 on a Databricks cluster with 1 master and 1 slave. My memsql-connector version is 2.0.2.

I set up my conf:

val memsql_host = sc.getConf.get("spark.memsql.host")
val memsql_port = sc.getConf.get("spark.memsql.port")
val memsql_user = sc.getConf.get("spark.memsql.user")
val memsql_password = sc.getConf.get("spark.memsql.password")
val memsql_defaultDatabase = sc.getConf.get("spark.memsql.defaultDatabase")

... and try to connect over JDBC:

val pc_conn_str = s"jdbc:mysql://$memsql_host:$memsql_port/$memsql_defaultDatabase"
val pc_conn = DriverManager.getConnection(pc_conn_str, memsql_user, memsql_password)
val pc_stmt = pc_conn.createStatement
val pc_query = "SELECT COUNT(1) as part_count FROM INFORMATION_SCHEMA.TABLE_STATISTICS WHERE TABLE_NAME = 'job_matrix';"
val pc_rs = pc_stmt.executeQuery(pc_query);
 
var part_count = 2

while (pc_rs.next())
{
  part_count =  pc_rs.getInt("part_count")
}
pc_stmt.close()

... I succeed and get a result for part_count.

When I try to do the same with the memsql-connector:

val tbl_job_matrix = sqlContext
  .read
  .format("com.memsql.spark.connector")
  .options(Map(
    "query" -> "SELECT COUNT(1) as part_count FROM INFORMATION_SCHEMA.TABLE_STATISTICS WHERE TABLE_NAME = 'job_matrix'",
    "database" -> "job_matrix"))
  .load()

... I get java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure

Clearly it's not an access issue, which is what the documentation points me to. Can you suggest a fix or something to check? Also, is there a way to set a timeout on the connection, since it takes quite some time until I get the failure?
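
For the timeout part, one thing that might help (a sketch only; connectTimeout and socketTimeout are standard MySQL Connector/J URL parameters, in milliseconds, and the values are arbitrary) is adding driver-level timeouts to the plain JDBC connection string:

val pc_conn_str =
  s"jdbc:mysql://$memsql_host:$memsql_port/$memsql_defaultDatabase?connectTimeout=10000&socketTimeout=60000"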

Thank you!

New Spark versions >= 2.0.0 replace DataFrame with Dataset (DataFrame is now an alias for Dataset[Row]), so df.saveToMemSQL is not working.

New Spark versions >= 2.0.0 replace the standalone DataFrame class with Dataset (DataFrame is now just an alias for Dataset[Row]),
so df.saveToMemSQL is not working:

Exception in thread "main" java.lang.NoSuchMethodError: com.memsql.spark.connector.package$.DataFrameFunctions(Lorg/apache/spark/sql/Dataset;)Lcom/memsql/spark/connector/package$DataFrameFunctions;
at com.olx.guru.KafkaWordCount$.main(KafkaWordCount.scala:130)
at com.olx.guru.KafkaWordCount.main(KafkaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
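
A possible workaround (a sketch; the format/save path is the one shown in other issues here, and the stream variable, database, and table names are placeholders) is to write the Dataset through the DataFrameWriter API instead of the implicit saveToMemSQL:

// Write via the data source API rather than the implicit extension method.
wordCounts.toDF()
  .write
  .format("com.memsql.spark.connector")
  .mode("append")
  .save("mydb.word_counts")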

insert strategy generates a bad SQL query

I am trying to do some sort of upsert, so I'm using the insert strategy rather than the load strategy.

When debugging the code, I saw that the query generated by the "build query" method
(memsql-spark-connector/connectorLib/src/main/scala/com/memsql/spark/connector/sql/InsertQuery.scala)

looks like:

INSERT INTO table (user_level,user_id,dwh_update_ts) VALUES (?,?,?),(?,?,?) ON DUPLICATE KEY UPDATE user_id = values(user_id) , user_level = values(user_level) , dwh_update_ts = values(dwh_update_ts)

and thus doesn't insert/update anything in my table.

My code is as follows (Scala):

def partialWriteToTarget(df: DataFrame, fieldsToUpdate: List[String], KeyField: String): Unit = {
  val saveToMemSQLConf = SaveToMemSQLConf.apply(
    memsqlConf = this.memSQLConf.get,
    params = Map("onDuplicateKeySQL" -> createUpdateStatement(fieldsToUpdate)))
  val tableIdentifier = TableIdentifier("bi_user_state", table_name)
  df.saveToMemSQL(tableIdentifier, saveToMemSQLConf)
}

def createUpdateStatement(fields: List[String]): String = {
  fields.map(x => s" $x = values($x) ,").mkString.dropRight(2)
}

Kafka topic with many partitions (on Yarn)

Hi, I think I've found an issue reading from Kafka when there are too many partitions, specifically when the number of partitions is greater than spark.port.maxRetries (default 16).

Note that I'm using a CDH Spark cluster in yarn-client mode, so I'm not sure this also repros with MemSQL Spark distro.

In summary, when a container starts it tries to bind to a port starting from an initial port, and fails after spark.port.maxRetries. If the Kafka topic has n partitions, then at least n containers are created, and if n > spark.port.maxRetries the available ports are saturated and eventually containers will fail to start.

I think in these cases it would be better to shut down the pipeline (like a fatal error), such that other pipelines can run without issues.

What happens now is that the exceptions are thrown but "ignored", and data is loaded normally. However, I'm not sure whether all data is loaded in all cases, as creating a test is kind of laborious... but I'd expect that if you have data in all partitions, some would fail.
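
As a mitigation on my side, I am considering raising spark.port.maxRetries above the number of Kafka partitions (only a sketch; whether it is enough depends on how many executors land on one host):

spark-submit --master yarn \
  --conf spark.port.maxRetries=64 \
  memsql-spark-interface-1.3.0.jar --port 10009 --dataDir /tmp/memsql/ --dbHost $MASTER_IP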

Interface logs:

16/04/25 16:24:38 ERROR Lost executor 9 on <hostname.redacted>: Container marked as failed: container_1461362160503_0004_01_000010 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461362160503_0004_01_000010
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1

16/04/25 16:24:38 WARN Container marked as failed: container_1461362160503_0004_01_000010 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461362160503_0004_01_000010
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1

16/04/25 16:24:38 WARN Lost task 7.0 in stage 0.0 (TID 12, <hostname.redacted>): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container marked as failed: container_1461362160503_0004_01_000010 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461362160503_0004_01_000010
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1

16/04/25 16:24:38 ERROR Listener SQLListener threw an exception
java.lang.NullPointerException
    at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:167)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
16/04/25 16:24:39 ERROR Lost executor 27 on <hostname.redacted>: Container marked as failed: container_1461362160503_0004_01_000030 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.

Container logs from:
http://...:8042/node/containerlogs/container_1461362160503_0004_01_000010/stderr/stderr/?start=0

16/04/25 16:24:30 INFO executor.CoarseGrainedExecutorBackend: Started daemon with process name: 34377@<hostname.redacted>
16/04/25 16:24:30 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
16/04/25 16:24:32 INFO spark.SecurityManager: Changing view acls to: yarn,emanuele
16/04/25 16:24:32 INFO spark.SecurityManager: Changing modify acls to: yarn,emanuele
16/04/25 16:24:32 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, emanuele); users with modify permissions: Set(yarn, emanuele)
16/04/25 16:24:33 INFO spark.SecurityManager: Changing view acls to: yarn,emanuele
16/04/25 16:24:33 INFO spark.SecurityManager: Changing modify acls to: yarn,emanuele
16/04/25 16:24:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, emanuele); users with modify permissions: Set(yarn, emanuele)
16/04/25 16:24:34 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/04/25 16:24:34 INFO Remoting: Starting remoting
16/04/25 16:24:34 WARN util.Utils: Service 'sparkExecutorActorSystem' could not bind on port 10014. Attempting port 10015.
16/04/25 16:24:34 ERROR Remoting: Remoting error: [Startup failed] [
akka.remote.RemoteTransportException: Startup failed
    at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
    at akka.remote.Remoting.start(Remoting.scala:194)
    at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
    at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
    at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1989)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1980)
    at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
    at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:217)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:183)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:148)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:250)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: <hostname.redacted>/10.19.192.29:10014
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:391)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:388)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Success.map(Try.scala:206)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:444)
    at sun.nio.ch.Net.bind(Net.java:436)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
    at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
]

[...]

16/04/25 16:24:35 INFO Remoting: Starting remoting
16/04/25 16:24:35 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutorActorSystem@<hostname.redacted>:10025]
16/04/25 16:24:35 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutorActorSystem@<hostname.redacted>:10025]
16/04/25 16:24:35 INFO util.Utils: Successfully started service 'sparkExecutorActorSystem' on port 10025.
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_1/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-20ef5072-5398-4e44-b38d-710038d26c7f
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_2/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-f7e45ba1-1e24-49fb-a475-74057513f119
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_3/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-d8bdba6b-4296-49fd-ada2-b66b9a379b1c
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_4/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-947725bd-a63a-4f23-ae19-05a0a1722b40
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_5/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-c8d2b2cf-de63-42f5-a2e0-7390066f65ba
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_6/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-3bffbf82-2b57-4f5f-9e60-2a9bfec07120
16/04/25 16:24:36 INFO storage.MemoryStore: MemoryStore started with capacity 530.3 MB
16/04/25 16:24:36 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://[email protected]:10013
16/04/25 16:24:36 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
16/04/25 16:24:36 INFO executor.Executor: Starting executor ID 9 on host <hostname.redacted>
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10011. Attempting port 10012.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10012. Attempting port 10013.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10013. Attempting port 10014.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10014. Attempting port 10015.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10015. Attempting port 10016.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10016. Attempting port 10017.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10017. Attempting port 10018.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10018. Attempting port 10019.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10019. Attempting port 10020.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10020. Attempting port 10021.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10021. Attempting port 10022.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10022. Attempting port 10023.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10023. Attempting port 10024.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10024. Attempting port 10025.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10025. Attempting port 10026.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10026. Attempting port 10027.
16/04/25 16:24:36 ERROR netty.Inbox: Ignoring error
java.net.BindException: Address already in use: Service 'org.apache.spark.network.netty.NettyBlockTransferService' failed after 16 retries!
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:444)
    at sun.nio.ch.Net.bind(Net.java:436)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:125)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:485)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1089)
    at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:430)
    at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:415)
    at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:903)
    at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198)
    at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/04/25 16:24:36 ERROR executor.CoarseGrainedExecutorBackend: Received LaunchTask command but executor was null
16/04/25 16:24:36 INFO storage.DiskBlockManager: Shutdown hook called
16/04/25 16:24:36 INFO util.ShutdownHookManager: Shutdown hook called

memsql connector supports keytab and principal for kerb authn

Hi, I'm not sure if this is the right place to raise a feature request. I was wondering if we could add support for keytab and principal fields, similar to the Spark JDBC reader, which would load a Krb5ConnectorContext with the keytab and principal configuration. I can supply a custom jaas.conf in Spark, but it would be nice if this could be supported as a Spark option.

control max number of concurrent connections on save

Hello,

Is there a way to control the maximum number of concurrent connections from the job side on the save action?

The use case is the following:

The computation we are doing is pretty intensive, but the number of output rows is not so big.
We are now able to increase the parallelism in the Spark cluster by increasing minNumPostShufflePartitions.

But on the other side, I now see a lot of pending connections to MemSQL:

LOAD DATA LOCAL INFILE '###.lz4' INTO TABLE....
...

In theory we could create a user queue in the MemSQL cluster, but I was trying to see if there is a way to control this from the connector side.

I tried increasing insertBatchSize to some big number, but that didn't help.
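
The only other job-side idea I have (a sketch, not a connector feature) is to reduce the number of output partitions right before the save, since each partition appears to open its own LOAD DATA connection:

// Coalesce just before the write so that at most N partitions (and therefore
// roughly N concurrent LOAD DATA connections) hit MemSQL at once.
// resultDF, the value 16, and the table name are placeholders.
val limited = resultDF.coalesce(16)
limited.write
  .format("com.memsql.spark.connector")
  .mode("append")
  .save("mydb.my_table")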

thanks,
Aleks

can't add JSON field from within DataFrame.withColumn call

Hello

I have a Map field inside a DataFrame which I need to save as JSON to MemSQL. When I do the conversion to JsonValue, I get the following error:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.spark.unsafe.types.UTF8String

Here is a code example which reproduces the problem:

https://gist.github.com/akonopko/495f2ece5398700e000ec63dafb61c2b

I was able to fix this with a workaround:

https://gist.github.com/akonopko/af48c91170fa8f344bcb63aa31b82037

selective pushdown

Hi,

Is there a way we can do a selective pushdown?
For example, can we specify the list of potential query types that can be considered by the pushdown logic?
In some cases we want to push down only the filtering and do the joins, or rank functions, on the Spark side.

Thanks,
Aleks

update only mode

Hey,
Is there an option to only update a table, with data coming from a stream?
In saveToMemSQL there are 2 available strategies:

  1. insert
  2. load

Thanks,
Lior

GROUP BY is not working when IN is added in WHERE clause

Hi, there is a difference when I run queries from the console (mysql client) and through the spark-memsql-connector:
using the connector, the data is not grouped.
Query:

SELECT
  dimension,
  feature_id,
  sub_pipeline,
  sum(click_in) AS click_in,
  sum(click_out) AS click_out,
  sum(income) AS income
FROM gravity_test.counter_daily cs
WHERE sub_pipeline in ('1','2') AND cs.c_date >= '2007-03-03' AND cs.c_date < '2017-03-03'
GROUP BY dimension, sub_pipeline, feature_id;

Through the connector it returns:
+---------------+----------+------------+--------+---------+------+
|dimension |feature_id|sub_pipeline|click_in|click_out|income|
+---------------+----------+------------+--------+---------+------+
|KW_TEXT |a |2 |10 |5 |10 |
|KW_TEXT |b |1 |30 |20 |30 |
|COUNTRY |GB |1 |10 |5 |10 |
|COUNTRY#KW_TEXT|US#b |1 |10 |5 |10 |
|COUNTRY |US |1 |10 |5 |10 |
|KW_TEXT |c |1 |15 |15 |20 |
|COUNTRY#KW_TEXT|US#a |1 |10 |5 |10 |
|KW_TEXT |a |1 |10 |5 |10 |
|KW_TEXT |d |1 |30 |20 |30 |
|KW_TEXT |c |1 |5 |15 |25 |
|COUNTRY#KW_TEXT|GB#a |1 |10 |5 |10 |
|KW_TEXT |c |2 |10 |5 |10 |
|KW_TEXT |c |1 |10 |5 |10 |
|KW_TEXT |a |1 |20 |10 |20 |
+---------------+----------+------------+--------+---------+------+
When I am using the console, it behaves correctly.

Implementing the same query using WHERE sub_pipeline = '1' works well with the memsql-connector:

SELECT
  dimension,
  feature_id,
  sub_pipeline,
  sum(click_in) AS click_in,
  sum(click_out) AS click_out,
  sum(income) AS income
 FROM gravity_test.counter_daily cs
 WHERE sub_pipeline = '1' AND cs.c_date >= '2007-03-03' AND cs.c_date < '2017-03-03'
 GROUP BY dimension, sub_pipeline, feature_id

for the schema:

DROP DATABASE IF EXISTS gravity_test;
CREATE DATABASE gravity_test;
DROP TABLE IF EXISTS counter_daily;
CREATE TABLE `counter_daily` (
  `sub_pipeline` bigint(20) NOT NULL,
  `dimension` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `feature_id` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `c_date` datetime NOT NULL,
  `click_in` bigint(20) NOT NULL DEFAULT 0,
  `click_out` bigint(20) NOT NULL DEFAULT 0,
  `income` decimal(10,0) NOT NULL DEFAULT 0,
  `income_count` bigint(20) NOT NULL DEFAULT 0,
  PRIMARY KEY (`sub_pipeline`,`dimension`,`feature_id`,`c_date`)
);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY', 'GB', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY', 'US', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY#KW_TEXT', 'GB#a', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY#KW_TEXT', 'US#a', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY#KW_TEXT', 'US#b', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'a', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'a', '2016-01-02 00:00:00.000000', 20, 10, 20, 2);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'b', '2016-01-01 00:00:00.000000', 30, 20, 30, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'c', '2017-01-01 00:00:00.000000', 5, 15, 25, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'c', '2017-01-11 00:00:00.000000', 15, 15, 20, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'c', '2017-03-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'd', '2017-03-01 00:00:00.000000', 30, 20, 30, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (2, 'KW_TEXT', 'a', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (2, 'KW_TEXT', 'c', '2017-03-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (3, 'KW_TEXT', 'b', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
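
For reference, a minimal sketch of how the same SQL is pushed through the connector (option names, endpoint and API details are assumptions; the point is only that the same query runs through the connector rather than the console):

import org.apache.spark.sql.SparkSession

// Hypothetical reproduction: push the same aggregate through the connector's "query" option.
val spark = SparkSession.builder().appName("GroupByRepro").getOrCreate()

val df = spark.read
  .format("com.memsql.spark.connector")
  .options(Map(
    "query" ->
      """SELECT dimension, feature_id, sub_pipeline,
        |       sum(click_in) AS click_in, sum(click_out) AS click_out, sum(income) AS income
        |FROM gravity_test.counter_daily cs
        |WHERE sub_pipeline IN ('1','2') AND cs.c_date >= '2007-03-03' AND cs.c_date < '2017-03-03'
        |GROUP BY dimension, sub_pipeline, feature_id""".stripMargin,
    "database" -> "gravity_test"))
  .load()

df.show()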

Question on merge functionality

Hello,
In memsql-spark-connector, I see there is a merge option that works based on the primary key (PK), enabled with the overwriteBehavior=merge option.

In my case, I do not have a primary key on my memsql table. Is there any other way to do a merge using the memsql-spark-connector?

When I say merge, I am looking for the following two behaviors:

  1. If the record exists, I want to overwrite it.
  2. If the record doesn't exist, I want to insert it as a new record.

Based on the MemSQL documentation, I couldn't find anything about doing a merge without a PK.
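
For reference, a minimal sketch of the documented PK-based merge write, which is the behavior I'd like without a PK (endpoint, source and table names are placeholders):

import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical sketch of the documented merge path, which upserts on the table's primary key.
val spark = SparkSession.builder().appName("MergeSketch").getOrCreate()
val updates = spark.read.parquet("/path/to/updates")     // placeholder source

updates.write
  .format("memsql")
  .option("ddlEndpoint", "memsql-master:3306")           // placeholder endpoint
  .option("overwriteBehavior", "merge")                  // merge on the primary key
  .mode(SaveMode.Overwrite)
  .save("mydb.mytable")                                  // placeholder table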

PipelineMonitor graceful stop

Hi,

With the new support for checkpoints it would be nice to have:

  1. a graceful stop of the pipelines
  2. a graceful stop/restart of the interface
    (In short to avoid duplicated messages when restarting.)

Step 2 seems more complex: it should take into account timeouts (if a pipeline doesn't stop), and personally I'd also need some sort of priority to stop/restart the pipelines in a particular order. So I'm planning to manage that with an external controller.

On step 1, instead, I was looking at the code and it seems relatively doable.
Just to be clear, with "graceful stop" I mean that if the step started, then it has to complete.

If I'm not wrong, this is the cause of the immediate stop:
https://github.com/memsql/memsql-spark-connector/blob/master/interface/src/main/scala/com/memsql/spark/interface/PipelineMonitor.scala#L639
and in principle PipelineMonitor.stop() could just set isStopping=true, while cancelling the job and interrupting the thread could be moved after the step terminates here:
https://github.com/memsql/memsql-spark-connector/blob/master/interface/src/main/scala/com/memsql/spark/interface/PipelineMonitor.scala#L179
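
To make the idea concrete, here is an illustrative sketch of that ordering (the class and member names are made up for the sketch and are not the actual PipelineMonitor internals):

import org.apache.spark.SparkContext

// Illustrative sketch only: a cooperative stop that lets the in-flight step finish
// before the Spark job group is cancelled.
class GracefulStopSketch(sc: SparkContext, jobGroupId: String) {
  @volatile private var isStopping = false

  def stop(): Unit = {
    isStopping = true               // signal the run loop; don't interrupt mid-step
  }

  def runLoop(runStep: () => Unit): Unit = {
    while (!isStopping) {
      runStep()                     // an in-flight step always runs to completion
    }
    sc.cancelJobGroup(jobGroupId)   // cancellation happens only between steps
  }
}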

Ritual questions:

  • Am I missing something?
  • What do you think? Shippit? :)

Thanks, E.

[Help appreciated] Issue when writing to memsql

Hi,

I am trying to use memsql-spark-connector to write a Spark DataFrame to MemSQL, but it gives me the error below. I used a GROUP BY to generate the DataFrame. Can anyone help? Thanks

java.lang.NullPointerException
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.createHashMap(HashAggregateExec.scala:311)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at com.memsql.spark.connector.LoadDataStrategy$$anon$2.run(LoadDataStrategy.scala:52)
at java.lang.Thread.run(Thread.java:745)
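
For reference, the write path being described looks roughly like this (a hypothetical reproduction; table, column and path names are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Hypothetical reproduction: aggregate with groupBy, then write the result via the connector.
val spark = SparkSession.builder().appName("GroupByWriteRepro").getOrCreate()
val df = spark.read.parquet("/path/to/input")              // placeholder source

val agg = df.groupBy("key").agg(sum("value").as("total"))  // the GROUP BY that precedes the write
agg.write
  .format("com.memsql.spark.connector")
  .save("db.table")                                        // placeholder database.table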

Does not work with Spark 2.0

Hi,
I was using the connector with Spark 1.6.2 and it was working fine; once we upgraded to Spark 2.0, it creates the table but doesn't write anything.

insert on duplicate key causing locking errors

I'm trying to read data from Kafka and insert it into MemSQL using the insert-on-duplicate-key option via saveToMemSQL (see the code below).

I'm running with 2 executors (YARN client mode) and I'm getting the following error:
java.sql.SQLException (Leaf Error (memsql-leaf-02.p1:3306): Lock wait timeout exceeded; try restarting transaction)
Is there anything that can be done to avoid this kind of error?

MY CODE:

 import java.sql.{Connection, DriverManager}
  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.{Row, SQLContext}
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.kafka._
  import com.memsql.spark.connector.DataFrameFunctions
  import com.memsql.spark.connector.MemSQLConf
  import com.memsql.spark.connector.sql.TableIdentifier
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.{DataFrame, SaveMode}
  import org.apache.spark.sql.memsql.SaveToMemSQLConf
  import java.sql.{DriverManager, Statement}
  def RunUpdateQuery(values: Iterator[Row]):Unit = {
    // connect to the database named "mysql" on the localhost
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://memsql-ops-01.p1/bi_user_state"
    val username = "app_bi_user"
    val password = "Cc113355!"
    var result = scala.collection.mutable.Map.empty[Int,(Long,Int)]
     if(values == null)
    {
      println("It's null!!!!!")
    }
    // there's probably a better way to do this
    var connection: Connection = null
    try {
      // make the connection
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      for (record <- values) {
        var counter = 0
        // create the statement, and run the select query
        val statement = connection.createStatement()
        val resultSet = statement.executeUpdate(s"UPDATE dim_all_users_info_110916 set total_spins = ${record.get(1)} where user_id = ${record.get(0)}")
      }
    } catch {
      case e: Exception => e.printStackTrace()
    }
    connection.close()
  }
  val conf = sc.getConf
  conf.set("memsql.host", "host")
  conf.set("memsql.port", 3306)
//  conf.set("memsql.defaultSaveMode", "CreateMode.DatabaseAndTable")
//  conf.set("memsql.defaultInsertBatchSize", "10000")
//  conf.set("memsql.defaultLoadDataCompression", "GZIP")
  conf.set("memsql.defaultCreateMode", "Skip")
  conf.set("memsql.defaultDatabase", "db")
  conf.set("memsql.user", "usr")
  conf.set("memsql.password", "pass")
  sc.stop
  val kafkaParams = Map[String, String]("metadata.broker.list" -> "kafaka-borker:9092")
  val topicsSet = "sm_session".split(",").toSet
//  val topicsSet = "topic_name".split(",").toSet
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
  val sqlContext = new SQLContext(ssc.sparkContext)
//  val sqlContext = new org.apache.spark.sql.hive.HiveContext(ssc.sparkContext)
  val memsqlContext = new MemSQLContext(ssc.sparkContext)
     val directKafkaStream =  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet)
//  directKafkaStream.map(x=>x._2).repartition(4).foreachRDD(x=>{ sqlContext.read.json(x).groupBy("userId").count().foreachPartition(RunUpdateQuery)})
  directKafkaStream.map(x=>x._2).repartition(2).foreachRDD(x=>{
    var memSQLConf = MemSQLConf.apply(conf)
   val saveToMemSQLConf  = SaveToMemSQLConf.apply(memsqlConf=memSQLConf,params=Map("onDuplicateKeySQL"-> "user_balance_coins=values(user_balance_coins)"))
    val tableIdentifier = TableIdentifier("bi_user_state","sm_user_profile_110916")
    val df = sqlContext.read.json(x).select("userId","userBalance","timestamp")
                                    .withColumnRenamed("userId","user_id")
                                    .withColumnRenamed("userBalance","user_balance_coins")
                                    .withColumnRenamed("timestamp","spin_ts")
    df.saveToMemSQL(tableIdentifier,saveToMemSQLConf)})
  //    .write.format("com.memsql.spark.connector").save("db.table")
  ssc.start

NullPointerException while getting next window partition

Seen in both Spark 2.0.0 and Spark 2.1.0
Stack trace for 2.1.0:

java.lang.NullPointerException at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextPartition(WindowExec.scala:341)
    at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:391)
    at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:290)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.memsql.spark.connector.LoadDataStrategy$$anon$2.run(LoadDataStrategy.scala:52)
    at java.lang.Thread.run(Thread.java:745)

Using the memsql-spark-connector, Spark is unable to fetch the next partition because of a NullPointerException at this line in WindowExec: TaskContext.get().taskMemoryManager(). I can output the full dataset by writing a CSV file to S3 or HDFS, so it seems like something in the memsql-spark-connector is causing the error. (The stack trace shows the iterator being consumed from a separate thread started by LoadDataStrategy, where the thread-local TaskContext.get() would presumably return null.)

The input is a single large (208 GB) Parquet file, with the following Spark SQL query shaping the data before output:

   SELECT lessonSessionId AS id,
                activityId as activity_id,
                lessonId AS activity_item_id,
                componentId AS activity_component_id,
                studentId AS student_id,
                studentAcademicYear AS student_academic_year_id,
                academicYearId AS academic_year,
                subjectId AS subject,
                to_date(eventStart) AS start_date_id,
                (unix_timestamp(eventStart) - unix_timestamp(to_date(eventStart))) AS start_time_id,
                to_date(eventEnd) AS end_date_id,
                (unix_timestamp(eventEnd) - unix_timestamp(to_date(eventEnd))) AS end_time_id,
                durationSecs AS duration_seconds,
                durationSecsRaw AS duration_seconds_raw,
                loginSessionId AS session_id,
                sittingType AS sitting_type
            FROM (
                SELECT *, ROW_NUMBER() OVER (PARTITION BY lessonSessionId ORDER BY eventEnd DESC) col
                FROM prod_tot_prod_tot
                WHERE eventType != 'LESSON_SKIP'
            ) x
            WHERE x.col = 1

And this code outputting the data to MemSQL:

val conf = new SaveToMemSQLConf(SaveMode.Ignore, CreateMode.Skip, Option.empty, 10000, CompressionType.GZip, true, Seq.empty, Seq.empty, false)
sqlDf.saveToMemSQL(TableIdentifier.apply(db, table), conf)

About half of the data is loaded into MemSQL, but some of the tasks persistently fail with the above NPE.

SparkContext has been shutdown

In some weird cases, an exception in one pipeline may cause the SparkContext to be shut down.

(For example, I was able to get into that situation with a pipeline that repeatedly failed with kafka.common.OffsetOutOfRangeException... I'm not really sure why the SparkContext was shut down, but it happened.)

As a result, all other pipelines enter a loop where each batch fails with:

java.lang.IllegalStateException: SparkContext has been shutdown

I think it could be useful to catch this exception and possibly perform a clean restart? Not sure what else one can do once the SparkContext has been shut down...

TypeConversions doesn't honor unsigned smallint and needs better error handling

One of the columns in our MemSQL database is defined as
start_video_pos smallint(5) unsigned DEFAULT NULL
TypeConversions treats unsigned smallint as Spark's ShortType, which causes an exception while reading data even though the value is in the unsigned smallint range. We suspect it needs to take the sign bit into account and use IntegerType to fit the value.

We use memsql-connector_2.11:2.0.5 and get a read error with the following trace:

WARN TaskSetManager: Lost task 11.0 in stage 0.0 (TID 11, ......compute.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLDataException: Value '65535' is outside of valid range for type java.lang.Short
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:530)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:489)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:85)
at com.mysql.cj.jdbc.result.ResultSetImpl.getShort(ResultSetImpl.java:863)
at org.apache.commons.dbcp2.DelegatingResultSet.getShort(DelegatingResultSet.java:211)
at org.apache.commons.dbcp2.DelegatingResultSet.getShort(DelegatingResultSet.java:211)
at com.memsql.spark.connector.dataframe.TypeConversions$.GetJDBCValue(TypeConversions.scala:80)
at com.memsql.spark.connector.util.JDBCImplicits$ResultSetHelpers$$anonfun$toRow$1.apply(JDBCImplicits.scala:26)
at com.memsql.spark.connector.util.JDBCImplicits$ResultSetHelpers$$anonfun$toRow$1.apply(JDBCImplicits.scala:23)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at com.memsql.spark.connector.util.JDBCImplicits$ResultSetHelpers.toRow(JDBCImplicits.scala:23)
at com.memsql.spark.connector.MemSQLQueryRelation$$anonfun$1.apply(MemSQLRelation.scala:45)
at com.memsql.spark.connector.MemSQLQueryRelation$$anonfun$1.apply(MemSQLRelation.scala:45)
at com.memsql.spark.connector.rdd.MemSQLRDD$$anon$1.getNext(MemSQLRDD.scala:275)
at com.memsql.spark.connector.util.NextIterator.hasNext(NextIterator.scala:74)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:380)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
... 8 more
Caused by: com.mysql.cj.core.exceptions.NumberOutOfRange: Value '65535' is outside of valid range for type java.lang.Short
at com.mysql.cj.core.io.IntegerBoundsEnforcer.createFromLong(IntegerBoundsEnforcer.java:50)
at com.mysql.cj.core.io.BaseDecoratingValueFactory.createFromLong(BaseDecoratingValueFactory.java:58)
at com.mysql.cj.core.io.MysqlTextValueDecoder.decodeUInt2(MysqlTextValueDecoder.java:189)
at com.mysql.cj.mysqla.result.AbstractResultsetRow.decodeAndCreateReturnValue(AbstractResultsetRow.java:84)
at com.mysql.cj.mysqla.result.AbstractResultsetRow.getValueFromBytes(AbstractResultsetRow.java:225)
at com.mysql.cj.mysqla.result.ByteArrayRow.getValue(ByteArrayRow.java:84)
at com.mysql.cj.jdbc.result.ResultSetImpl.getNonStringValueFromRow(ResultSetImpl.java:630)
... 34 more

It would be useful to add more logging in JDBCImplicits.toRow() to record which column causes the error, instead of just propagating the MySQL error.
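
To make the suggested fix concrete, a hypothetical sketch of the type mapping (not the actual TypeConversions code):

import org.apache.spark.sql.types._

// Hypothetical sketch of the suggested fix: map unsigned integer types one size up so the
// full unsigned range fits (ShortType tops out at 32767, but SMALLINT UNSIGNED goes to 65535).
def toSparkType(memsqlType: String, unsigned: Boolean): DataType = memsqlType.toUpperCase match {
  case "SMALLINT" if unsigned => IntegerType   // 0..65535 does not fit in a Short
  case "SMALLINT"             => ShortType
  case "INT" if unsigned      => LongType      // same reasoning for INT UNSIGNED
  case "INT"                  => IntegerType
  case _                      => StringType    // fallback placeholder for this sketch
}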

issue with number of partitions when we use spark.read

Hello Team,
Here is my use case with memsql spark connector READ operation:

  1. I have a table in MemSQL, and I read it using the connector read operation below:
    val DF = spark.read.format("memsql").option("ddlEndpoint", "url value").option("user","username").option("Password", "pwd").load("memsql table name")

  2. Once I read it into a Spark DataFrame, I use the Spark write API below to write the MemSQL data into a Hive table:
    DF.write.mode("overwrite").format("orc").saveAsTable("hive tabl ename")

  3. In this case, in my cluster, I see only one executor and one task running actively, even though I specified 20 cores and 120 executors.

The actual issue is that spark.read always creates the DataFrame as a single partition, so the job cannot use the available cores and executors.

I tried DF.repartition(50) before the write operation, but no luck.

How can we make the memsql spark.read() API create multiple partitions of the DataFrame, so that our job uses cluster resources effectively?

Because of this issue, reading 100M records from the MemSQL table and writing them to the Hive external table took 28 hours.

Any solution?
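
A hedged suggestion, going from the connector documentation rather than this specific setup: the connector's enableParallelRead option controls whether the read is split across database partitions; without it, results come back through a single connection as one partition. A sketch (endpoint and credentials are placeholders, and the exact option form may differ by connector version):

// Hypothetical sketch: enable parallel read so the DataFrame gets roughly one partition
// per MemSQL database partition instead of a single partition through the aggregator.
val df = spark.read
  .format("memsql")
  .option("ddlEndpoint", "memsql-master:3306")   // placeholder endpoint
  .option("user", "username")
  .option("password", "pwd")
  .option("enableParallelRead", "true")          // assumed option name from the docs
  .load("mydb.mytable")

println(df.rdd.getNumPartitions)                 // verify the partition count after the change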

ALTER TABLE xxx RENAME TO issue with memsql spark connector

Hello Team,
We are trying to rename a table in a MemSQL database using the memsql spark connector as shown below, but we are not successful.
Can you please verify the connector code below and advise?

Code:

sparksession
.read
.format("memsql")
.option("ddlEndpoint", connectionURL)
.option("user",username)
.option("Password", password)
.options(Map("query" -> ("ALTER TABLE Table1 RENAME TO Table2"),"database" -> "MyDB"))
.load()

ERROR:

You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'alter table MyDB.Table1 rename to MyDb.Table2) AS q WHERE 1=0' at line 1
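
For what it's worth, the error shows the connector wrapping the statement as SELECT ... FROM (<query>) AS q WHERE 1=0, i.e. the read path only accepts statements that return rows, so DDL cannot go through it. A hedged workaround sketch over a plain JDBC connection (endpoint and credentials are placeholders):

import java.sql.DriverManager

// Run the DDL directly over JDBC instead of through the connector's read path.
val conn = DriverManager.getConnection("jdbc:mysql://memsql-master:3306/MyDB", "username", "password")
try {
  conn.createStatement().execute("ALTER TABLE Table1 RENAME TO Table2")
} finally {
  conn.close()
}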

saveToMemSQL error when running SQL with GROUP BY

It works with:

val sql =
    s"""
      | SELECT gender AS genderId, -1 AS ageId
      | FROM $tableName
    """.stripMargin

But when I add more fields to aggregate and a GROUP BY:

val sql =
    s"""
      | SELECT gender AS genderId, -1 AS ageId,
      | CAST(0 AS LONG) AS impression, CAST(0 AS LONG) AS trueImpression, COUNT(*) AS click
      | FROM $tableName
      | GROUP BY gender
    """.stripMargin
  val result = sqlContext.sql(sql)
  result.saveToMemSQL("dev_output", "aggregate")

it throws this error:

INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
Exception in thread "main" com.memsql.spark.SaveToMemSQLException: SaveToMemSQLException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 172 in stage 1.0 failed 1 times, most recent failure: Lost task 172.0 in stage 1.0 (TID 174, localhost): org.apache.spark.SparkException: Internal error: release called on 33554432 bytes but task only has 0
    at org.apache.spark.shuffle.ShuffleMemoryManager.release(ShuffleMemoryManager.scala:117)
    at org.apache.spark.unsafe.map.BytesToBytesMap.free(BytesToBytesMap.java:708)
    at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.free(UnsafeFixedWidthAggregationMap.java:234)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:678)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:76)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.memsql.LoadDataStrategy$$anon$2.run(LoadDataStrategy.scala:53)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.sql.memsql.SparkImplicits$DataFrameFunctions.saveToMemSQL(SparkImplicits.scala:85)
    at org.apache.spark.sql.memsql.SparkImplicits$DataFrameFunctions.saveToMemSQL(SparkImplicits.scala:40)
    at job.WriteToMemSQL$delayedInit$body.apply(WriteToMemSQL.scala:105)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at job.WriteToMemSQL$.main(WriteToMemSQL.scala:17)
    at job.WriteToMemSQL.main(WriteToMemSQL.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

I am using Spark 1.5.2 and MemSQL 1.3.2.

Add support for Scala 2.11.7

First, thanks for the great project; it's just what I needed for a given use case.

Can the build.sbt be modified to add crossScalaVersions and set scalaVersion := "2.11.7"? I bumped the Java version to 1.8 because I wanted to test the deploy locally, but that should not be necessary. Here is the diff from my system; I'd be glad to put together a PR if you feel this is worth doing.

RADTech-MBP:memsql-spark-connector tnist$ git diff build.sbt
diff --git a/build.sbt b/build.sbt
index 0c2e410..e5b401e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -14,7 +14,8 @@ lazy val testScalastyle = taskKey[Unit]("testScalastyle")
 lazy val commonSettings = Seq(
   organization := "com.memsql",
   version := "1.2.1-SNAPSHOT",
-  scalaVersion := "2.10.5",
+  scalaVersion := "2.11.7",
+  crossScalaVersions := Seq("2.10.4", "2.11.7"),
   assemblyScalastyle := org.scalastyle.sbt.ScalastylePlugin.scalastyle.in(Compile).toTask("").value,
   assembly <<= assembly dependsOn assemblyScalastyle,
   testScalastyle := org.scalastyle.sbt.ScalastylePlugin.scalastyle.in(Test).toTask("").value,
@@ -54,7 +55,7 @@ lazy val commonSettings = Seq(
   publishMavenStyle := true,
   publishArtifact in Test := false,
   pomIncludeRepository := { _ => false },
-  javaVersionPrefix in javaVersionCheck := Some("1.7")
+  javaVersionPrefix in javaVersionCheck := Some("1.8")
 )

 lazy val connectorLib = (project in file("connectorLib")).

-Todd

Spark MemSQL Connector loses connection after idle time

Example:

val df = memsqlContext.read.format("com.memsql.spark.connector").load("db.table")

5 minutes idle time...

val df = memsqlContext.read.format("com.memsql.spark.connector").load("db.table") ...fails

Doing the same with the standard Spark JDBC DataFrame API works.

registerTable problem

I have a MemSQL cluster. I tried to connect to it and read some data from a table, but when I select my table the program throws this exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: Clienti;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:306)

I tried different ways to do this:

import org.apache.spark.{SparkConf, SparkContext}
import com.memsql.spark.connector.MemSQLContext

object LoadTestSpark extends App {
  val conf = new SparkConf().setAppName("MemSQL Example")
  conf.set("memsql.host", "192.168.3.12")
  conf.setMaster("local[1]")
  val sparkContext = new SparkContext(conf)
  val memsqlContext = new MemSQLContext(sparkContext)
  memsqlContext.setDatabase("MyDB")
memsqlContext.sql("select * from Client").registerTempTable("client")
}
object LoadTestSpark extends App {
  val conf = new SparkConf().setAppName("MemSQL Example")
  conf.set("memsql.host", "192.168.3.12")
  conf.setMaster("local[1]")
  val sparkContext = new SparkContext(conf)
  val memsqlContext = new MemSQLContext(sparkContext)
  memsqlContext.setDatabase("MyDB")
  val client = memsqlContext.table("Client")
}

but in both cases it does not work.

If I run this:

...
memsqlContext.tableNames().foreach(println)
...

the output is:

...
`MyDB`.`Checkup_dati`
`MyDB`.`Chiamate_blocchi`
`MyDB`.`Cicli`
`MyDB`.`Classificazioni`
`MyDB`.`Client`
....

What can I do to load data from my tables?
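
A hedged thought based only on the tableNames() output above: the names are database-qualified and backtick-quoted, so referencing the table the same way, or loading it through the data source API, might resolve the lookup:

// Assumption: the catalog stores the database-qualified names shown by tableNames().
val clients = memsqlContext.table("`MyDB`.`Client`")

// Alternative sketch: load the table through the data source API instead of the catalog lookup.
val clients2 = memsqlContext.read
  .format("com.memsql.spark.connector")
  .options(Map("path" -> "MyDB.Client"))
  .load()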

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException

Not sure what's causing the error below. I'm reading some data from Cassandra and trying to write it to MemSQL. Any ideas on how to resolve the issue?

Here is my code:

import com.memsql.spark.connector.MemSQLContext

val memsqlContext = new MemSQLContext(sc)
val df = sqlContext.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "customer", "keyspace" -> "cloudscm" )).load()
df.registerTempTable("customers")
val memdf = sqlContext.sql("SELECT id, site, customergroup, name from customers")
memdf.write.format("com.memsql.spark.connector").save("test.customer")

I created the table in memsql using the following command:

create table customer(id varchar(256), site varchar(256), customergroup varchar(256), name varchar(256));

Here is the error I get:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '())' at line 1
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
    at com.mysql.jdbc.Util.getInstance(Util.java:360)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:978)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2526)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:848)
    at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:742)
    at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:291)
    at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:291)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$createTable$1$$anonfun$apply$8.apply(MemSQLCluster.scala:141)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$createTable$1$$anonfun$apply$8.apply(MemSQLCluster.scala:140)
    at com.memsql.spark.connector.util.Loan.to(Loan.scala:6)
    at com.memsql.spark.connector.util.JDBCImplicits$ConnectionHelpers.withStatement(JDBCImplicits.scala:51)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$createTable$1.apply(MemSQLCluster.scala:140)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$createTable$1.apply(MemSQLCluster.scala:139)
    at com.memsql.spark.connector.util.Loan.to(Loan.scala:6)
    at com.memsql.spark.connector.MemSQLConnectionPool$.withConnection(MemSQLConnectionPool.scala:38)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$withMasterConn$1.apply(MemSQLCluster.scala:23)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$withMasterConn$1.apply(MemSQLCluster.scala:23)
    at com.memsql.spark.connector.MemSQLCluster.createTable(MemSQLCluster.scala:139)
    at org.apache.spark.sql.memsql.SparkImplicits$DataFrameFunctions.saveToMemSQL(SparkImplicits.scala:70)
    at org.apache.spark.sql.memsql.MemSQLTableRelation.insert(MemSQLRelation.scala:45)
    at org.apache.spark.sql.memsql.DefaultSource.createRelation(DefaultSource.scala:33)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC.<init>(<console>:44)
    at $iwC.<init>(<console>:46)
    at <init>(<console>:48)
    at .<init>(<console>:52)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

zero date time behaviour connection property

Getting this error while saving a DataFrame with df.write:

The connection property 'zeroDateTimeBehavior' acceptable values are: 'CONVERT_TO_NULL', 'EXCEPTION' or 'ROUND'. The value 'convertToNull' is not acceptable
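
For context, newer MySQL Connector/J drivers replaced the legacy camel-case values with the enum spellings listed in the error. A minimal sketch of the new-style value on a plain JDBC URL (endpoint is a placeholder):

// Assumption: the driver in use accepts only the new enum values named in the error message,
// so the property needs to be passed as CONVERT_TO_NULL rather than the legacy convertToNull.
val url = "jdbc:mysql://memsql-master:3306/db?zeroDateTimeBehavior=CONVERT_TO_NULL"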

ReceiverInputDStream in MemSQL Streamliner

Have you ever investigated if a ReceiverInputDStream can work with MemSQL Streamliner like a "normal" InputDStream does?

I was trying to create a KafkaExtractor based on:
https://github.com/dibbhatt/kafka-spark-consumer
(mostly to get checkpointing to Zookeeper for free)

My code is very similar to the KafkaExtractor/ByteArrayExtractor, but fails with:

org.apache.spark.SparkException: org.apache.spark.streaming.dstream.PluggableInputDStream@2b57a5f5 has not been initialized

I tried running start() on the ReceiverInputDStream, running onStart() on its receiver, etc., but no luck.

I guess StreamingContext.start() does a lot of work to make ReceiverInputDStreams function... I was wondering if you've had any experience with that.
