
singlestore-spark-connector's Introduction

SingleStoreDB Spark Connector

Version: 4.1.8

Getting Started

You can find the latest version of the connector on Maven Central and spark-packages.org. The group is com.singlestore and the artifact is singlestore-spark-connector_2.11 for Spark 2 and singlestore-spark-connector_2.12 for Spark 3.

You can add the connector to your Spark application using spark-shell, pyspark, or spark-submit:

$SPARK_HOME/bin/spark-shell --packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0

We release multiple versions of the singlestore-spark-connector, one for each supported Spark version. The connector follows the x.x.x-spark-y.y.y naming convention, where x.x.x represents the connector version and y.y.y represents the corresponding Spark version. For example, in connector 4.1.8-spark-3.5.0, 4.1.8 is the version of the connector, compiled and tested against Spark version 3.5.0. It is critical to select the connector version that corresponds to the Spark version in use.
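
For example, in an sbt build you might depend on the connector like this (a sketch; the Spark and Scala versions shown are assumptions, so adjust them to match your cluster):

// build.sbt (sketch): Spark 3.5.x with Scala 2.12
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
  // the artifact name already carries the Scala suffix, so use a single %
  "com.singlestore" % "singlestore-spark-connector_2.12" % "4.1.8-spark-3.5.0"
)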

Configuration

The singlestore-spark-connector is configurable globally via Spark options and locally when constructing a DataFrame. The options have the same names; however, global options carry the prefix spark.datasource.singlestore..
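
For example, the same option can be set either globally for the whole session or locally on a single DataFrame (a minimal sketch; foo.bar is a placeholder table):

// Globally, for the whole Spark session:
spark.conf.set("spark.datasource.singlestore.disablePushdown", "true")

// Locally, for a single DataFrame only:
val df = spark.read
    .format("singlestore")
    .option("disablePushdown", "true")
    .load("foo.bar")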

Basic options

Option Default value Description
ddlEndpoint (On-Premise deployment) (required) - The hostname or IP address of the SingleStoreDB Master Aggregator in the host[:port] format, where port is an optional parameter. Example: master-agg.foo.internal:3308 or master-agg.foo.internal.
dmlEndpoints (On-Premise deployment) ddlEndpoint The hostname or IP address of SingleStoreDB Aggregator nodes to run queries against in the host[:port],host[:port],... format, where :port is an optional parameter (multiple hosts separated by comma). Example: child-agg:3308,child-agg2.
clientEndpoint (Cloud deployment) (required) - The hostname or IP address of the SingleStoreDB Cloud workspace to run queries against in the host[:port] format, where port is an optional parameter. Example: svc-b093ff56-7d9e-499f-b970-7913852facc4-ddl.aws-oregon-2.svc.singlestore.com:3306
user root The SingleStoreDB username.
password - Password of the SingleStoreDB user.
query - The query to run (mutually exclusive with dbtable).
dbtable - The table to query (mutually exclusive with query).
database - If set, all connections use the specified database by default.

Read options

Option Default value Description
disablePushdown false Disable SQL Pushdown when running queries.
enableParallelRead automaticLite Enables reading data in parallel for some query shapes. It can have one of the following values: disabled, automaticLite, automatic, and forced. For more information, see Parallel Read Support.
parallelRead.Features ReadFromAggregators,ReadFromAggregatorsMaterialized Specifies a comma separated list of parallel read features that are tried in the order they are listed. SingleStore supports the following features: ReadFromLeaves, ReadFromAggregators, and ReadFromAggregatorsMaterialized. Example: ReadFromAggregators,ReadFromAggregatorsMaterialized. For more information, see Parallel Read Support.
parallelRead.tableCreationTimeoutMS 0 Specifies the amount of time (in milliseconds) the reader waits for the result table creation when using the ReadFromAggregators feature. If set to 0, timeout is disabled.
parallelRead.materializedTableCreationTimeoutMS 0 Specifies the amount of time (in milliseconds) the reader waits for the result table creation when using the ReadFromAggregatorsMaterialized feature. If set to 0, timeout is disabled.
parallelRead.numPartitions 0 Specifies the exact number of partitions in the resulting DataFrame. If set to 0, the value is ignored.
parallelRead.maxNumPartitions 0 Specifies the maximum number of partitions in the resulting DataFrame. If set to 0, no limit is applied.
parallelRead.repartition false Repartition data before reading.
parallelRead.repartition.columns RAND() Specifies a comma separated list of columns that are used for repartitioning (when parallelRead.repartition is enabled). By default, an additional column with RAND() value is used for repartitioning.
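
A sketch of a read that combines several of these options (foo.bar is a placeholder table; the option values are illustrative):

val df = spark.read
    .format("singlestore")
    .option("enableParallelRead", "automatic")
    .option("parallelRead.Features", "ReadFromAggregators,ReadFromAggregatorsMaterialized")
    .option("parallelRead.maxNumPartitions", "32")
    .load("foo.bar")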

Write options

Option Default value Description
overwriteBehavior dropAndCreate Specifies the behavior during Overwrite. It can have one of the following values: dropAndCreate, truncate, merge.
truncate false ⚠️ This option is deprecated, please use overwriteBehavior instead. Truncates instead of dropping an existing table during Overwrite.
loadDataCompression GZip Compresses data on load. It can have one of the following three values: GZip, LZ4, and Skip.
loadDataFormat CSV Serializes data on load. It can have one of the following values: Avro or CSV.
tableKey - Specifies additional keys to add to tables created by the connector. See Specifying keys for tables created by the Spark Connector for more information.
onDuplicateKeySQL - If this option is specified and a new row with duplicate PRIMARY KEY or UNIQUE index is inserted, SingleStoreDB performs an UPDATE operation on the existing row. See Inserting rows into the table with ON DUPLICATE KEY UPDATE for more information.
insertBatchSize 10000 Specifies the size of the batch for row insertion.
maxErrors 0 The maximum number of errors in a single LOAD DATA request. When this limit is reached, the load fails. If this property is set to 0, no error limit exists.
createRowstoreTable false If enabled, the connector creates a rowstore table.
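
A sketch of a write that combines several of these options (foo.bar and the id column are placeholders):

df.write
    .format("singlestore")
    .option("loadDataCompression", "LZ4")
    .option("createRowstoreTable", "true")
    .option("tableKey.primary", "id")
    .mode(SaveMode.Overwrite)
    .save("foo.bar")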

Connection pool options

Option Default value Description
driverConnectionPool.Enabled true Enables the use of a connection pool on the driver.
driverConnectionPool.MaxOpenConns -1 The maximum number of active connections with the same options that can be allocated from the driver pool at the same time; a negative value means no limit.
driverConnectionPool.MaxIdleConns 8 The maximum number of connections with the same options that can remain idle in the driver pool without extra ones being released; a negative value means no limit.
driverConnectionPool.MinEvictableIdleTimeMs 30000 (30 sec) The minimum amount of time a connection may sit idle in the driver pool before it is eligible for eviction by the idle object evictor (if any).
driverConnectionPool.TimeBetweenEvictionRunsMS 1000 (1 sec) The number of milliseconds to sleep between runs of the idle object evictor thread on the driver. When non-positive, no idle object evictor thread is run.
driverConnectionPool.MaxWaitMS -1 The maximum number of milliseconds that the driver pool waits (when there are no available connections) for a connection to be returned before throwing an exception, or -1 to wait indefinitely.
driverConnectionPool.MaxConnLifetimeMS -1 The maximum lifetime of a connection in milliseconds. After this time is exceeded, the connection fails the next activation, passivation, or validation test and is not returned by the driver pool. A value of zero or less means the connection has an infinite lifetime.
executorConnectionPool.Enabled true Enables the use of a connection pool on the executors.
executorConnectionPool.MaxOpenConns -1 The maximum number of active connections with the same options that can be allocated from the executor pool at the same time; a negative value means no limit.
executorConnectionPool.MaxIdleConns 8 The maximum number of connections with the same options that can remain idle in the executor pool without extra ones being released; a negative value means no limit.
executorConnectionPool.MinEvictableIdleTimeMs 2000 (2 sec) The minimum amount of time a connection may sit idle in the executor pool before it is eligible for eviction by the idle object evictor (if any).
executorConnectionPool.TimeBetweenEvictionRunsMS 1000 (1 sec) The number of milliseconds to sleep between runs of the idle object evictor thread on the executors. When non-positive, no idle object evictor thread is run.
executorConnectionPool.MaxWaitMS -1 The maximum number of milliseconds that the executor pool waits (when there are no available connections) for a connection to be returned before throwing an exception, or -1 to wait indefinitely.
executorConnectionPool.MaxConnLifetimeMS -1 The maximum lifetime of a connection in milliseconds. After this time is exceeded, the connection fails the next activation, passivation, or validation test and is not returned by the executor pool. A value of zero or less means the connection has an infinite lifetime.
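
For example, the pools can be tuned globally like this (a sketch; the values are illustrative, not recommendations):

spark.conf.set("spark.datasource.singlestore.driverConnectionPool.MaxOpenConns", "16")
spark.conf.set("spark.datasource.singlestore.executorConnectionPool.MaxIdleConns", "4")
spark.conf.set("spark.datasource.singlestore.executorConnectionPool.MaxWaitMS", "10000")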

Examples

Configure singlestore-spark-connector for SingleStoreDB Cloud

The following example configures the singlestore-spark-connector globally:

spark.conf.set("spark.datasource.singlestore.clientEndpoint", "singlestore-host")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")

The following example configures the singlestore-spark-connector using the read API:

val df = spark.read
    .format("singlestore")
    .option("clientEndpoint", "singlestore-host")
    .option("user", "admin")
    .load("foo")

The following example configures the singlestore-spark-connector using an external table in Spark SQL:

CREATE TABLE bar USING singlestore OPTIONS ('clientEndpoint'='singlestore-host','dbtable'='foo.bar')

note: the singlestore-spark-connector doesn't support writing to the reference table for SingleStoreDB Cloud
note: the singlestore-spark-connector doesn't support read-only databases for SingleStoreDB Cloud

Configure singlestore-spark-connector for SingleStoreDB On-Premises

The following example configures the singlestore-spark-connector globally:

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
spark.conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")

The following example configures the singlestore-spark-connector using the read API:

val df = spark.read
    .format("singlestore")
    .option("ddlEndpoint", "singlestore-master.cluster.internal")
    .option("user", "admin")
    .load("foo")

The following example configures the singlestore-spark-connector using an external table in Spark SQL:

CREATE TABLE bar USING singlestore OPTIONS ('ddlEndpoint'='singlestore-master.cluster.internal','dbtable'='foo.bar')

For Java/Python versions of some of these examples, see the "Java & Python Examples" section.

Writing to SingleStoreDB

The singlestore-spark-connector supports saving dataframes to SingleStoreDB using the Spark write API. Here is a basic example of using this API:

df.write
    .format("singlestore")
    .option("loadDataCompression", "LZ4")
    .option("overwriteBehavior", "dropAndCreate")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table

If the target table ("foo.bar" in the example above) does not exist in SingleStoreDB, the singlestore-spark-connector automatically attempts to create it. If you specify SaveMode.Overwrite and the target table already exists, it is dropped and recreated (or truncated) before the load. Set overwriteBehavior = truncate to truncate rather than drop and recreate.

Retrieving the number of written rows from taskMetrics

You can add a Spark listener to retrieve the number of written rows:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println("Task id: " + taskEnd.taskInfo.id.toString)
    println("Records written: " + taskEnd.taskMetrics.outputMetrics.recordsWritten.toString)
  }
})

df.write.format("singlestore").save("example")

Specifying keys for tables created by the Spark Connector

When creating a table, the singlestore-spark-connector will read options prefixed with tableKey. These options must be formatted in a specific way in order to correctly specify the keys.

⚠️ The default table type is a SingleStoreDB columnstore. To create a rowstore table instead, enable the createRowstoreTable option.
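
For example, to create a rowstore table instead of the default columnstore (a sketch; foo.bar is a placeholder):

df.write
    .format("singlestore")
    .option("createRowstoreTable", "true")
    .mode(SaveMode.Overwrite)
    .save("foo.bar")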

To explain, we will refer to the following example:

df.write
    .format("singlestore")
    .option("tableKey.primary", "id")
    .option("tableKey.key.created_firstname", "created, firstName")
    .option("tableKey.unique", "username")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table

In this example, we are creating three keys:

  1. A primary key on the id column
  2. A regular key on the combination of the created and firstName columns, with the key name created_firstname
  3. A unique key on the username column

Note on (2): any key can optionally have a name; specify it after the key type. Key names must be unique.

To change the default ColumnStore sort key you can specify it explicitly:

df.write
    .option("tableKey.columnstore", "id")

You can also customize the shard key like so:

df.write
    .option("tableKey.shard", "id, timestamp")

Inserting rows into the table with ON DUPLICATE KEY UPDATE

When inserting rows into an existing table, you can use the ON DUPLICATE KEY UPDATE option. See the SQL reference for more details.

⚠️ This feature doesn't work for columnstore tables with SingleStoreDB 7.1.

df.write
    .format("singlestore")
    .option("onDuplicateKeySQL", "age = age + 1")
    .option("insertBatchSize", 300)
    .mode(SaveMode.Append)
    .save("foo.bar")

As a result of the preceding query, all new rows are appended unchanged. If a row with the same PRIMARY KEY or UNIQUE index already exists, the corresponding age value is incremented.

When you use ON DUPLICATE KEY UPDATE, the rows of the DataFrame are split into batches, and each insert query contains no more than insertBatchSize rows.

Save Modes

Save operations can optionally take a SaveMode that specifies how to handle existing data, if present. It is important to realize that these save modes do not use any locking and are not atomic.

  1. SaveMode.Append means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
  2. SaveMode.Overwrite means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.

Overwrite mode depends on the overwriteBehavior option; see the "Merging on save" section for details.

  3. SaveMode.ErrorIfExists means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
  4. SaveMode.Ignore means that when saving a DataFrame to a data source, if data already exists, contents of the DataFrame are expected to be appended to existing data and all rows with a duplicate key are ignored.

Example of SaveMode option

df.write
    .mode(SaveMode.Append)
    .save("foo.bar")

Merging on save

When saving dataframes or datasets to SingleStoreDB, you can manage how SaveMode.Overwrite is interpreted by the connector via the option overwriteBehavior. This option can take one of the following values:

  1. dropAndCreate (default) - drop and create the table before writing new values.
  2. truncate - truncate the table before writing new values.
  3. merge - replace rows with new rows by matching on the primary key. (Use this option only if you need to fully rewrite existing rows with new ones. To specify some rule for the update, use the onDuplicateKeySQL option instead.)

All these options are case-insensitive.

Example of merge option

Suppose you have the following table, and the Id column is the primary key.

SELECT * FROM <table>;

Id Name Age
1 Alice 20
2 Bob 25
3 Charlie 30

If you save the following dataframe with overwriteBehavior = merge:

Id Name Age
2 Daniel 22
3 Eve 27
4 Franklin 35
df.write
    .format("singlestore")
    .option("overwriteBehavior", "merge")
    .mode(SaveMode.Overwrite)
    .save("<yourdb>.<table>")

After the save is complete, the table will look like this:

note: rows with Id=2 and Id=3 were overwritten with new rows
note: the row with Id=1 was not touched and still exists in the result

SELECT * FROM <table>;

Id Name Age
1 Alice 20
2 Daniel 22
3 Eve 27
4 Franklin 35

SQL Pushdown

The singlestore-spark-connector has extensive support for rewriting Spark SQL and DataFrame operation query plans into standalone SingleStoreDB queries. This allows most of the computation to be pushed into the SingleStoreDB distributed system without any manual intervention. The SQL rewrites are enabled automatically, but they can be disabled using the disablePushdown option. The singlestore-spark-connector also supports partial pushdown, where certain parts of a query are evaluated in SingleStoreDB and the remaining parts are evaluated in Spark.

⚠️ SQL Pushdown is either enabled or disabled on the entire Spark Session. If you want to run multiple queries in parallel with different values of disablePushdown, make sure to run them on separate Spark Sessions.

We currently support most of the primary Logical Plan nodes in Spark SQL including:

  • Project
  • Filter
  • Aggregate
  • Window
  • Join
  • Limit
  • Sort

We also support most Spark SQL expressions. A full list of supported operators/functions can be found in the ExpressionGen.scala file.

The best place to look for examples of fully supported queries is in the tests. Check out this file as a starting point: SQLPushdownTest.scala.

Debugging SQL Pushdown

If you encounter an issue with SQL Pushdown, the first step is to look at the explain output. You can do this from any DataFrame using df.explain(). If you pass the argument true, you get much more output, including the pre- and post-optimization passes.
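
A minimal sketch (foo.bar is a placeholder table):

val df = spark.read.format("singlestore").load("foo.bar")
df.explain()     // prints the physical plan, where the pushed-down SingleStoreDB query (if any) appears
df.explain(true) // extended output: parsed, analyzed, and optimized logical plans as well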

In addition, the singlestore-spark-connector outputs a lot of helpful information when the TRACE log level is enabled for the com.singlestore.spark package. To enable TRACE log level, add the following line(s) to the log4j configuration:

  • Log4j
log4j.logger.com.singlestore.spark=TRACE
  • Log4j 2
logger.singlestore.name = com.singlestore.spark
logger.singlestore.level = TRACE
logger.singlestore.additivity = false

Make sure not to leave it in place since it generates a huge amount of tracing output.

SQL Pushdown Incompatibilities

  • ToUnixTimestamp and UnixTimestamp only handle time values less than 2038-01-19 03:14:08, if they get DateType or TimestampType as a first argument.
  • FromUnixTime with yyyy-MM-dd HH:mm:ss as the default format, only handles time less than 2147483648 (2^31).
  • DecimalType is truncated on overflow (by default, Spark either throws an exception or returns null).
  • greatest and least return null if at least one argument is null (in Spark these functions skip nulls).
  • When a value cannot be converted to a numeric or fractional type, SingleStoreDB returns 0 (Spark returns null).
  • Atanh(x), for x ∈ (-∞, -1] ∪ [1, ∞), returns null (Spark returns NaN).
  • When a string is cast to a numeric type, SingleStoreDB takes the prefix of it which is numeric (Spark returns null if the whole string is not numeric).
  • When a numeric type is cast to a smaller one (in size), SingleStoreDB truncates it. For example, 500 cast to Byte becomes 127. Note: the Spark optimizer may fold casts of literals, in which case the behavior for literals matches Spark's.
  • When a fractional type is cast to an integral type, SingleStoreDB rounds it to the closest value.
  • Log returns null instead of NaN, Infinity, -Infinity.
  • Round rounds down if the number to be rounded is followed by 5, and it is DOUBLE or FLOAT (DECIMAL is rounded up).
  • Conv works differently if the number contains non-alphanumeric characters.
  • ShiftLeft, ShiftRight, and ShiftRightUnsigned convert the value to an UNSIGNED BIGINT and then perform the shift. In case of an overflow, it returns 0 (1<<64 = 0 and 10>>20 = 0).
  • BitwiseGet returns 0 when the bit position is negative or exceeds the bit upper limit.
  • Initcap defines a letter as the beginning of a word even if it is enclosed in quotation marks, brackets, etc. For example "dear sir/madam (miss)" is converted to "Dear Sir/Madam (Miss)".
  • Skewness(x), in Spark 3.0, for STD(x) = 0 returns null instead of NaN.

Parallel Read Support

Parallel read can be enabled using the enableParallelRead option. This can drastically improve performance in some cases.

The enableParallelRead option can have one of the following values:

  • disabled: Disables parallel reads and performs non-parallel reads.
  • automaticLite: Performs parallel reads if at least one parallel read feature specified in parallelRead.Features is supported; otherwise, performs a non-parallel read. In automaticLite mode, if an outer sorting operation (for example, a nested SELECT statement where sorting is done in the top-level SELECT) is pushed down into SingleStoreDB, a non-parallel read is used.
  • automatic: Performs parallel reads if at least one parallel read feature specified in parallelRead.Features is supported. Otherwise performs a non-parallel read. In automatic mode, the singlestore-spark-connector is unable to push down an outer sorting operation into SingleStore. Final sorting is done at the Spark end of the operation.
  • forced: Performs parallel reads if at least one parallel read feature specified in parallelRead.Features is supported. Otherwise it returns an error. In forced mode, the singlestore-spark-connector is unable to push down an outer sorting operation into SingleStore. Final sorting is done at the Spark end of the operation.

By default, enableParallelRead is set to automaticLite.
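
For example, to set the parallel read behavior globally (a sketch):

spark.conf.set("spark.datasource.singlestore.enableParallelRead", "forced")
spark.conf.set("spark.datasource.singlestore.parallelRead.Features", "ReadFromAggregators")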

Parallel read features

The SingleStoreDB Spark Connector supports the following parallel read features:

  • readFromAggregators
  • readFromAggregatorsMaterialized
  • readFromLeaves

The connector uses the first feature specified in parallelRead.Features which meets all the requirements. The requirements for each feature are specified below. By default, the connector uses the readFromAggregators feature. You can repartition the result set for readFromAggregators and readFromAggregatorsMaterialized features. See Parallel Read Repartitioning for more information.

readFromAggregators

When this feature is used, the singlestore-spark-connector uses SingleStoreDB parallel read functionality. By default, the number of partitions in the resulting DataFrame is the lesser of the number of partitions in the SingleStoreDB database and the Spark parallelism level (i.e., the sum of spark.executor.cores/spark.task.cpus over all executors). The number of partitions in the resulting DataFrame can be controlled using the parallelRead.maxNumPartitions and parallelRead.numPartitions options. To use this feature, all reading queries must start at the same time. The connector tries to retrieve the maximum number of tasks that can be run concurrently and uses this value to distribute the reading queries. In some cases, the connector is not able to retrieve this value (for example, with AWS Glue). In such cases, the parallelRead.numPartitions option is required.

Use the parallelRead.tableCreationTimeoutMS option to specify a timeout for result table creation.

Requirements:

  • SingleStoreDB version 7.5+
  • Either the database option is set, or the database name is specified in the load option
  • SingleStoreDB parallel read functionality supports the generated query
  • The parallelRead.numPartitions option is set, or the connector is able to compute the maximum number of concurrent tasks that can run on the Spark cluster

readFromAggregatorsMaterialized

This feature is very similar to readFromAggregators. The only difference is that readFromAggregatorsMaterialized uses the MATERIALIZED option to create the result table. When this feature is used, the reading tasks do not have to start at the same time. Hence, the parallelism level on the Spark cluster does not affect the reading tasks. However, using the MATERIALIZED option may cause a query to fail if SingleStoreDB does not have enough memory to materialize the result set. By default, the number of partitions in the resulting DataFrame is equal to the number of partitions in the SingleStoreDB database. The number of partitions in the resulting DataFrame can be controlled using the parallelRead.maxNumPartitions and parallelRead.numPartitions options.

Use the parallelRead.materializedTableCreationTimeoutMS option to specify a timeout for materialized result table creation.

Requirements:

  • SingleStoreDB version 7.5+
  • Either the database option is set, or the database name is specified in the load option
  • SingleStoreDB parallel read functionality supports the generated query

readFromLeaves

When this feature is used, the singlestore-spark-connector skips the transaction layer and reads directly from partitions on the leaf nodes. Hence, each individual read task sees an independent version of the database's distributed state. If other queries are run on the database while the read is in progress, they may affect the current read operation. Make sure to take this into account when using the readFromLeaves feature.

This feature supports only those query-shapes that do not perform any operation on the aggregator and can be pushed down to the leaf nodes.

In order to use readFromLeaves feature, the username and password provided to the singlestore-spark-connector must be the same across all nodes in the cluster.

By default, the number of partitions in the resulting DataFrame is equal to the number of partitions in the SingleStoreDB database. The number of partitions in the resulting DataFrame can be controlled using the parallelRead.maxNumPartitions and parallelRead.numPartitions options.

Requirements:

  • Either the database option is set, or the database name is specified in the load option
  • The username and password provided to the singlestore-spark-connector must be uniform across all the nodes in the cluster, because parallel reads require consistent authentication and connectible leaf nodes
  • The hostnames and ports listed by SHOW LEAVES must be directly connectible from Spark
  • The generated query can be pushed down to the leaf nodes
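
A sketch of a read that prefers readFromLeaves and falls back to readFromAggregators (foo and bar are placeholder database and table names; the leaf nodes are assumed reachable from Spark):

val df = spark.read
    .format("singlestore")
    .option("database", "foo")
    .option("enableParallelRead", "automatic")
    .option("parallelRead.Features", "ReadFromLeaves,ReadFromAggregators")
    .option("parallelRead.maxNumPartitions", "8")
    .load("bar")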

Parallel read repartitioning

You can repartition the result using the parallelRead.repartition option for the readFromAggregators and readFromAggregatorsMaterialized features to ensure that each task reads approximately the same amount of data. This option is very useful for queries with top-level LIMIT clauses, as without repartitioning it is possible that all rows end up in a single partition.

Use the parallelRead.repartition.columns option to specify a comma separated list of columns that will be used for repartitioning. Column names that contain leading or trailing whitespaces or commas must be escaped as:

  • Column name must be enclosed in backticks
"a" -> "`a`"
  • Each backtick (`) in the column name must be replaced with two backticks (``)
"a`a``" -> "a``a````"

By default, repartitioning is done using an additional column with RAND() value.

Example

spark.read.format("singlestore")
.option("enableParallelRead", "automatic")
.option("parallelRead.Features", "readFromAggregators,readFromLeaves")
.option("parallelRead.repartition", "true")
.option("parallelRead.repartition.columns", "a, b")
.option("parallelRead.TableCreationTimeout", "1000")
.load("db.table")

In the preceding example, the connector checks the requirements for readFromAggregators. If they are satisfied, it uses this feature. Otherwise, it checks the requirements for readFromLeaves; if they are satisfied, the connector uses that feature, and otherwise it performs a non-parallel read. If the connector uses readFromAggregators, it repartitions the result on the SingleStoreDB side before reading it, and the read fails if creation of the result table takes longer than 1000 milliseconds.

Running SQL queries

The methods executeSinglestoreQuery(query: String, variables: Any*) and executeSinglestoreQueryDB(db: String, query: String, variables: Any*) allow you to run SQL queries on a SingleStoreDB database directly using the existing SparkSession object. The method executeSinglestoreQuery uses the database defined in the SparkSession configuration, while executeSinglestoreQueryDB allows you to specify the database to query. The following examples demonstrate their usage (assuming you already have an initialized SparkSession object named spark). Both methods return an Iterator[org.apache.spark.sql.Row].

// this imports the implicit class QueryMethods which adds the methods
// executeSinglestoreQuery and executeSinglestoreQueryDB to SparkSession class
import com.singlestore.spark.SQLHelper.QueryMethods

// You can pass an empty database to executeSinglestoreQueryDB to connect to SingleStoreDB without specifying a database.
// This allows you to create a database which is defined in the SparkSession config for example.
spark.executeSinglestoreQueryDB("", "CREATE DATABASE foo")
// the next query can be used if the database option has been specified in the SparkSession config
var s = spark.executeSinglestoreQuery("CREATE TABLE user(id INT, name VARCHAR(30), status BOOLEAN)")

// you can create another database
spark.executeSinglestoreQuery("CREATE DATABASE bar")
// the database specified as the first argument will override the database set in the SparkSession object
s = spark.executeSinglestoreQueryDB("bar", "CREATE TABLE user(id INT, name VARCHAR(30), status BOOLEAN)")

You can pass query parameters to these functions as arguments following the query. The supported parameter types are String, Int, Long, Short, Float, Double, Boolean, Byte, java.sql.Date, and java.sql.Timestamp.

import com.singlestore.spark.SQLHelper.QueryMethods

var userRows = spark.executeSinglestoreQuery("SELECT id, name FROM USER WHERE id > ? AND status = ? AND name LIKE ?", 10, true, "%at%")

for (row <- userRows) {
  println(row.getInt(0), row.getString(1))
}

Alternatively, these functions can take a SparkSession object as the first argument, as shown in the example below:

import com.singlestore.spark.SQLHelper.{executeSinglestoreQuery, executeSinglestoreQueryDB}

executeSinglestoreQuery(spark, "CREATE DATABASE foo")
var s = executeSinglestoreQueryDB(spark, "foo", "SHOW TABLES")

Security

SQL Permissions

To perform any SQL operation through the SingleStore Spark Connector, you must have the permissions required for that specific operation. The following matrix describes the minimum permissions required to perform some common operations.

Note: The ALL PRIVILEGES permission allows you to perform any operation.

Operation Min. Permission Alternative Permission
READ from a table SELECT ALL PRIVILEGES
WRITE to a table SELECT, INSERT ALL PRIVILEGES
DROP a database or table SELECT, INSERT, DROP ALL PRIVILEGES
CREATE a database or table SELECT, INSERT, CREATE ALL PRIVILEGES
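
For example, the minimum privileges for reading from and writing to tables in a database could be granted as follows (a sketch using executeSinglestoreQuery from the Running SQL queries section; spark_user and foo are placeholders, and the GRANT can equally be run from any SQL client):

import com.singlestore.spark.SQLHelper.QueryMethods

// Grant read and write access on all tables in database foo to spark_user.
spark.executeSinglestoreQuery("GRANT SELECT, INSERT ON foo.* TO 'spark_user'@'%'")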

For more information on granting privileges, see GRANT.

Connecting with a Kerberos-authenticated User

You can use the SingleStoreDB Spark Connector with a Kerberized user without any additional configuration. To use a Kerberized user, you need to configure the connector with the given SingleStoreDB database user that is authenticated with Kerberos (via the user option). Please visit our documentation here to learn about how to configure SingleStoreDB users with Kerberos.

Here is an example of configuring the Spark connector globally with a Kerberized SingleStoreDB user called krb_user.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .config("spark.datasource.singlestore.user", "krb_user")
    .getOrCreate()

You do not need to provide a password when configuring a Spark Connector user that is Kerberized. The connector driver (SingleStoreDB JDBC driver) will be able to authenticate the Kerberos user from the cache by the provided username. Other than omitting a password with this configuration, using a Kerberized user with the Connector is no different than using a standard user. Note that if you do provide a password, it will be ignored.
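
For example, a read with a Kerberized user looks like any other read, just without a password option (a sketch; the endpoint and table are placeholders):

val df = spark.read
    .format("singlestore")
    .option("ddlEndpoint", "singlestore-master.cluster.internal")
    .option("user", "krb_user")
    .load("foo.bar")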

SSL Support

The SingleStoreDB Spark Connector uses the SingleStoreDB JDBC Driver under the hood and thus supports SSL configuration out of the box. In order to configure SSL, first ensure that your SingleStoreDB cluster has SSL configured. Documentation on how to set this up can be found here: https://docs.singlestore.com/latest/guides/security/encryption/ssl/

Once you have set up SSL on your server, you can enable SSL by setting the following options:

spark.conf.set("spark.datasource.singlestore.useSSL", "true")
spark.conf.set("spark.datasource.singlestore.serverSslCert", "PATH/TO/CERT")

Note: the serverSslCert option may be the server's certificate in DER form, or the server's CA certificate. It can be used in one of three forms:

  • serverSslCert=/path/to/cert.pem (full path to certificate)
  • serverSslCert=classpath:relative/cert.pem (relative to current classpath)
  • or as a verbatim DER-encoded certificate string: -----BEGIN CERTIFICATE-----...

You may also want to set these additional options depending on your SSL configuration:

spark.conf.set("spark.datasource.singlestore.trustServerCertificate", "true")
spark.conf.set("spark.datasource.singlestore.disableSslHostnameVerification", "true")

See The SingleStoreDB JDBC Driver for more information.

JWT authentication

You may authenticate your connection to the SingleStoreDB cluster using the SingleStoreDB Spark connector with a JWT. To use JWT-based authentication, specify the following parameters:

  • credentialType=JWT
  • password=<jwt-token>

Here's a sample configuration that uses JWT-based authentication:

SparkConf conf = new SparkConf();
conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
conf.set("spark.datasource.singlestore.credentialType", "JWT")
conf.set("spark.datasource.singlestore.useSsl", "true")
conf.set("spark.datasource.singlestore.user", "s2user")
conf.set("spark.datasource.singlestore.password", "eyJhbGci.eyJzdWIiOiIxMjM0NTY3.masHf")

note: To authenticate your connection to the SingleStoreDB cluster using the SingleStoreDB Spark connector with a JWT, the SingleStoreDB user must connect via SSL and use a JWT for authentication.

See Create a JWT User for more information.

See Authenticate via JWT for more information.

Filing issues

When filing issues, please include as much information as possible, as well as any reproduction steps. It's hard for us to reproduce issues if the problem depends on specific data in your SingleStoreDB table, for example. Whenever possible, try to construct a minimal reproduction of the problem and include the table definition and table contents in the issue.

If the issue is related to SQL Pushdown (or you aren't sure), make sure to include the TRACE output (from the com.singlestore.spark package) or the full explain of the plan. See the Debugging SQL Pushdown section above for more information on how to do this.

Happy querying!

Setting up development environment

  • install Oracle JDK 8 from this URL: https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
  • install the Community Edition of IntelliJ IDEA from https://www.jetbrains.com/idea/
  • clone the repository https://github.com/memsql/singlestore-spark-connector.git
  • in IntelliJ IDEA, choose Configure->Plugins and install the Scala plugin
  • in IntelliJ IDEA, run Import Project and select the path to the singlestore-spark-connector build.sbt file
  • choose import project from external model and sbt
  • in Project JDK, select New...->JDK and choose the path to the installed JDK
  • Finish
  • it will overwrite some files and create build files (which are in .gitignore)
  • you may need to remove the .idea directory for IDEA to load the project properly
  • in IntelliJ IDEA, choose File->Close Project
  • run git checkout . to revert all changes made by IntelliJ IDEA
  • in IntelliJ IDEA, choose Open and select the path to singlestore-spark-connector
  • run Test Spark 3.0 (it should succeed)

Java & Python Examples

Java

Configuration

SparkConf conf = new SparkConf();
conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
conf.set("spark.datasource.singlestore.user", "admin")
conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")

Read Data

DataFrame df = spark
  .read()
  .format("singlestore")
  .option("ddlEndpoint", "singlestore-master.cluster.internal")
  .option("user", "admin")
  .load("foo");

Write Data

df.write()
    .format("singlestore")
    .option("loadDataCompression", "LZ4")
    .option("overwriteBehavior", "dropAndCreate")
    .mode(SaveMode.Overwrite)
    .save("foo.bar")

Python

Configuration

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
spark.conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")

Read Data

df = spark \
  .read \
  .format("singlestore") \
  .option("ddlEndpoint", "singlestore-master.cluster.internal") \
  .option("user", "admin") \
  .load("foo")

Write Data

df.write \
    .format("singlestore") \
    .option("loadDataCompression", "LZ4") \
    .option("overwriteBehavior", "dropAndCreate") \
    .mode("overwrite") \
    .save("foo.bar")


singlestore-spark-connector's Issues

java.sql.SQLException: Communications link failure when trying to load data

Hello,

I run Spark 2.2.0 on a Databricks cluster with 1 master and 1 slave. My memsql-connector version is 2.0.2.

I set up my conf:

val memsql_host = sc.getConf.get("spark.memsql.host")
val memsql_port = sc.getConf.get("spark.memsql.port")
val memsql_user = sc.getConf.get("spark.memsql.user")
val memsql_password = sc.getConf.get("spark.memsql.password")
val memsql_defaultDatabase = sc.getConf.get("spark.memsql.defaultDatabase")

... and try to connect over jdbc

val pc_conn_str = s"jdbc:mysql://$memsql_host:$memsql_port/$memsql_defaultDatabase"
val pc_conn = DriverManager.getConnection(pc_conn_str, memsql_user, memsql_password)
val pc_stmt = pc_conn.createStatement
val pc_query ="SELECT COUNT(1) as part_count FROM INFORMATION_SCHEMA.TABLE_STATISTICS WHERE TABLE_NAME = 'job_matrix';"
val pc_rs = pc_stmt.executeQuery(pc_query);
 
var part_count = 2

while (pc_rs.next())
{
  part_count =  pc_rs.getInt("part_count")
}
pc_stmt.close()

... I succeed and get a result for part_count.

When I try to do the same with the memsql-connector

val tbl_job_matrix = sqlContext
  .read
  .format("com.memsql.spark.connector")
  .options(Map("query" -> ("SELECT COUNT(1) as part_count FROM INFORMATION_SCHEMA.TABLE_STATISTICS WHERE TABLE_NAME = 'job_matrix'"),
				 "database" -> "job_matrix"))
  .load()

... I get java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure

Clearly it's not an access issue - which is what the documentation points me to. Can you suggest a fix or something to check? Also, is there a way to set up a timeout on the connection, since it takes quite some time until I get the failure?

Thank you!

SparkContext has been shutdown

In some weird cases, an exception in one pipeline may cause the SparkContext to be shut down.

(For example, I was able to get into that situation with a pipeline that repeatedly failed with kafka.common.OffsetOutOfRangeException... not really sure why SparkContext has been shutdown, but that happened.)

As a result, all other pipelines enter in a loop where each batch fails with:

java.lang.IllegalStateException: SparkContext has been shutdown

I think it could be useful to catch this exception, and possibly perform a nice restart? Not sure what else one can do if SparkContext has been shutdown...

issue with number of partitions when we use spark.read

Hello Team,
Here is my use case with memsql spark connector READ operation:

  1. I have table in MEMSQL and I am reading this table using below memsql connector read operation
    val DF = spark.read.format("memsql").option("ddlEndpoint", "url value").option("user","username").option("Password", "pwd").load("memsql table name")

  2. Once I read it to spark dataframe, I used below spark write API to write memsql data into hive table.
    DF.write.mode("overwrite").format("orc").saveAsTable("hive tabl ename")

  3. In this case, in my cluster, I see only one executor and one task running actively even though I specified 20 cores and 120 executors.

Actual issue: spark.read always creates the DF as a single partition, and because of that, this job is not able to use the available cores and executors.

I tried DF.repartition(50) and then applied the write operation, but no luck.

How can we make the memsql spark.read() API create multiple partitions of the DF, so that our job will use cluster resources effectively?

Because of this issue, for 100mn records, it took 28 hours to read from the memsql table and write it to the HIVE external table.

Any solution?

TypeConversions doesn't honor unsigned smallint and need better error handling

One of our columns in the MemSQL database is defined as
start_video_pos smallint(5) unsigned DEFAULT NULL
TypeConversions treats unsigned smallint as Spark's ShortType, which causes an exception while reading data even though the value is in the unsigned smallint range. We suspect it needs to take the sign bit into account and use IntegerType to fit the value.

We use memsql-connector_2.11:2.0.5 and are getting a read error with the following trace:

WARN TaskSetManager: Lost task 11.0 in stage 0.0 (TID 11, ......compute.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLDataException: Value '65535' is outside of valid range for type java.lang.Short
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:530)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:489)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:85)
at com.mysql.cj.jdbc.result.ResultSetImpl.getShort(ResultSetImpl.java:863)
at org.apache.commons.dbcp2.DelegatingResultSet.getShort(DelegatingResultSet.java:211)
at org.apache.commons.dbcp2.DelegatingResultSet.getShort(DelegatingResultSet.java:211)
at com.memsql.spark.connector.dataframe.TypeConversions$.GetJDBCValue(TypeConversions.scala:80)
at com.memsql.spark.connector.util.JDBCImplicits$ResultSetHelpers$$anonfun$toRow$1.apply(JDBCImplicits.scala:26)
at com.memsql.spark.connector.util.JDBCImplicits$ResultSetHelpers$$anonfun$toRow$1.apply(JDBCImplicits.scala:23)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at com.memsql.spark.connector.util.JDBCImplicits$ResultSetHelpers.toRow(JDBCImplicits.scala:23)
at com.memsql.spark.connector.MemSQLQueryRelation$$anonfun$1.apply(MemSQLRelation.scala:45)
at com.memsql.spark.connector.MemSQLQueryRelation$$anonfun$1.apply(MemSQLRelation.scala:45)
at com.memsql.spark.connector.rdd.MemSQLRDD$$anon$1.getNext(MemSQLRDD.scala:275)
at com.memsql.spark.connector.util.NextIterator.hasNext(NextIterator.scala:74)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:380)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
... 8 more
Caused by: com.mysql.cj.core.exceptions.NumberOutOfRange: Value '65535' is outside of valid range for type java.lang.Short
at com.mysql.cj.core.io.IntegerBoundsEnforcer.createFromLong(IntegerBoundsEnforcer.java:50)
at com.mysql.cj.core.io.BaseDecoratingValueFactory.createFromLong(BaseDecoratingValueFactory.java:58)
at com.mysql.cj.core.io.MysqlTextValueDecoder.decodeUInt2(MysqlTextValueDecoder.java:189)
at com.mysql.cj.mysqla.result.AbstractResultsetRow.decodeAndCreateReturnValue(AbstractResultsetRow.java:84)
at com.mysql.cj.mysqla.result.AbstractResultsetRow.getValueFromBytes(AbstractResultsetRow.java:225)
at com.mysql.cj.mysqla.result.ByteArrayRow.getValue(ByteArrayRow.java:84)
at com.mysql.cj.jdbc.result.ResultSetImpl.getNonStringValueFromRow(ResultSetImpl.java:630)
... 34 more

It would be useful to add more logging in JDBCImplicits.toRow() to log which column causes the error, instead of just propagating the MySQL error.

Failure for writing large dataframe to memsql from spark

I am using this memsql-spark connector for writing a dataframe from Spark to MemSQL. This library works fine when the dataframe is not large: if the dataframe has fewer than several million rows, it works. However, when the dataframe has more than 10 million rows, the program may fail, and if it has billions of rows, it will definitely fail.

I am using Spark and MemSQL on a single large server, so the MemSQL master node and the Spark driver node are the same.

The main error in the log is : java.sql.SQLException: Unable to connect to leaf @10.1.10.40:3307 with user root, using password NO: [2005] Timed out trying to connect to '10.1.10.40':3307 after 30 seconds.

This issue has bothered me for a long time; I have updated MemSQL and this connector library, yet the problem is still not solved.

selective pushdown

Hi,

Is there a way we can do a selective pushdown?
For example, can we specify the list of potential query types that can be considered for the pushdown logic?
In some cases we want to push down only the filtering and do the joins, or rank functions, on the Spark side.

Thanks,
Aleks

saveToMemSQL Error when run sql with GROUP BY

It's ok with

val sql =
    s"""
      | SELECT gender AS genderId, -1 AS ageId
      | FROM $tableName
    """.stripMargin

But when I add more fields to calculate and a GROUP BY:

val sql =
    s"""
      | SELECT gender AS genderId, -1 AS ageId,
      | CAST(0 AS LONG) AS impression, CAST(0 AS LONG) AS trueImpression, COUNT(*) AS click
      | FROM $tableName
      | GROUP BY gender
    """.stripMargin
  val result = sqlContext.sql(sql)
  result.saveToMemSQL("dev_output", "aggregate")

It throws an error:

INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
Exception in thread "main" com.memsql.spark.SaveToMemSQLException: SaveToMemSQLException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 172 in stage 1.0 failed 1 times, most recent failure: Lost task 172.0 in stage 1.0 (TID 174, localhost): org.apache.spark.SparkException: Internal error: release called on 33554432 bytes but task only has 0
    at org.apache.spark.shuffle.ShuffleMemoryManager.release(ShuffleMemoryManager.scala:117)
    at org.apache.spark.unsafe.map.BytesToBytesMap.free(BytesToBytesMap.java:708)
    at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.free(UnsafeFixedWidthAggregationMap.java:234)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:678)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:76)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.memsql.LoadDataStrategy$$anon$2.run(LoadDataStrategy.scala:53)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.sql.memsql.SparkImplicits$DataFrameFunctions.saveToMemSQL(SparkImplicits.scala:85)
    at org.apache.spark.sql.memsql.SparkImplicits$DataFrameFunctions.saveToMemSQL(SparkImplicits.scala:40)
    at job.WriteToMemSQL$delayedInit$body.apply(WriteToMemSQL.scala:105)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at job.WriteToMemSQL$.main(WriteToMemSQL.scala:17)
    at job.WriteToMemSQL.main(WriteToMemSQL.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

I am using spark 1.5.2 and memsql 1.3.2

GROUP BY is not working when IN is added in WHERE clause

Hi, there is a difference when I run queries from the console (mysql client) and when using the spark-memsql-connector:
using the connector, it is not grouping the data.
Query:

SELECT
  dimension,
  feature_id,
  sub_pipeline,
  sum(click_in) AS click_in,
  sum(click_out) AS click_out,
  sum(income) AS income
FROM gravity_test.counter_daily cs
WHERE sub_pipeline in ('1','2') AND cs.c_date >= '2007-03-03' AND cs.c_date < '2017-03-03'
GROUP BY dimension, sub_pipeline, feature_id;

is returning:
+---------------+----------+------------+--------+---------+------+
|dimension |feature_id|sub_pipeline|click_in|click_out|income|
+---------------+----------+------------+--------+---------+------+
|KW_TEXT |a |2 |10 |5 |10 |
|KW_TEXT |b |1 |30 |20 |30 |
|COUNTRY |GB |1 |10 |5 |10 |
|COUNTRY#KW_TEXT|US#b |1 |10 |5 |10 |
|COUNTRY |US |1 |10 |5 |10 |
|KW_TEXT |c |1 |15 |15 |20 |
|COUNTRY#KW_TEXT|US#a |1 |10 |5 |10 |
|KW_TEXT |a |1 |10 |5 |10 |
|KW_TEXT |d |1 |30 |20 |30 |
|KW_TEXT |c |1 |5 |15 |25 |
|COUNTRY#KW_TEXT|GB#a |1 |10 |5 |10 |
|KW_TEXT |c |2 |10 |5 |10 |
|KW_TEXT |c |1 |10 |5 |10 |
|KW_TEXT |a |1 |20 |10 |20 |
+---------------+----------+------------+--------+---------+------+
When I am using the console, it behaves correctly.

Implementing the same query using WHERE sub_pipeline = '1' works well with the memsql-connector:

SELECT
  dimension,
  feature_id,
  sub_pipeline,
  sum(click_in) AS click_in,
  sum(click_out) AS click_out,
  sum(income) AS income
 FROM gravity_test.counter_daily cs
 WHERE sub_pipeline = '1' AND cs.c_date >= '2007-03-03' AND cs.c_date < '2017-03-03'
 GROUP BY dimension, sub_pipeline, feature_id

for the schema:

DROP DATABASE IF EXISTS gravity_test;
CREATE DATABASE gravity_test;
DROP TABLE IF EXISTS counter_daily;
CREATE TABLE `counter_daily` (
  `sub_pipeline` bigint(20) NOT NULL,
  `dimension` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `feature_id` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `c_date` datetime NOT NULL,
  `click_in` bigint(20) NOT NULL DEFAULT 0,
  `click_out` bigint(20) NOT NULL DEFAULT 0,
  `income` decimal(10,0) NOT NULL DEFAULT 0,
  `income_count` bigint(20) NOT NULL DEFAULT 0,
  PRIMARY KEY (`sub_pipeline`,`dimension`,`feature_id`,`c_date`)
);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY', 'GB', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY', 'US', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY#KW_TEXT', 'GB#a', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY#KW_TEXT', 'US#a', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'COUNTRY#KW_TEXT', 'US#b', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'a', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'a', '2016-01-02 00:00:00.000000', 20, 10, 20, 2);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'b', '2016-01-01 00:00:00.000000', 30, 20, 30, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'c', '2017-01-01 00:00:00.000000', 5, 15, 25, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'c', '2017-01-11 00:00:00.000000', 15, 15, 20, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'c', '2017-03-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (1, 'KW_TEXT', 'd', '2017-03-01 00:00:00.000000', 30, 20, 30, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (2, 'KW_TEXT', 'a', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (2, 'KW_TEXT', 'c', '2017-03-01 00:00:00.000000', 10, 5, 10, 1);
INSERT INTO gravity_test.counter_daily (sub_pipeline, dimension, feature_id, c_date, click_in, click_out, income, income_count) VALUES (3, 'KW_TEXT', 'b', '2016-01-01 00:00:00.000000', 10, 5, 10, 1);
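
For reference, the query above is presumably issued through the connector roughly like this (the read options are assumptions; adjust to the actual job configuration):

val grouped = sqlContext.read
  .format("com.memsql.spark.connector")
  .options(Map(
    "query" -> """SELECT dimension, feature_id, sub_pipeline,
                 |       sum(click_in) AS click_in, sum(click_out) AS click_out, sum(income) AS income
                 |  FROM gravity_test.counter_daily cs
                 | WHERE sub_pipeline IN ('1','2') AND cs.c_date >= '2007-03-03' AND cs.c_date < '2017-03-03'
                 | GROUP BY dimension, sub_pipeline, feature_id""".stripMargin,
    "database" -> "gravity_test"))
  .load()
grouped.show()   // with the connector this output is not grouped; the mysql client groups correctly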

PipelineMonitor graceful stop

Hi,

With the new support for checkpoints it would be nice to have:

  1. a graceful stop of the pipelines
  2. a graceful stop/restart of the interface
    (In short to avoid duplicated messages when restarting.)

Step 2 seems more complex: it should take into account timeouts (in case a pipeline doesn't stop), and personally I'd also need some sort of priority to stop/restart the pipelines in a particular order. So I'm planning to manage that with an external controller.

For step 1, instead, I was looking at the code and it seems relatively doable.
Just to be clear, by "graceful stop" I mean that if a step has started, it has to complete.

If I'm not wrong, this is the cause of the immediate stop:
https://github.com/memsql/memsql-spark-connector/blob/master/interface/src/main/scala/com/memsql/spark/interface/PipelineMonitor.scala#L639
and in principle PipelineMonitor.stop() could just set isStopping=true, while cancelling the job and interrupting the thread could be moved after the step terminates here:
https://github.com/memsql/memsql-spark-connector/blob/master/interface/src/main/scala/com/memsql/spark/interface/PipelineMonitor.scala#L179
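
A rough sketch of the idea (not the actual PipelineMonitor code; names and handles below are assumptions):

// stop() only flags the monitor and lets the in-flight step run to completion
def stop(): Unit = {
  isStopping = true
}

// ...and inside the monitor loop, once the current step has terminated:
if (isStopping) {
  sparkContext.cancelJobGroup(jobGroupId)   // job cancellation moved here from stop()
  thread.interrupt()
}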

Ritual questions:

  • Am I missing something?
  • What do you think? Ship it? :)

Thanks, E.

Single RDD partition

Although I'm configuring the database name when loading, I keep getting a single RDD partition:
ss.read.format("com.memsql.spark.connector").options(Map("path" -> ("table"),"database" -> ("metrics"))).load()

db is configured with 32 partitions:
memsql> show partitions on metrics;
+---------+----------------+------+--------+--------+
| Ordinal | Host           | Port | Role   | Locked |
+---------+----------------+------+--------+--------+
|       0 | 10.125.134.164 | 3307 | Master |      0 |
|       1 | 10.125.134.164 | 3307 | Master |      0 |
|       2 | 10.125.134.164 | 3307 | Master |      0 |
|       3 | 10.125.134.164 | 3307 | Master |      0 |
|       4 | 10.125.134.164 | 3307 | Master |      0 |
|       5 | 10.125.134.164 | 3307 | Master |      0 |
|       6 | 10.125.134.164 | 3307 | Master |      0 |
|       7 | 10.125.134.164 | 3307 | Master |      0 |
|       8 | 10.125.134.164 | 3307 | Master |      0 |
|       9 | 10.125.134.164 | 3307 | Master |      0 |
|      10 | 10.125.134.164 | 3307 | Master |      0 |
|      11 | 10.125.134.164 | 3307 | Master |      0 |
|      12 | 10.125.134.164 | 3307 | Master |      0 |
|      13 | 10.125.134.164 | 3307 | Master |      0 |
|      14 | 10.125.134.164 | 3307 | Master |      0 |
|      15 | 10.125.134.164 | 3307 | Master |      0 |
|      16 | 10.125.134.164 | 3307 | Master |      0 |
|      17 | 10.125.134.164 | 3307 | Master |      0 |
|      18 | 10.125.134.164 | 3307 | Master |      0 |
|      19 | 10.125.134.164 | 3307 | Master |      0 |
|      20 | 10.125.134.164 | 3307 | Master |      0 |
|      21 | 10.125.134.164 | 3307 | Master |      0 |
|      22 | 10.125.134.164 | 3307 | Master |      0 |
|      23 | 10.125.134.164 | 3307 | Master |      0 |
|      24 | 10.125.134.164 | 3307 | Master |      0 |
|      25 | 10.125.134.164 | 3307 | Master |      0 |
|      26 | 10.125.134.164 | 3307 | Master |      0 |
|      27 | 10.125.134.164 | 3307 | Master |      0 |
|      28 | 10.125.134.164 | 3307 | Master |      0 |
|      29 | 10.125.134.164 | 3307 | Master |      0 |
|      30 | 10.125.134.164 | 3307 | Master |      0 |
|      31 | 10.125.134.164 | 3307 | Master |      0 |
+---------+----------------+------+--------+--------+
32 rows in set (0.00 sec)

NullPointerException while getting next window partition

Seen in both Spark 2.0.0 and Spark 2.1.0
Stack trace for 2.1.0:

java.lang.NullPointerException at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextPartition(WindowExec.scala:341)
    at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:391)
    at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:290)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.memsql.spark.connector.LoadDataStrategy$$anon$2.run(LoadDataStrategy.scala:52)
    at java.lang.Thread.run(Thread.java:745)

Using the memsql-spark-connector, Spark is unable to fetch the next partition because of a NullPointerException on this line in WindowExec: TaskContext.get().taskMemoryManager(). I can output the full dataset by writing a CSV file to S3 or HDFS, so it seems like something in the memsql-spark-connector is causing the error.

The input data is a single large (208 GB) Parquet file, with the following Spark SQL query transforming the data before output:

SELECT lessonSessionId AS id,
       activityId AS activity_id,
       lessonId AS activity_item_id,
       componentId AS activity_component_id,
       studentId AS student_id,
       studentAcademicYear AS student_academic_year_id,
       academicYearId AS academic_year,
       subjectId AS subject,
       to_date(eventStart) AS start_date_id,
       (unix_timestamp(eventStart) - unix_timestamp(to_date(eventStart))) AS start_time_id,
       to_date(eventEnd) AS end_date_id,
       (unix_timestamp(eventEnd) - unix_timestamp(to_date(eventEnd))) AS end_time_id,
       durationSecs AS duration_seconds,
       durationSecsRaw AS duration_seconds_raw,
       loginSessionId AS session_id,
       sittingType AS sitting_type
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY lessonSessionId ORDER BY eventEnd DESC) col
    FROM prod_tot_prod_tot
    WHERE eventType != 'LESSON_SKIP'
) x
WHERE x.col = 1

And this is the code that outputs the data to MemSQL:

val conf = new SaveToMemSQLConf(SaveMode.Ignore, CreateMode.Skip, Option.empty, 10000, CompressionType.GZip, true, Seq.empty, Seq.empty, false)
sqlDf.saveToMemSQL(TableIdentifier.apply(db, table), conf)

About half of the data is loaded into MemSQL, but some of the tasks persistently fail with the above NPE.

Getting Mysql "Multiple statements detected in a single query." error.

It seems the MySQL driver is on the classpath of the provided Spark SQL package on our cluster. That driver takes priority, and the error below is the result. Technically this is not a bug in this connector, but I'm not sure how best to work around it. A possible classpath workaround is sketched after the stack trace.

2020-02-28 18:39:55.460 ERROR [Driver] util.SimpleWorkHourExporter$: Failed to run  job:
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Multiple statements detected in a single query. SET SESSION collation_server=utf8_general_ci;sql_select_limit=18446744073709551615;compile_only=false;sql_mode='STRICT_ALL_TABLES,ONLY_FULL_GROUP_BY'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
	at com.mysql.jdbc.Util.getInstance(Util.java:387)
	at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)
	at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)
	at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)
	at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)
	at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2547)
	at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1541)
	at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2605)
	at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1469)
	at com.mysql.jdbc.ConnectionImpl.setSessionVariables(ConnectionImpl.java:5092)
	at com.mysql.jdbc.ConnectionImpl.initializePropsFromServer(ConnectionImpl.java:3262)
	at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2299)
	at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2085)
	at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:795)
	at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:44)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
	at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:400)
	at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:327)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
	at com.memsql.spark.JdbcHelpers$.prepareTableForWrite(JdbcHelpers.scala:189)
	at com.memsql.spark.DefaultSource.createRelation(DefaultSource.scala:64)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
	at com.hulu.bda.memsql.util.HiveMemsqlUtil$.saveToTable(HiveMemsqlUtil.scala:79)
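
One possible mitigation, assuming the problem is only which JDBC driver wins on the classpath, is to ask Spark to prefer the application's jars over the cluster-provided ones (these are standard, if experimental, Spark settings):

val conf = new org.apache.spark.SparkConf()
  .set("spark.driver.userClassPathFirst", "true")     // prefer user jars on the driver
  .set("spark.executor.userClassPathFirst", "true")   // and on the executors

Shading or excluding the conflicting MySQL driver from the application assembly is another option.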

Does not work with Spark 2.0

Hi,
I was using the connector with Spark 1.6.2 and it was working fine. Once we upgraded to Spark 2.0, it creates the table but doesn't write anything.

control max number of concurrent connections on save

Hello,

Is there a way to control the max number of concurrent connections from the job side on the save action?

The use case is the following:

The computation we are doing is pretty intensive, but the number of output rows is not that big.
We are able to increase the parallelism in the Spark cluster by increasing minNumPostShufflePartitions.

But on the other side, I now see a lot of pending connections to MemSQL:

LOAD DATA LOCAL INFILE '###.lz4' INTO TABLE....
...

In theory we can create a user queue in the MemSQL cluster, but I was trying to see if there is a way to control this from the connector side.

I tried increasing insertBatchSize to a big number, but that didn't help.
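
A general Spark-side workaround (not a connector option, and only a sketch with placeholder names) is to cap the number of output partitions right before the save, since each output partition typically opens its own LOAD DATA connection:

val capped = resultDf.coalesce(16)             // resultDf and the cap of 16 are placeholders
capped.saveToMemSQL(tableIdentifier, saveConf) // assumed save call from the connector API

The trade-off is fewer concurrent writers, so the save itself may take longer.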

thanks,
Aleks

ALTER TABLE xxx RENAME TO issue with memsql spark connector

Hello Team,
We are trying to rename a table in a MemSQL database using the memsql-spark-connector, as shown below, but we are not successful.
Can you please verify the connector code below and advise?

Code:

sparksession
  .read
  .format("memsql")
  .option("ddlEndpoint", connectionURL)
  .option("user", username)
  .option("password", password)
  .options(Map("query" -> "ALTER TABLE Table1 RENAME TO Table2", "database" -> "MyDB"))
  .load()

ERROR:

You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'alter table MyDB.Table1 rename to MyDb.Table2) AS q WHERE 1=0' at line 1
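
As the error shows, the query option gets wrapped in a SELECT ... AS q WHERE 1=0 statement (apparently a schema probe), so DDL cannot be executed through the read path. One workaround (a sketch, assuming a MySQL-compatible JDBC driver is on the classpath) is to run the DDL over a plain JDBC connection instead:

import java.sql.DriverManager

val conn = DriverManager.getConnection(s"jdbc:mysql://$connectionURL/MyDB", username, password)
try {
  conn.createStatement().executeUpdate("ALTER TABLE Table1 RENAME TO Table2")
} finally {
  conn.close()
}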

load data to memsql table from Hadoop (HDFS) ORC files

Hello Team,
I have an external Hive table with billions of records, and the underlying files for this table are in ORC format. Is there a way I can use MemSQL pipelines, or any other MemSQL approach, to read data from the HDFS ORC files and load it into a MemSQL table?

We have MemSQL and Hadoop on two different machines in the same network.

We tried using the memsql-spark-connector, but the data loads very slowly. We are trying other options to load the data quickly.
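
If the Spark route is acceptable, a minimal sketch is to read the ORC files directly and write them through the connector (option names follow the v3 connector; the path, endpoint, and credentials below are placeholders):

val orcDf = spark.read.orc("hdfs:///warehouse/my_hive_table")   // path is a placeholder

orcDf.write
  .format("memsql")
  .option("ddlEndpoint", "memsql-master:3306")
  .option("user", "root")
  .option("password", "****")
  .mode("append")
  .save("mydb.my_table")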

Exception caught when connecting to the memsql cluster

Hi,

First of all, I'd like to say this project is really interesting. Unfortunately, I haven't found consolidated documentation for the project across the Internet, nor many examples or reported use cases.

I'm facing what I believe is a trivial issue: basically trying to connect to the MemSQL cluster I set up and then load the data I want into my Spark app. I asked about this scenario on Stack Overflow, and I thought it would be interesting to share that link here as an opportunity for us to discuss where exactly the potential point of error could be. The link to my question and the error scenario is http://stackoverflow.com/questions/34972034/error-using-memsql-spark-connector

I'd appreciate hearing from the people who are actively coding on and participating in the project.
Thank you.

MemSQLConnectionPool connect(info: MemSQLConnectionInfo) always invokes a connection

In testing I see that connect() always creates a new connection, since it is not properly checking for the key; it only avoids adding a duplicate to the pool because of the pools.putIfAbsent() check.

In MemSQLConnectionPool.scala:

def connect(info: MemSQLConnectionInfo): Connection = {
  if (!pools.contains(info)) {

should be changed to:

def connect(info: MemSQLConnectionInfo): Connection = {
  if (!pools.containsKey(info)) {
Let me know if you want me to submit a PR, thanks.

avro Date format exception

When I switch to loadDataFormat=Avro

I get an error when I have a DataFrame with a Date column:

org.apache.avro.AvroRuntimeException: Unknown datum type java.sql.Date:
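
A possible workaround, assuming the Avro path simply has no mapping for java.sql.Date, is to cast date columns to a supported type before the save (the column name below is a placeholder):

import org.apache.spark.sql.functions.col

val safeDf = df.withColumn("event_date", col("event_date").cast("timestamp"))
// or .cast("string") if the target column is a plain string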

thanks,
Aleks

While executing the Spark job on EMR, running into a "No suitable driver found" issue

Below is my code snippet:

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object SSPInventoryLoaderJob {

  val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]) {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count").setMaster("local[*]"))
    val sql = new SQLContext(sc)
    val host = "xx.xx.xx.xx"
    val port = 3306
    val dbName = "xxxx"
    val user = "xxxx"
    val password = "xxxx"
    val tableName = "xxx"

    val conf = ConfigFactory.load
    sql.sparkContext.hadoopConfiguration.set(Constants.S3_ACCESSKEY, conf.getString(Constants.SPARK_ETL_S3_ACCESSKEY))
    sql.sparkContext.hadoopConfiguration.set(Constants.S3_SECRETKEY, conf.getString(Constants.SPARK_ETL_S3_SECRETKEY))
    val start: Long = System.currentTimeMillis
    val dfSSPInventoryList: DataFrame = sql.read
      .format(conf.getString(Constants.REDSHIFT_DRIVER))
      .option(Constants.URL, conf.getString(Constants.JDBC_URL))
      .option(Constants.QUERY, conf.getString(Constants.SSP_INVENTORY_LOADER_JOB_QUERY))
      .option(Constants.TEMP_DIR, conf.getString(Constants.S3_TMP_DIRECTORY) + "/ssp_inventory/")
      .load()
    val func_rt_acc = new com.memsql.spark.connector.DataFrameFunctions(dfSSPInventoryList)
    func_rt_acc.saveToMemSQL(dbName, tableName, host, port, user, password)
  }
}

Error:
Exception in thread "main" java.sql.SQLException: No suitable driver found for jdbc:mysql://xx.xx.xx.xxxx:3306
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at com.memsql.spark.context.MemSQLContext$.getMemSQLConnection(MemSQLContext.scala:41)
at com.memsql.spark.context.MemSQLContext$.getMemSQLChildAggregators(MemSQLContext.scala:49)
at com.memsql.spark.context.MemSQLContext$.getMemSQLNodesAvailableForIngest(MemSQLContext.scala:101)
at com.memsql.spark.connector.RDDFunctions.saveToMemSQL(RDDFunctions.scala:86)
at com.memsql.spark.connector.DataFrameFunctions.saveToMemSQL(DataFrameFunctions.scala:65)
at com.cadreon.atv.ranker.SSPInventoryLoaderJob$.main(SSPInventoryLoaderJob.scala:39)
at com.cadreon.atv.ranker.SSPInventoryLoaderJob.main(SSPInventoryLoaderJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
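
A likely fix, assuming the driver is simply missing from the job's classpath on EMR, is to ship a MySQL JDBC driver with the application so DriverManager can resolve jdbc:mysql:// URLs, for example as an sbt dependency (or by passing the jar via spark-submit --jars):

libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.34"   // version shown is an example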

Insert strategy generates a bad SQL query

I'm trying to do some sort of upsert, so I'm using the insert strategy and not the load strategy.

When debugging the code, I saw that the query generated by the "build query" method
(memsql-spark-connector/connectorLib/src/main/scala/com/memsql/spark/connector/sql/InsertQuery.scala)

looks like:

INSERT INTO table (user_level,user_id,dwh_update_ts) VALUES (?,?,?),(?,?,?) ON DUPLICATE KEY UPDATE user_id = values(user_id) , user_level = values(user_level) , dwh_update_ts = values(dwh_update_ts)

but it doesn't insert/update anything in my table.

My code is as follows (Scala):

def partialWriteToTarget(df: DataFrame, fieldsToUpdate: List[String], KeyField: String): Unit = {
  val saveToMemSQLConf = SaveToMemSQLConf.apply(
    memsqlConf = this.memSQLConf.get,
    params = Map("onDuplicateKeySQL" -> createUpdateStatement(fieldsToUpdate)))
  val tableIdentifier = TableIdentifier("bi_user_state", table_name)
  df.saveToMemSQL(tableIdentifier, saveToMemSQLConf)
}

def createUpdateStatement(fields: List[String]): String = {
  fields.map(x => s" $x = values($x) ,").mkString.dropRight(2)
}

New Spark versions >= 2.0.0 don't have DataFrames; they are replaced with Datasets, so df.saveToMemSQL is not working

New Spark versions >= 2.0.0 don't have DataFrames; they are replaced with Datasets.
So df.saveToMemSQL is not working:

Exception in thread "main" java.lang.NoSuchMethodError: com.memsql.spark.connector.package$.DataFrameFunctions(Lorg/apache/spark/sql/Dataset;)Lcom/memsql/spark/connector/package$DataFrameFunctions;
at com.olx.guru.KafkaWordCount$.main(KafkaWordCount.scala:130)
at com.olx.guru.KafkaWordCount.main(KafkaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

zeroDateTimeBehavior connection property

Getting this error while saving a DataFrame with df.write:

The connection property 'zeroDateTimeBehavior' acceptable values are: 'CONVERT_TO_NULL', 'EXCEPTION' or 'ROUND'. The value 'convertToNull' is not acceptable

MemSQL Interface on CDH Spark over Yarn

Hi,

I'm trying to run memsql-spark-interface-1.3.0 on a CDH 5.1 Spark cluster, in yarn (client) mode.

For context, I can run spark-shell, and I can include memsql interface jar and play with it. That works fine.

spark-shell --master yarn --jars memsql-spark-interface-1.3.0.jar

If I try to run the application, though, I get this error:

spark-submit --master yarn memsql-spark-interface-1.3.0.jar --port 10009 --dataDir /tmp/memsql/ --dbHost $MASTER_IP

16/03/07 21:49:07 INFO Submitted application application_1456615046564_0024
16/03/07 21:49:15 INFO Connected to a Spark master on yarn-client
Uncaught error from thread [spark-interface-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark-interface]
java.lang.AbstractMethodError
    at akka.actor.ActorLogging$class.$init$(Actor.scala:335)
    at spray.can.HttpManager.<init>(HttpManager.scala:29)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
    at akka.actor.Props.newActor(Props.scala:339)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[ERROR] [03/07/2016 21:49:15.509] [spark-interface-akka.actor.default-dispatcher-4] [ActorSystem(spark-interface)] Uncaught error from thread [spark-interface-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.AbstractMethodError
    at akka.actor.ActorLogging$class.$init$(Actor.scala:335)
    at spray.can.HttpManager.<init>(HttpManager.scala:29)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
    at akka.actor.Props.newActor(Props.scala:339)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [03/07/2016 21:49:15.557] [spark-interface-akka.actor.default-dispatcher-3] [akka://spark-interface/user/IO-HTTP] head of empty list
java.util.NoSuchElementException: head of empty list
    at scala.collection.immutable.Nil$.head(List.scala:337)
    at scala.collection.immutable.Nil$.head(List.scala:334)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [03/07/2016 21:49:15.561] [spark-interface-akka.actor.default-dispatcher-3] [akka://spark-interface/user/IO-HTTP] changing Recreate into Create after java.util.NoSuchElementException: head of empty list
Uncaught error from thread [spark-interface-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark-interface]
java.lang.AbstractMethodError
    at akka.actor.ActorLogging$class.$init$(Actor.scala:335)
    at spray.can.HttpManager.<init>(HttpManager.scala:29)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
    at akka.actor.Props.newActor(Props.scala:339)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    at akka.actor.dungeon.FaultHandling$class.finishCreate(FaultHandling.scala:135)
    at akka.actor.dungeon.FaultHandling$class.faultCreate(FaultHandling.scala:129)
    at akka.actor.ActorCell.faultCreate(ActorCell.scala:338)
    at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:58)
[ERROR] [03/07/2016 21:49:15.564] [spark-interface-akka.actor.default-dispatcher-3] [ActorSystem(spark-interface)] Uncaught error from thread [spark-interface-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.AbstractMethodError
    at akka.actor.ActorLogging$class.$init$(Actor.scala:335)
    at spray.can.HttpManager.<init>(HttpManager.scala:29)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at spray.can.HttpExt$$anonfun$1.apply(Http.scala:153)
    at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
    at akka.actor.Props.newActor(Props.scala:339)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    at akka.actor.dungeon.FaultHandling$class.finishCreate(FaultHandling.scala:135)
    at akka.actor.dungeon.FaultHandling$class.faultCreate(FaultHandling.scala:129)
    at akka.actor.ActorCell.faultCreate(ActorCell.scala:338)
    at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:58)
    at akka.actor.ActorCell.faultRecreate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:428)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at akka.actor.ActorCell.faultRecreate(ActorCell.scala:338)

    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:428)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have the feeling this is due to CDH, but I wanted to double-check whether the memsql interface can work in yarn mode.

Googling around, I've found this similar issue, in a totally unrelated project:
FRosner/spawncamping-dds#47

Thanks, E.

Can't add a JSON field from within a DataFrame.withColumn call

Hello

I have a Map field inside a DataFrame which I need to save as JSON to MemSQL. When I do the conversion to JsonValue, I get the following error:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.spark.unsafe.types.UTF8String

Here is a code example which reproduces the problem:

https://gist.github.com/akonopko/495f2ece5398700e000ec63dafb61c2b

I was able to fix this with a workaround:

https://gist.github.com/akonopko/af48c91170fa8f344bcb63aa31b82037

[Help appreciated] Issue when writing to memsql

Hi,

I am trying to use the memsql-spark-connector to write a Spark DataFrame to MemSQL, but it gave me this error. I used a group by to generate the DataFrame. Can anyone help? Thanks.

java.lang.NullPointerException
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.createHashMap(HashAggregateExec.scala:311)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.memsql.spark.connector.LoadDataStrategy$$anon$2.run(LoadDataStrategy.scala:52)
    at java.lang.Thread.run(Thread.java:745)

registerTable problem

I have a MemSQL cluster. I tried to connect to it and read some data from a table, but when I select my table the program throws this exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: Clienti;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:306)

I tried different ways to do this:

import org.apache.spark.{SparkConf, SparkContext}
import com.memsql.spark.context.MemSQLContext   // assumed package path for the import

object LoadTestSpark extends App {
  val conf = new SparkConf().setAppName("MemSQL Example")
  conf.set("memsql.host", "192.168.3.12")
  conf.setMaster("local[1]")
  val sparkContext = new SparkContext(conf)
  val memsqlContext = new MemSQLContext(sparkContext)
  memsqlContext.setDatabase("MyDB")
  memsqlContext.sql("select * from Client").registerTempTable("client")
}

object LoadTestSpark extends App {
  val conf = new SparkConf().setAppName("MemSQL Example")
  conf.set("memsql.host", "192.168.3.12")
  conf.setMaster("local[1]")
  val sparkContext = new SparkContext(conf)
  val memsqlContext = new MemSQLContext(sparkContext)
  memsqlContext.setDatabase("MyDB")
  val client = memsqlContext.table("Client")
}

but in both cases it does not work.

If I run this:

...
memsqlContext.tableNames().foreach(println)
...

the output is:

...
`MyDB`.`Checkup_dati`
`MyDB`.`Chiamate_blocchi`
`MyDB`.`Cicli`
`MyDB`.`Classificazioni`
`MyDB`.`Client`
....

What can I do to load data from my tables?
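
Given that tableNames() reports database-qualified, backtick-quoted identifiers, one thing to try (an assumption, not a confirmed fix) is to reference the table exactly that way:

val client = memsqlContext.table("`MyDB`.`Client`")
client.registerTempTable("client")
memsqlContext.sql("select * from client limit 10").show()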

Does SingleStore DB support Array<struct> column types?

Hello Team,
Like Hive, does MemSQL support a table column datatype of array as shown below?
If yes, how do we extract the array's inner columns? In Hive we have explode / lateral view; is there anything similar?

column name | Datatype

diagnoses | array<struct<column1 string, column2 double>>

V3 is too slow as compared to V2

We had been using Spark connector V2 for quite some time and had it benchmarked. We recently tried the V3 connector, and surprisingly it's 2x-3x slower than V2:

V2 :

<dependency>
    <groupId>com.memsql</groupId>
    <artifactId>memsql-connector_2.11</artifactId>
    <version>2.0.6</version>
</dependency>

V3:

<dependency>
    <groupId>com.memsql</groupId>
    <artifactId>memsql-spark-connector_2.11</artifactId>
    <version>3.0.4-spark-2.3.4</version>
</dependency>

Some stats:
Data Ingestion Size 300GB / 2.2 B records

RDD Partitions | MemSQL Connector V2 total ingestion time (min) | MemSQL Connector V3 total ingestion time (min) | DB Partitions | Comment
32             | 135                                            | 190                                            | 32 total      |
32             | 65                                             | 175                                            | 32 total      |

The Spark config used was the same for both connectors:

spark.speculation=false
spark.driver.memory=30G
spark.executor.memory=25G
spark.executor.instances=32
spark.executor.cores=8
spark.yarn.maxAppAttempts=1
spark.memory.storageFraction=0.2
spark.memory.fraction=0.85

load balancing for data ingestion is not working with multi aggregators

Even after setting the right config as per the docs, the ingestion load from the Spark connector is only going to one aggregator.

config set:
spark.conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal")
spark.conf.set("spark.datasource.memsql.dmlEndpoints", "memsql-master.cluster.internal,memsql-child-1.cluster.internal:3307")

I ran the job multiple times, and all the LOAD DATA LOCAL INFILE commands go to the same aggregator.
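
One thing worth double-checking (only an assumption) is that both DML endpoints are reachable on the ports the connector will actually use; making the ports explicit rules out a default-port mismatch:

spark.conf.set("spark.datasource.memsql.ddlEndpoint", "memsql-master.cluster.internal:3306")
spark.conf.set("spark.datasource.memsql.dmlEndpoints",
  "memsql-master.cluster.internal:3306,memsql-child-1.cluster.internal:3307")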

Question on merge functionality

Hello,
In the memsql-spark-connector, I see there is a merge option that works based on the primary key (PK), enabled with the overwriteBehavior=merge option.

In my case, I do not have a primary key on my MemSQL table. Is there any other way we can do a merge using the memsql-spark-connector?

When I say merge, I am looking for the following two functionalities:

  1. If the record exists, I want to overwrite it.
  2. If the record doesn't exist, I want to insert it as a new record.

Based on the MemSQL documentation, I couldn't find any help regarding merging without a PK.

memsql connector support for keytab and principal for Kerberos authentication

Hi, I'm not sure if this is the right place to raise a feature request. I was wondering if we can add support for keytab and principal fields, similar to the Spark JDBC reader, which would load Krb5ConnectorContext with the keytab and principal configuration. I can supply a custom jaas.conf in Spark, but it would be nice if this could be supported as a Spark option.

SaveMode configuration not honoured while saving data using the DataFrameWriter API

All of the following code appends the data to the table. It doesn't seem to honour the specified save mode:

dataframe.write.format("com.memsql.spark.connector").mode(SaveMode.ErrorIfExists).save("test.tablename")
dataframe.write.format("com.memsql.spark.connector").mode(SaveMode.Overwrite).save("test.tablename")
dataframe.write.format("com.memsql.spark.connector").mode(SaveMode.Append).save("test.tablename")

Add support for Scala 2.11.7

First, thanks for the great project; it's just what I needed for a given use case.

Can build.sbt be modified to add crossScalaVersions and set scalaVersion := "2.11.7"? I bumped the Java version to 1.8 as I wanted to test the deploy locally, but that should not be necessary. Here is the diff from my system; I'll be glad to put together a PR if you feel this is worth doing.

RADTech-MBP:memsql-spark-connector tnist$ git diff build.sbt
diff --git a/build.sbt b/build.sbt
index 0c2e410..e5b401e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -14,7 +14,8 @@ lazy val testScalastyle = taskKey[Unit]("testScalastyle")
 lazy val commonSettings = Seq(
   organization := "com.memsql",
   version := "1.2.1-SNAPSHOT",
-  scalaVersion := "2.10.5",
+  scalaVersion := "2.11.7",
+  crossScalaVersions := Seq("2.10.4", "2.11.7"),
   assemblyScalastyle := org.scalastyle.sbt.ScalastylePlugin.scalastyle.in(Compile).toTask("").value,
   assembly <<= assembly dependsOn assemblyScalastyle,
   testScalastyle := org.scalastyle.sbt.ScalastylePlugin.scalastyle.in(Test).toTask("").value,
@@ -54,7 +55,7 @@ lazy val commonSettings = Seq(
   publishMavenStyle := true,
   publishArtifact in Test := false,
   pomIncludeRepository := { _ => false },
-  javaVersionPrefix in javaVersionCheck := Some("1.7")
+  javaVersionPrefix in javaVersionCheck := Some("1.8")
 )

 lazy val connectorLib = (project in file("connectorLib")).

-Todd

Spark MemSQL Connector loses connection after idle time

Example:

val df = memsqlContext.read.format("com.memsql.spark.connector").load("db.table")

5 minutes idle time...

val df = memsqlContext.read.format("com.memsql.spark.connector").load("db.table") ...fails

Doing the same using the standard Spark JDBC DataFrame API works.

Kafka topic with many partitions (on Yarn)

Hi, I think I've found an issue reading from Kafka when there are too many partitions, specifically when the number of partitions is greater than spark.port.maxRetries (default 16).

Note that I'm using a CDH Spark cluster in yarn-client mode, so I'm not sure this also repros with MemSQL Spark distro.

In summary, when a container starts it tries to bind to a port starting from an initial port, and fails after spark.port.maxRetries. If the Kafka topic has n partitions, then at least n containers are created, and if n > spark.port.maxRetries the available ports are saturated and eventually containers will fail to start.

I think in these cases it would be better to shut down the pipeline (like a fatal error), such that other pipelines can run without issues.

What happens now is that the exceptions are thrown but "ignored" and data is loaded normally. However I'm not sure if all data is loaded in all cases, as creating a test is kind of laborious... but I'd expect that if you have data in all partitions, some would fail.
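
Until the pipeline shutdown behaviour is addressed, a possible mitigation (an assumption, not a tested fix) is to widen the port search range so containers can still find a free port:

val conf = new org.apache.spark.SparkConf()
  .set("spark.port.maxRetries", "64")   // default is 16; must be set before the SparkContext is created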

Interface logs:

16/04/25 16:24:38 ERROR Lost executor 9 on <hostname.redacted>: Container marked as failed: container_1461362160503_0004_01_000010 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461362160503_0004_01_000010
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1

16/04/25 16:24:38 WARN Container marked as failed: container_1461362160503_0004_01_000010 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461362160503_0004_01_000010
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1

16/04/25 16:24:38 WARN Lost task 7.0 in stage 0.0 (TID 12, <hostname.redacted>): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container marked as failed: container_1461362160503_0004_01_000010 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461362160503_0004_01_000010
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1

16/04/25 16:24:38 ERROR Listener SQLListener threw an exception
java.lang.NullPointerException
    at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:167)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
16/04/25 16:24:39 ERROR Lost executor 27 on <hostname.redacted>: Container marked as failed: container_1461362160503_0004_01_000030 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.

Container logs from:
http://...:8042/node/containerlogs/container_1461362160503_0004_01_000010/stderr/stderr/?start=0

16/04/25 16:24:30 INFO executor.CoarseGrainedExecutorBackend: Started daemon with process name: 34377@<hostname.redacted>
16/04/25 16:24:30 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
16/04/25 16:24:32 INFO spark.SecurityManager: Changing view acls to: yarn,emanuele
16/04/25 16:24:32 INFO spark.SecurityManager: Changing modify acls to: yarn,emanuele
16/04/25 16:24:32 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, emanuele); users with modify permissions: Set(yarn, emanuele)
16/04/25 16:24:33 INFO spark.SecurityManager: Changing view acls to: yarn,emanuele
16/04/25 16:24:33 INFO spark.SecurityManager: Changing modify acls to: yarn,emanuele
16/04/25 16:24:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, emanuele); users with modify permissions: Set(yarn, emanuele)
16/04/25 16:24:34 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/04/25 16:24:34 INFO Remoting: Starting remoting
16/04/25 16:24:34 WARN util.Utils: Service 'sparkExecutorActorSystem' could not bind on port 10014. Attempting port 10015.
16/04/25 16:24:34 ERROR Remoting: Remoting error: [Startup failed] [
akka.remote.RemoteTransportException: Startup failed
    at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
    at akka.remote.Remoting.start(Remoting.scala:194)
    at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
    at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
    at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1989)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1980)
    at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
    at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:217)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:183)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:148)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:250)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: <hostname.redacted>/10.19.192.29:10014
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:391)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:388)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Success.map(Try.scala:206)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:444)
    at sun.nio.ch.Net.bind(Net.java:436)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
    at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
]

[...]

16/04/25 16:24:35 INFO Remoting: Starting remoting
16/04/25 16:24:35 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutorActorSystem@<hostname.redacted>:10025]
16/04/25 16:24:35 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutorActorSystem@<hostname.redacted>:10025]
16/04/25 16:24:35 INFO util.Utils: Successfully started service 'sparkExecutorActorSystem' on port 10025.
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_1/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-20ef5072-5398-4e44-b38d-710038d26c7f
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_2/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-f7e45ba1-1e24-49fb-a475-74057513f119
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_3/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-d8bdba6b-4296-49fd-ada2-b66b9a379b1c
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_4/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-947725bd-a63a-4f23-ae19-05a0a1722b40
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_5/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-c8d2b2cf-de63-42f5-a2e0-7390066f65ba
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_6/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-3bffbf82-2b57-4f5f-9e60-2a9bfec07120
16/04/25 16:24:36 INFO storage.MemoryStore: MemoryStore started with capacity 530.3 MB
16/04/25 16:24:36 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://[email protected]:10013
16/04/25 16:24:36 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
16/04/25 16:24:36 INFO executor.Executor: Starting executor ID 9 on host <hostname.redacted>
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10011. Attempting port 10012.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10012. Attempting port 10013.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10013. Attempting port 10014.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10014. Attempting port 10015.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10015. Attempting port 10016.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10016. Attempting port 10017.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10017. Attempting port 10018.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10018. Attempting port 10019.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10019. Attempting port 10020.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10020. Attempting port 10021.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10021. Attempting port 10022.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10022. Attempting port 10023.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10023. Attempting port 10024.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10024. Attempting port 10025.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10025. Attempting port 10026.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10026. Attempting port 10027.
16/04/25 16:24:36 ERROR netty.Inbox: Ignoring error
java.net.BindException: Address already in use: Service 'org.apache.spark.network.netty.NettyBlockTransferService' failed after 16 retries!
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:444)
    at sun.nio.ch.Net.bind(Net.java:436)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:125)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:485)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1089)
    at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:430)
    at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:415)
    at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:903)
    at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198)
    at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/04/25 16:24:36 ERROR executor.CoarseGrainedExecutorBackend: Received LaunchTask command but executor was null
16/04/25 16:24:36 INFO storage.DiskBlockManager: Shutdown hook called
16/04/25 16:24:36 INFO util.ShutdownHookManager: Shutdown hook called

saveToMemSQLApp.scala question

Hi,

I'm trying to run the example code in WriteToMemSQLApp.scala.
I'm running it through spark-shell in yarn-client mode (my Spark version is 1.3.1).

When I run:
val rdd = sc.parallelize(values)
rdd.saveToMemSQL(dbName, outputTableName, host, port, user, password)
I get an error because saveToMemSQL is not a method of rdd.

It seems to me that sc is an org.apache.spark.SparkContext and that sc.parallelize() returns an org.apache.spark.rdd.RDD, which has no saveToMemSQL method.

Where am I going wrong?

Thanks a lot for any hint,
Kimchitsigai
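
A likely explanation, offered here as an assumption rather than a confirmed answer: in the 1.x com.memsql connector, saveToMemSQL is not defined on RDD itself but is added to RDD[Row] by an implicit conversion, so the method only resolves after importing the connector's package object. A minimal sketch of that pattern for spark-shell, with placeholder connection values:

  import org.apache.spark.sql.Row
  import com.memsql.spark.connector._   // assumption: this import provides the implicit that adds saveToMemSQL

  val dbName = "test"                   // placeholder values for illustration only
  val outputTableName = "output"
  val host = "127.0.0.1"
  val port = 3306
  val user = "root"
  val password = ""

  val values = List(Row(1, "a"), Row(2, "b"))
  val rdd = sc.parallelize(values)      // sc is the spark-shell SparkContext
  rdd.saveToMemSQL(dbName, outputTableName, host, port, user, password)

If the import is already in place and the method still does not resolve, the element type of the RDD (Row versus something else) would be the next thing to check.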

getting error while loading table into spark dataframe

I am using Spark 2.1.0 on Linux (CentOS) with Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60).

val movies = spark.read
  .format("com.memsql.spark.connector")
  .options(Map(
    "query"    -> "select * from movies limit 10",
    "database" -> "movielens"))
  .load()

java.lang.ClassNotFoundException: org.apache.spark.Logging was removed in Spark 2.0. Please check if your library is compatible with Spark 2.0
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:580)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
... 48 elided
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.access$100(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.access$100(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:554)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:554)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:554)
... 53 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 86 more

movie table in movielens database:
CREATE TABLE movies (
id int(11) DEFAULT NULL,
title varchar(300) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
genres varchar(300) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL
/*!90618 , SHARD KEY () */
)
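
The exception message already names the root cause: org.apache.spark.Logging was removed in Spark 2.0, so a connector jar compiled against Spark 1.x cannot be loaded by Spark 2.1.0; the fix is to use a connector build that targets the running Spark version. A small diagnostic sketch (an illustration, not taken from the issue) to confirm the mismatch from the same spark-shell session:

  // Which Spark version is actually on the classpath?
  println(org.apache.spark.SPARK_VERSION)                               // 2.1.0 in this report

  // The trait that Spark 1.x connectors extend no longer exists on 2.x:
  scala.util.Try(Class.forName("org.apache.spark.Logging")).isSuccess   // false on Spark 2.x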

ReceiverInputDStream in MemSQL Streamliner

Have you ever investigated if a ReceiverInputDStream can work with MemSQL Streamliner like a "normal" InputDStream does?

I was trying to create a KafkaExtractor based on:
https://github.com/dibbhatt/kafka-spark-consumer
(mostly to get checkpointing to Zookeeper for free)

My code is very similar to the KafkaExtractor/ByteArrayExtractor, but fails with:

org.apache.spark.SparkException: org.apache.spark.streaming.dstream.PluggableInputDStream@2b57a5f5 has not been initialized

I tried running start() on the ReceiverInputDStream, running onStart() on its receiver, etc., but no luck.

I guess StreamingContext.start() does a lot of work behind the scenes to make ReceiverInputDStreams usable... I was wondering if you have any experience with that.

update only mode

Hey,
is there an option to only update a table with data coming from a stream?
In saveToMemSQL there are two available strategies:

  1. insert
  2. load

Thanks,
Lior
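
The thread itself does not answer this, but the next issue below uses the onDuplicateKeySQL parameter of SaveToMemSQLConf, which turns the insert strategy into an upsert-style write and is the closest thing to an update-only mode in this connector generation. A hedged sketch of that approach, reusing the calls that appear later in this document; the table and column names are placeholders and df stands for a DataFrame built from the stream:

  import com.memsql.spark.connector.DataFrameFunctions
  import com.memsql.spark.connector.MemSQLConf
  import com.memsql.spark.connector.sql.TableIdentifier
  import org.apache.spark.sql.memsql.SaveToMemSQLConf

  // Assumption: rows that collide on the primary key are updated via the
  // ON DUPLICATE KEY clause instead of being inserted again.
  val memSQLConf = MemSQLConf.apply(sc.getConf)
  val saveConf = SaveToMemSQLConf.apply(
    memsqlConf = memSQLConf,
    params = Map("onDuplicateKeySQL" -> "value_col = VALUES(value_col)"))
  val tableId = TableIdentifier("my_db", "my_table")   // placeholder names
  df.saveToMemSQL(tableId, saveConf)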

insert on duplicate key causing locking errors

I'm trying to read data from Kafka and insert it into MemSQL using the insert-on-duplicate-key option via saveToMemSQL (see the code below).

I'm running with 2 executors (yarn-client) and I'm getting the following error:
java.sql.SQLException (Leaf Error (memsql-leaf-02.p1:3306): Lock wait timeout exceeded; try restarting transaction)
Is there anything that can be done to avoid this kind of error?

MY CODE:

  import java.sql.{Connection, DriverManager, Statement}
  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.kafka._
  import com.memsql.spark.connector.DataFrameFunctions
  import com.memsql.spark.connector.MemSQLConf
  import com.memsql.spark.connector.MemSQLContext   // MemSQLContext is constructed below but was not imported
  import com.memsql.spark.connector.sql.TableIdentifier
  import org.apache.spark.sql.memsql.SaveToMemSQLConf
  def RunUpdateQuery(values: Iterator[Row]):Unit = {
    // connect to the database named "mysql" on the localhost
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://memsql-ops-01.p1/bi_user_state"
    val username = "app_bi_user"
    val password = "Cc113355!"
    var result = scala.collection.mutable.Map.empty[Int,(Long,Int)]
    if (values == null) {
      println("It's null!!!!!")
    }
    // there's probably a better way to do this
    var connection: Connection = null
    try {
      // make the connection
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      for (record <- values) {
        // create a statement and run the update; executeUpdate returns the affected-row count
        val statement = connection.createStatement()
        val rowsUpdated = statement.executeUpdate(s"UPDATE dim_all_users_info_110916 set total_spins = ${record.get(1)} where user_id = ${record.get(0)}")
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // ensure the connection is closed even if an update fails
      if (connection != null) connection.close()
    }
  }
  val conf = sc.getConf
  conf.set("memsql.host", "host")
  conf.set("memsql.port", 3306)
//  conf.set("memsql.defaultSaveMode", "CreateMode.DatabaseAndTable")
//  conf.set("memsql.defaultInsertBatchSize", "10000")
//  conf.set("memsql.defaultLoadDataCompression", "GZIP")
  conf.set("memsql.defaultCreateMode", "Skip")
  conf.set("memsql.defaultDatabase", "db")
  conf.set("memsql.user", "usr")
  conf.set("memsql.password", "pass")
  sc.stop
  val kafkaParams = Map[String, String]("metadata.broker.list" -> "kafaka-borker:9092")
  val topicsSet = "sm_session".split(",").toSet
//  val topicsSet = "topic_name".split(",").toSet
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
  val sqlContext = new SQLContext(ssc.sparkContext)
//  val sqlContext = new org.apache.spark.sql.hive.HiveContext(ssc.sparkContext)
  val memsqlContext = new MemSQLContext(ssc.sparkContext)
     val directKafkaStream =  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet)
//  directKafkaStream.map(x=>x._2).repartition(4).foreachRDD(x=>{ sqlContext.read.json(x).groupBy("userId").count().foreachPartition(RunUpdateQuery)})
  directKafkaStream.map(x=>x._2).repartition(2).foreachRDD(x=>{
    var memSQLConf = MemSQLConf.apply(conf)
   val saveToMemSQLConf  = SaveToMemSQLConf.apply(memsqlConf=memSQLConf,params=Map("onDuplicateKeySQL"-> "user_balance_coins=values(user_balance_coins)"))
    val tableIdentifier = TableIdentifier("bi_user_state","sm_user_profile_110916")
    val df = sqlContext.read.json(x).select("userId","userBalance","timestamp")
                                    .withColumnRenamed("userId","user_id")
                                    .withColumnRenamed("userBalance","user_balance_coins")
                                    .withColumnRenamed("timestamp","spin_ts")
    df.saveToMemSQL(tableIdentifier,saveToMemSQLConf)})
  //    .write.format("com.memsql.spark.connector").save("db.table")
  ssc.start()
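
The thread does not record a definitive fix, but two knobs already present in the code above are the usual levers for this kind of lock contention, offered here only as a hedged suggestion: write with fewer concurrent partitions, and use a smaller insert batch size so each INSERT ... ON DUPLICATE KEY UPDATE transaction holds its row locks for less time. A sketch using the same names as the snippet above:

  // Smaller batches plus a single writer partition reduce lock hold times on the leaf.
  conf.set("memsql.defaultInsertBatchSize", "1000")   // key appears (commented out) in the code above

  directKafkaStream.map(_._2)
    .repartition(1)                                   // one writer partition instead of two
    .foreachRDD { x =>
      val memSQLConf = MemSQLConf.apply(conf)
      val saveConf = SaveToMemSQLConf.apply(
        memsqlConf = memSQLConf,
        params = Map("onDuplicateKeySQL" -> "user_balance_coins=values(user_balance_coins)"))
      val tableId = TableIdentifier("bi_user_state", "sm_user_profile_110916")
      val df = sqlContext.read.json(x)
        .select("userId", "userBalance", "timestamp")
        .withColumnRenamed("userId", "user_id")
        .withColumnRenamed("userBalance", "user_balance_coins")
        .withColumnRenamed("timestamp", "spin_ts")
      df.saveToMemSQL(tableId, saveConf)
    }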

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException

Not sure what's causing the error below. I'm reading some data from Cassandra and trying to write it to MemSQL. Any ideas on how to resolve the issue?

Here is my code:

import com.memsql.spark.connector.MemSQLContext

val memsqlContext = new MemSQLContext(sc)
val df = sqlContext.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "customer", "keyspace" -> "cloudscm" )).load()
df.registerTempTable("customers")
val memdf = sqlContext.sql("SELECT id, site, customergroup, name from customers")
memdf.write.format("com.memsql.spark.connector").save("test.customer")

I created the table in memsql using the following command:

create table customer(id varchar(256), site varchar(256), customergroup varchar(256), name varchar(256));

Here is the error I get:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '())' at line 1
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
    at com.mysql.jdbc.Util.getInstance(Util.java:360)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:978)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2582)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2526)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:848)
    at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:742)
    at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:291)
    at org.apache.commons.dbcp2.DelegatingStatement.execute(DelegatingStatement.java:291)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$createTable$1$$anonfun$apply$8.apply(MemSQLCluster.scala:141)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$createTable$1$$anonfun$apply$8.apply(MemSQLCluster.scala:140)
    at com.memsql.spark.connector.util.Loan.to(Loan.scala:6)
    at com.memsql.spark.connector.util.JDBCImplicits$ConnectionHelpers.withStatement(JDBCImplicits.scala:51)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$createTable$1.apply(MemSQLCluster.scala:140)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$createTable$1.apply(MemSQLCluster.scala:139)
    at com.memsql.spark.connector.util.Loan.to(Loan.scala:6)
    at com.memsql.spark.connector.MemSQLConnectionPool$.withConnection(MemSQLConnectionPool.scala:38)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$withMasterConn$1.apply(MemSQLCluster.scala:23)
    at com.memsql.spark.connector.MemSQLCluster$$anonfun$withMasterConn$1.apply(MemSQLCluster.scala:23)
    at com.memsql.spark.connector.MemSQLCluster.createTable(MemSQLCluster.scala:139)
    at org.apache.spark.sql.memsql.SparkImplicits$DataFrameFunctions.saveToMemSQL(SparkImplicits.scala:70)
    at org.apache.spark.sql.memsql.MemSQLTableRelation.insert(MemSQLRelation.scala:45)
    at org.apache.spark.sql.memsql.DefaultSource.createRelation(DefaultSource.scala:33)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC.<init>(<console>:44)
    at $iwC.<init>(<console>:46)
    at <init>(<console>:48)
    at .<init>(<console>:52)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
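
The stack trace points at the likely cause: the failure happens inside MemSQLCluster.createTable, i.e. the connector is generating its own CREATE TABLE statement (the text near '())' is part of that generated DDL) even though the table was already created by hand. As a hedged workaround, not a confirmed fix: the previous issue sets memsql.defaultCreateMode to Skip on the SparkConf, which should keep the connector from issuing that DDL; whether the setting takes effect when mutated on an already-running context is also an assumption, so setting it before the context is created is the safer route.

  // Sketch only: ask the connector not to CREATE TABLE, then write to the existing table.
  sc.getConf.set("memsql.defaultCreateMode", "Skip")

  memdf.write
    .format("com.memsql.spark.connector")
    .save("test.customer")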

load data in upsert mode

When looking at the code here, I saw that upsert mode is not supported yet.
Is there a plan to support it at some point?

Thanks
