
azure-kusto-spark's Introduction

Azure Data Explorer + Apache Spark Connector

This library contains the source code for Azure Data Explorer Data Source and Data Sink Connector for Apache Spark.

Azure Data Explorer (A.K.A. Kusto) is a lightning-fast indexing and querying service.

Spark is a unified analytics engine for large-scale data processing.

Making Azure Data Explorer and Spark work together enables building fast and scalable applications, targeting a variety of Machine Learning, Extract-Transform-Load, Log Analytics and other data-driven scenarios.

This connector works with the following Spark environments: Azure Databricks, Azure Synapse Data Explorer, and Real-Time Analytics in Fabric.

Changelog

For major changes from previous releases, please refer to Releases. For known or new issues, please refer to the issues section.

Note: Use the 4.x series only if you are using JDK 11. Versions 3.x and 5.x work with JDK 8 and above.

Usage

Linking

Starting with version 2.3.0, we introduced new artifact IDs: kusto-spark_3.x_2.12, targeting Spark 3.x and Scala 2.12, and kusto-spark_2.4_2.11, targeting Spark 2.4.x and Scala 2.11. The latter is built from the "2.4" branch. For Scala/Java applications using Maven project definitions, link your application with the artifact below to use the Azure Data Explorer Connector for Spark.

Note: Versions prior to 2.5.1 can no longer ingest to an existing table.

groupId = com.microsoft.azure.kusto
artifactId = kusto-spark_3.0_2.12
version = 5.0.6

In Maven:

Look for the following coordinates:

com.microsoft.azure.kusto:kusto-spark_3.0_2.12:5.0.6

Or clone this repository and build it locally to add it to your local Maven repository. The jar can also be found under the released package:

   <dependency>
       <groupId>com.microsoft.azure.kusto</groupId>
       <artifactId>kusto-spark_3.0_2.12</artifactId>
       <version>5.0.6</version>
   </dependency>

In SBT:

libraryDependencies ++= Seq(
  "com.microsoft.azure.kusto" %% "kusto-spark_3.0" % "5.0.6"
)

In Databricks:

Libraries -> Install New -> Maven -> copy the following coordinates:

com.microsoft.azure.kusto:kusto-spark_3.0_2.12:5.0.6

Building Samples Module

Samples are packaged as a separate module with the following artifact

<artifactId>connector-samples</artifactId>

To build the whole project comprised of the connector module and the samples module, use the following artifact:

<artifactId>azure-kusto-spark</artifactId>

Build Prerequisites

To use the connector, you need:

  • Java 1.8 SDK installed
  • Maven 3.x installed
  • Spark - with the version aligned with the artifact ID (either 2.4 or 3.0)

Note: When working with Spark version 2.3 or lower, build the jar locally from branch "2.4" and simply change the Spark version in the pom file.

Build Commands

// Builds jar and runs all tests
mvn clean package

// Builds jar, runs all tests, and installs jar to your local maven repository
mvn clean install

Pre-Compiled Libraries

To facilitate ramp-up from local jar on platforms such as Azure Databricks, pre-compiled libraries are published under GitHub Releases. These libraries include:

  • Azure Data Explorer connector library
  • Users may also need to include the Kusto Java SDK libraries (kusto-data and kusto-ingest), which are also published under GitHub Releases

Dependencies

The Spark Azure Data Explorer connector depends on the Azure Data Explorer Data Client Library and the Azure Data Explorer Ingest Client Library, available in the Maven repository. When Key Vault based authentication is used, there is an additional dependency on the Microsoft Azure SDK for Key Vault.

Note: When working with JARs, the Azure Data Explorer connector requires the Azure Data Explorer Java client libraries (and the Azure Key Vault library, if used) to be installed. To find the right version to install, look in the relevant release's pom.

Documentation

Detailed documentation can be found here.

Samples

Usage examples can be found here

Available Azure Data Explorer client libraries:

For convenience, here is a PySpark sample for the connector:
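The following is a minimal read sketch based on the option keys that appear in the issues section below (kustoCluster, kustoDatabase, kustoQuery, kustoAadAppId, kustoAadAppSecret, kustoAadAuthorityID); all values are placeholders, so treat it as an illustration rather than a verified sample.

# Minimal PySpark read sketch for the Kusto connector (placeholder values).
df = spark.read \
    .format("com.microsoft.kusto.spark.datasource") \
    .option("kustoCluster", "<cluster-name>") \
    .option("kustoDatabase", "<database>") \
    .option("kustoQuery", "MyTable | take 10") \
    .option("kustoAadAppId", "<aad-app-id>") \
    .option("kustoAadAppSecret", "<aad-app-secret>") \
    .option("kustoAadAuthorityID", "<aad-tenant-id>") \
    .load()

df.show()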

Need Support?

  • Have a feature request for SDKs? Please post it on User Voice to help us prioritize
  • Have a technical question? Ask on Stack Overflow with tag "azure-data-explorer"
  • Need Support? Every customer with an active Azure subscription has access to support with guaranteed response time. Consider submitting a ticket for assistance from the Microsoft support team.
  • Found a bug? Please help us fix it by thoroughly documenting it and filing an issue.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact [email protected] with any additional questions or comments.

azure-kusto-spark's People

Contributors

abmo-x, aelij, ag-ramachandran, asaharn, dependabot[bot], hau-mal, lizashak, manojraheja, michazag, microsoft-github-policy-service[bot], navalev, ohadbitt, ohbitton, rabee333, slyons, tamirkamara, toshetah, vladikbr, y0nil, yihezkel, yogilad

azure-kusto-spark's Issues

getSystemClipboard error

I'm trying to authenticate using the device-token method. My code runs in a Scala notebook on Azure Databricks.
My Code:
val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // For microsoft applications, typically "72f988bf-86f1-41af-91ab-2d7cd011db47"
val cluster = "Wdgeventstore"
val database = "Webster"
val table = "ServerMetadata"

val df = sqlContext.read.format("com.microsoft.kusto.spark.datasource")
.option(KustoOptions.KUSTO_CLUSTER, cluster)
.option(KustoOptions.KUSTO_DATABASE, database)
.option(KustoOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
.option(KustoOptions.KUSTO_QUERY, "ServerMetadata | take 10")
.load()
I hit the following error:
java.awt.HeadlessException:
No X11 DISPLAY variable was set, but this program performed an operation which requires it.
at sun.awt.HeadlessToolkit.getSystemClipboard(HeadlessToolkit.java:309)
at com.microsoft.kusto.spark.utils.DeviceAuthentication$.acquireAccessTokenUsingDeviceCodeFlow(DeviceAuthentication.scala:29)
at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.parseSourceParameters(KustoDataSourceUtils.scala:138)
at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:101)

overwrite doesn't work for python

Overwrite doesn't work for Python; Append works. Can you please check?
.mode('Overwrite')
.save()
java.security.InvalidParameterException: Kusto data source supports only 'Append' mode, 'Overwrite' directive is invalid. Please use df.write.mode(SaveMode.Append).
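A minimal PySpark sketch of the workaround the error message points at, i.e. writing with Append mode; the option keys and values are placeholders taken from other examples in this document, not a verified snippet.

# Kusto only supports Append mode for writes, per the error above.
df.write \
    .format("com.microsoft.kusto.spark.datasource") \
    .option("kustoCluster", "<cluster-name>") \
    .option("kustoDatabase", "<database>") \
    .option("kustoTable", "<target-table>") \
    .option("kustoAadAppId", "<aad-app-id>") \
    .option("kustoAadAppSecret", "<aad-app-secret>") \
    .option("kustoAadAuthorityID", "<aad-tenant-id>") \
    .mode("Append") \
    .save()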

Error in acquiring ApplicationAccessToken

Hi,

I'm trying to write a dataframe into a Kusto table, but I ran into the authentication issue below.

Here is my simple code calling the write API. Any idea what happened? I used my AAD account and generated the corresponding secrets from the Azure portal.

df.write. \
  format("com.microsoft.kusto.spark.datasource"). \
  option("kustoCluster", "xxxxxxx"). \
  option("kustoDatabase", "xxxxxxxx"). \
  option("kustoTable", "xxxxxxxx"). \
  option("kustoAADClientID", "xxxxxxx"). \
  option("kustoClientAADClientPassword", "xxxxxxxxxx"). \
  option("kustoAADAuthorityID", "xxxxxxxx"). \
  save()

Py4JJavaError Traceback (most recent call last)
in ()
----> 1 df.write. format("com.microsoft.kusto.spark.datasource"). option("kustoCluster", "https://wdgeventstore.kusto.windows.net"). option("kustoDatabase", "CoreData"). option("kustoTable", "WExpInteractionDetection"). option("kustoAADClientID", "167846e2-2119-42fe-bb76-f1289857acdc"). option("kustoClientAADClientPassword", "******"). option("kustoAADAuthorityID", "72f988bf-86f1-41af-91ab-2d7cd011db47"). save()

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
732 self.format(format)
733 if path is None:
--> 734 self._jwrite.save()
735 else:
736 self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in call(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

Py4JJavaError: An error occurred while calling o684.save.
: com.microsoft.azure.kusto.data.exceptions.DataServiceException: Error in acquiring ApplicationAccessToken
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireToken(AadAuthenticationHelper.java:238)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAccessToken(AadAuthenticationHelper.java:82)
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:73)
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:45)
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:41)
at com.microsoft.kusto.spark.datasink.KustoWriter$.cleanupTempTables(KustoWriter.scala:121)
at com.microsoft.kusto.spark.datasink.KustoWriter$.write(KustoWriter.scala:72)
at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:31)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:183)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:131)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:114)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:690)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:690)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:690)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:284)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)

[BETA06] Mapping reference of type 'csv' in database could not be found

I was trying to update from BETA04 to BETA06 but found one problem in the ingestion code.

val sp = new SparkIngestionProperties
sp.csvMappingNameReference = evaluationsMapping
df.write
      .format("com.microsoft.kusto.spark.datasource")
      .options(evaluationsConnectionOptions)
      .option(KustoSinkOptions.KUSTO_TABLE, evaluationsTable)
      .option(KustoSinkOptions.KUSTO_SPARK_INGESTION_PROPERTIES_JSON, sp.toString())
      .mode(SaveMode.Append)
      .save()

From the error description it seems like the new library creates a temporary table, which does not have the CSV mapping reference and hence the request fails?

The same code works fine in BETA04.

java.lang.RuntimeException: Ingestion to Kusto failed with status 'Failed'. Cluster: '<scrubbed>', database: '<scrubbed>', table: '_tmpTableDatabricksShell_<scrubbed>Metrics_cfe3bc44_7a33_4d2d_a51f_ff97f25d49ee', batch: '', partition: '0''. Ingestion info: '{
  "partitionKey" : "c430a9ff-92bc-4350-8204-45730a521a43",
  "rowKey" : "c430a9ff-92bc-4350-8204-45730a521a43",
  "etag" : "W/\"datetime'2019-10-29T05%3A27%3A01.8290395Z'\"",
  "status" : "Failed",
  "ingestionSourceId" : "c430a9ff-92bc-4350-8204-45730a521a43",
  "ingestionSourcePath" : "https://uqskstrld<scrubbed>00.blob.core.windows.net/j01-20191029-temp-e5c334ee145d4b43a3a2d3a96fbac1df/<scrubbed>__tmpTableDatabricksShell_<scrubbed>Metrics_cfe3bc44_7a33_4d2d_a51f_ff97f25d49ee_c47dc3d0-09c6-4e1c-993c-f42cf76875ec_spark.csv.gz",
  "database" : "<scrubbed>",
  "table" : "_tmpTableDatabricksShell_<scrubbed>Metrics_cfe3bc44_7a33_4d2d_a51f_ff97f25d49ee",
  "updatedOn" : 1572326821795,
  "operationId" : "00000000-0000-0000-0000-000000000000",
  "activityId" : "00000000-0000-0000-0000-000000000000",
  "errorCode" : "BadRequest_MappingReferenceWasNotFound",
  "failureStatus" : "Permanent",
  "details" : "{\r\n    \"error\": {\r\n        \"code\": \"BadRequest_MappingReferenceWasNotFound\",\r\n        \"message\": \"Request is invalid and cannot be executed.\",\r\n        \"@type\": \"Kusto.Data.Exceptions.MappingNotFoundException\",\r\n        \"@message\": \"Mapping reference '<scrubbed>MetricsMapping' of type 'csv' in database '<scrubbed>' could not be found.\",\r\n        \"@context\": {\r\n            \"timestamp\": \"2019-10-29T05:27:01.7895886Z\",\r\n            \"serviceAlias\": \"<scrubbed>\",\r\n            \"machineName\": \"KEngine000000\",\r\n            \"processName\": \"Kusto.WinSvc.Svc\",\r\n            \"processId\": 1364,\r\n            \"threadId\": 8828,\r\n            \"appDomainName\": \"Kusto.WinSvc.Svc.exe\",\r\n            \"clientRequestId\": \"DM.IngestionExecutor;280fff91-2545-46f5-aee9-b961df601fc0\",\r\n            \"activityId\": \"c3302174-52f9-4624-9e10-62a6da01337d\",\r\n            \"subActivityId\": \"09ac5d91-0b49-4910-b38b-6ab3fa0b306e\",\r\n            \"activityType\": \"DN.AdminCommand.DataIngestPullCommand\",\r\n            \"parentActivityId\": \"40bcbf49-36c4-4d72-8c48-2714f01d2dfa\",\r\n            \"activityStack\": \"(Activity stack: CRID=DM.IngestionExecutor;280fff91-2545-46f5-aee9-b961df601fc0 ARID=c3302174-52f9-4624-9e10-62a6da01337d > DN.Admin.Client.ExecuteControlCommand/60138e92-3e73-4bad-abef-d079fafeac63 > P.WCF.Service.ExecuteControlCommandInternal..IAdminClientServiceCommunicationContract/e2041ae3-5e7b-4ce5-9954-32a077760048 > DN.FE.ExecuteControlCommand/40bcbf49-36c4-4d72-8c48-2714f01d2dfa > DN.AdminCommand.DataIngestPullCommand/09ac5d91-0b49-4910-b38b-6ab3fa0b306e)\"\r\n        },\r\n        \"@permanent\": true\r\n    }\r\n}",
  "originatesFromUpdatePolicy" : false,
  "timestamp" : 1572326821829
}'

Replace json4s-jackson extension if possible

We currently use the json4s-jackson_2.11 extension,
specifically the dependencies below, for JSON parsing.

This extension has a breaking change compared to previous versions and conflicts with Spark, requiring a different Jackson version between Spark 2.3 and 2.4. If possible, it would be better to replace it with a different JSON library and resolve the connector dependency, so that the same connector version could be used for both Spark 2.3 and 2.4.

org.json4s:json4s-jackson_2.11:3.5.3 and com.fasterxml.jackson.core:jackson-databind:2.8.11.1

Correlate RunID/Data lineage ID between spark streaming and kusto

The Spark Kusto sink connector works fine for copying data from a stream to Kusto. However, if there are multiple Spark streaming jobs sinking to a single table, there is no way to trace which stream a given record was inserted from. We are looking for a way to detect the source stream. In other words, our streaming jobs emit raw data such as an ID, Run ID, or Reservoir ID; can we have these IDs tracked in Kusto commands like .show commands or .show operations during the ingestion process? In short, we need a lineage ID.

Streaming Job Raw Data
"id" : "12e0721e-5ddb-4d98-bbcf-ef1409b10201",
"runId" : "aef8916f-fe50-4156-9473-e16435f2d8ec",
"reservoirId" : "40ec0b33-a2a6-4a40-8c62-6eebbaf81699",

KUSTO_DATABASE Parameter Field Not Recognized

Using library version spark-kusto-connector:2.3.1

The Spark read connector is not recognizing the value I give for the 'KUSTO_DATABASE' parameter. When attempting to read from a Kusto data source, the error seems to imply that a destination database is also required, but we are not trying to write to a Kusto database.

Current format: I replaced the database field directly with the database name I wanted to test; the other fields are populated via config.
kusto_df = spark.read. \
    format("com.microsoft.kusto.spark.datasource"). \
    option("KUSTO_CLUSTER", kustoOptions["KUSTO_CLUSTER"]). \
    option("KUSTO_DATABASE", 'defaultdb'). \
    option("KUSTO_TABLE", kustoOptions["KUSTO_TABLE"]). \
    option("KUSTO_AAD_APP_ID", kustoOptions["KUSTO_AAD_APP_ID"]). \
    option("KUSTO_AAD_APP_SECRET", kustoOptions["KUSTO_AAD_APP_SECRET"]). \
    option("KUSTO_AAD_AUTHORITY_ID", kustoOptions["KUSTO_AAD_AUTHORITY_ID"]). \
    option("blobStorageAccountName", kustoOptions["blobStorageAccountName"]). \
    option("blobStorageAccountKey", kustoOptions["blobStorageAccountKey"]). \
    option("blobContainer", kustoOptions["blobContainer"]). \
    option("clientRequestPropertiesJson", crp.toString()). \
    load()

Error output:
Py4JJavaError: An error occurred while calling o9544.load.
: java.security.InvalidParameterException: KUSTO_DATABASE parameter is missing. Must provide a destination database name
at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.parseSourceParameters(KustoDataSourceUtils.scala:100)
at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:385)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:424)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:391)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:391)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:264)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)

Maven Repository returns 404 when trying to access latest Spark Kusto Connector

If we navigate to https://mvnrepository.com/artifact/com.microsoft.azure.kusto we will see three spark-kusto-connector artifact groups:

The first one (no. 3) has a link to https://mvnrepository.com/artifact/com.microsoft.azure.kusto/spark-kusto-connector. It works correctly, but the latest release was on Feb 16, 2021, and it contains only 2.x versions.

The two following items (no. 4 and 5) both have a link to https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark, and this link is broken; it returns 404. The latest release was a few days ago, so it looks like there are some routing problems.

Syntax error while creating a temp Kusto table

Describe the bug
While saving a dataframe to Kusto in "append" mode, the Kusto connector throws the exception below.
Caused by: com.microsoft.azure.kusto.data.exceptions.DataWebException: BadRequest_SyntaxError: Request is invalid and cannot be executed.
Syntax error: . Query: '.create table _tmpTableDatabricksShell_AzDUtilization_msu_common_6ec5d478_629f_469f_9783_9fe1d6ae5624 (['string']:ClusterTypeGroup,['string']:Geo,['string']:Region,['real']:TotalUtilization,['long']:TotalActualCapacity,['long']:TotalUsage,['long']:TotalUnusedCapacity,['long']:ReservationNodes,['long']:OutOfRotation,['real']:AverageOfTargetUtilization,['string']:AverageOfDte,['datetime']:LatestNextCapacity,['datetime']:Timestamp,['datetime']:ExtractDT) with(hidden=true)'
Error details:
ClientRequestId='KJC.execute;dec0203c-cdb7-4525-a959-f1a36f126470', ActivityId='a880764e-7dc4-4122-bbe1-7f5a87d2eacc', Timestamp='2021-03-24T21:26:13.8316166Z'.
It looks like the problem is caused by the reversed order of column name and column type in the table creation script.
Currently, we are using Kusto connector versions 1.1.2 and 1.1.6.

Expected behavior
can create a temp table.

Additional context
Azure Databricks 6.4 with Kusto connector 1.1.2/1.1.6.
Somehow, this problem happens in a specific Kusto cluster.

Readme is out of date

The current version of the package on Maven is 1.1.4, but the readme still details using 1.1.2.

java.util.concurrent.TimeoutException in version 0.4 writing to kusto

Describe the bug
Using the connector to write a dataframe created from Parquet files, after running for a while, java.util.concurrent.TimeoutException: Futures timed out after [-1 seconds] is thrown.

To Reproduce
Build with version spark_kusto_connector_1_0_0_BETA_04.jar
Use Databricks runtime 5.5 ML
Read from Parquet or an existing Databricks table into a dataframe.
Write to an ADX cluster.

Additional context
java.util.concurrent.TimeoutException: Futures timed out after [-1 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at com.microsoft.kusto.spark.utils.KustoClient.finalizeIngestionWhenWorkersSucceeded(KustoClient.scala:148)
at com.microsoft.kusto.spark.datasink.KustoWriter$.write(KustoWriter.scala:95)
at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:34)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:114)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
at line1a973e2319a940478d7ef22828f1867b37.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command-1201276592678858:26)
at line1a973e2319a940478d7ef22828f1867b37.$read$$iw$$iw$$iw$$iw$$iw.(command-1201276592678858:85)
at line1a973e2319a940478d7ef22828f1867b37.$read$$iw$$iw$$iw$$iw.(command-1201276592678858:87)
at line1a973e2319a940478d7ef22828f1867b37.$read$$iw$$iw$$iw.(command-1201276592678858:89)
at line1a973e2319a940478d7ef22828f1867b37.$read$$iw$$iw.(command-1201276592678858:91)
at line1a973e2319a940478d7ef22828f1867b37.$read$$iw.(command-1201276592678858:93)
at line1a973e2319a940478d7ef22828f1867b37.$read.(command-1201276592678858:95)
at line1a973e2319a940478d7ef22828f1867b37.$read$.(command-1201276592678858:99)
at line1a973e2319a940478d7ef22828f1867b37.$read$.(command-1201276592678858)
at line1a973e2319a940478d7ef22828f1867b37.$eval$.$print$lzycompute(:7)
at line1a973e2319a940478d7ef22828f1867b37.$eval$.$print(:6)
at line1a973e2319a940478d7ef22828f1867b37.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:197)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:679)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:632)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:197)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$8.apply(DriverLocal.scala:368)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$8.apply(DriverLocal.scala:345)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:48)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:271)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:48)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:345)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at scala.util.Try$.apply(Try.scala:192)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
at java.lang.Thread.run(Thread.java:748)

docs/CHANGELIST.md file is missing in master branch

Describe the bug
In the master branch head (commit 1f3a054), the Readme refers to a changelist. For example: For main changes from previous releases and known issues please refer to [CHANGELIST](docs/CHANGELIST.md)

However, this referenced docs/CHANGELIST.md file is not available in the master branch.

To Reproduce

  1. In a web browser, open the repository homepage at https://github.com/Azure/azure-kusto-spark
  2. In the rendered Readme markdown text, click on the first 'CHANGELIST' hyperlink. This redirects the browser to https://github.com/Azure/azure-kusto-spark/blob/master/docs/CHANGELIST.md, which returns a Github 404 error page.

Expected behavior
I expect the changelist to be available in the master branch.

Could you please look into this? Thanks in advance.

Ability to write to file format of my choice other than csv while ingestion

While writing data to Kusto, I believe the Spark connector writes to a blob storage account in CSV format. The problem I have with this is that my data can contain new lines or other content that can break CSV. I need the ability to specify the format (e.g. json.gz, tsv.gz, or avro.gz) here.

Getting java.io.InvalidClassException: com.microsoft.kusto.spark.datasink.SinkTableCreationMode$

Hi Team,

We are using "com.microsoft.kusto.spark.datasink.KustoSinkOptions" library in our spark scala code to write the data into adx table. We are getting below error from last four days. Please help us to know what could be the issue for this exception.

"Caused by: java.io.InvalidClassException: com.microsoft.kusto.spark.datasink.SinkTableCreationMode$; local class incompatible: stream classdesc serialVersionUID = -8994901548193566197, local class serialVersionUID = -5675826937088959115"

Spark Code : readDf.write.format("com.microsoft.kusto.spark.datasource")
.option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
.option(KustoSinkOptions.KUSTO_DATABASE, database)
.option(KustoSinkOptions.KUSTO_TABLE, temp_table)
.option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
.option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
.option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
.mode(SaveMode.Append)
.save()

Library : com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.3.0

Thanks,
Lohith R

Question: PySpark connector without using an AAD app

Hi guys,

I am currently using Databricks and trying to set up a PySpark connection without using an Azure AD application.
I was looking at the provided PySpark examples, but they all require an AAD_CLIENT_ID. I was wondering whether there is a way to avoid using an application?
Thank you for your help!
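A minimal PySpark read sketch that omits the AAD application credentials: based on the device-code flow seen in the getSystemClipboard issue above, omitting the app ID and secret should fall back to device authentication, but whether that flow works in a given Databricks environment is an assumption (that issue shows it failing on a headless cluster). All values are placeholders.

# Read without an AAD application; relies on the device-code flow (placeholder values).
df = spark.read \
    .format("com.microsoft.kusto.spark.datasource") \
    .option("kustoCluster", "<cluster-name>") \
    .option("kustoDatabase", "<database>") \
    .option("kustoAadAuthorityID", "<aad-tenant-id>") \
    .option("kustoQuery", "MyTable | take 10") \
    .load()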

Unable to write to ADX Database using Azure Kusto Spark library using version com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.3.0 on Databricks cluster

Describe the bug

Unable to ingest data into an ADX database using the Azure Kusto Spark SDK. We currently have a Databricks notebook successfully streaming data to an ADX table. After creating a new cluster, the same method no longer works with the exact same data. The error is the following:

Caused by: com.microsoft.azure.kusto.data.exceptions.DataWebException: {
'error': {
'code': 'BadRequest_SyntaxError',
'message': 'Request is invalid and cannot be executed.',
'@type': 'Kusto.Data.Exceptions.SyntaxException',
'@message': 'Syntax error: . Query: '.create table _tmpTableDatabricks_Shell_test_XTOWellsDaily_3eb22cb2_85b3_41d7_a118_6ea476e50276 (['string']:APINumber,['string']:Source,['string']:UniqueTag,['string']:Measurement,['datetime']:RecordDateTimeUTC,['string']:Value,['string']:Unit,['datetime']:RecordAddedUTC) with(hidden=true)'',

The above issue occurs when using com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.3.0. By using com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.5.2

To Reproduce

Write data to an existing table with the following code:

writeTable = "test_existing"

(df0
.write
.format("com.microsoft.kusto.spark.datasource")
.option("kustoCluster",sparkKustoOptions_Cluster2["kustoCluster"])
.option("kustoDatabase",sparkKustoOptions_Cluster2["kustoDatabase"])
.option("kustoTable", writeTable)
.option("kustoAadAppId",sparkKustoOptions_Cluster2["kustoAadAppId"])
.option("kustoAadAppSecret",sparkKustoOptions_Cluster2["kustoAadAppSecret"])
.option("kustoAadAuthorityID",sparkKustoOptions_Cluster2["kustoAadAuthorityID"])
.option("tableCreateOptions","CreateIfNotExist")
.mode("Append")
.save()
)

Expected behavior
com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.3.0 works for the older cluster, hence I would like to understand the prerequisites for using com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.5.2.

Desktop (please complete the following information):

  • OS: n/a. Databricks Runtime ver 7.6

Release for Scala 2.12

Is your feature request related to a problem? Please describe.
Databricks 7.0+ is using Scala 2.12. The current kusto-spark-connector is built against Scala 2.11. Thus, it cannot be used with the most recent versions of Databricks. The most recent version of Databricks is 7.3 Beta at the time of this writing.

Describe the solution you'd like
Build for Scala 2.12 as well.

Describe alternatives you've considered
I could pull down the source code and build it manually; however, that only solves the issue for me and not for the community as a whole.

Additional context
When I tried to run Spark with the Connector installed, I get the following:

Py4JJavaError: An error occurred while calling o2145.start.
: java.lang.NoClassDefFoundError: scala/Product$class
	at com.microsoft.kusto.spark.datasink.WriteOptions.<init>(KustoSinkOptions.scala:43)
	at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.parseSinkParameters(KustoDataSourceUtils.scala:207)
	at com.microsoft.kusto.spark.datasink.KustoSinkProvider.createSink(KustoSinkProvider.scala:17)
	at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:325)

Handle Big Partitions

Address Kusto ingestion expecting blobs no larger (typically) than 1GB:

  • Estimate partition size, or track when ingesting to blob (e.g. number of 1K buffers ingested). Once the limit is reached (1GB compressed, or 1GB x compression-factor uncompressed), open a new blob
  • Ingest all blobs (currently only a single blob per partition is supported)

Show authentication issue before sinking data

Is your feature request related to a problem? Please describe.

When we move data from a Delta table to a Kusto table, as of today the Kusto sink connector copies the data from the dataframe into a temporary table using the Data Management ingestion executor.
The data staged for this temporary table is in csv.gz format; once the Kusto sink connector completes sinking, the data from the temporary table is copied to the final target table, i.e. all the records are committed into the target table.

As per the documentation, to insert data into an existing table, the caller should have administrator permission.
Currently, validation of the caller happens only at this final stage, after a lot of data has already been copied into the temporary table.

Instead of validating the user's permission at the end, it would be better to validate at the beginning that the caller has the administrator role or permission to insert data; this reduces I/O calls and unnecessary data copying.

Provide capability of adding tags based on column while ingesting into kusto

As of today, KUSTO_SPARK_INGESTION_PROPERTIES_JSON allows setting tags such as ingest-by and drop-by tags (a sketch of this static-tag usage follows at the end of this issue). By setting a static value, all the rows get inserted with the same tags. There should be a way to take a column of the dataframe and tag rows dynamically.

For example, if my dataframe has a Date column, take the "Date" column value and use it as the tag.

Today, after inserting data with a static value, I have to run an alter command afterwards to re-tag with a different value.

Since the Kusto sink uses a ForeachWriter, you should be able to read the column value and set the tag before sending the data to the blob.
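For reference, a minimal PySpark sketch of today's static-tag usage mentioned above, patterned after the SparkIngestionProperties example in the BETA06 issue earlier in this document. The option key "sparkIngestionPropertiesJson" and the JSON property names ingestByTags and dropByTags are assumptions about the connector's ingestion-properties schema, not verified names, and all other values are placeholders.

import json

# Assumed JSON keys for static ingest-by / drop-by tags (not verified against the connector).
ingestion_props = json.dumps({
    "ingestByTags": ["my-static-ingest-tag"],
    "dropByTags": ["my-static-drop-tag"]
})

df.write \
    .format("com.microsoft.kusto.spark.datasource") \
    .option("kustoCluster", "<cluster-name>") \
    .option("kustoDatabase", "<database>") \
    .option("kustoTable", "<target-table>") \
    .option("sparkIngestionPropertiesJson", ingestion_props) \
    .mode("Append") \
    .save()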

Retry resources fetch

We should implement retries for the resources fetch; this would allow us to still use the resources if a fetch fails.
We should consider retries on blob writes, on the ingestion operation, and on ingestion results.

com.microsoft.azure.kusto.data.exceptions.DataServiceException: Error in acquiring ApplicationAccessToken

Running runtime 6.4, Spark 2.4.5, Scala 2.11, and the com.microsoft.azure.kusto:kusto-spark_2.4_2.11:2.5.1 library in Databricks.

I am trying to append to an existing table and seeing this error:
com.microsoft.azure.kusto.data.exceptions.DataServiceException: Error in acquiring ApplicationAccessToken
I can reproduce the error using runtime 7.5, Spark 3.0.1, Scala 2.12, and com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.5.1.
The script ran without any issues until 2021-03-01 10:40:00 GMT.

Trace-back of the error:

Py4JJavaError Traceback (most recent call last)
in
13 option("clientRequestPropertiesJson", crp.toString()).
---> 14 option("readMode", 'ForceDistributedMode').
15 load()
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
183 else:
--> 184 return self._df(self._jreader.load())
185
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in call(self, *args)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 try:
--> 127 return f(*a, **kw)
128 except py4j.protocol.Py4JJavaError as e:
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling o943.load.
: com.microsoft.azure.kusto.data.exceptions.DataServiceException: Error in acquiring ApplicationAccessToken
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireToken(AadAuthenticationHelper.java:263)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAccessToken(AadAuthenticationHelper.java:103)
at com.microsoft.azure.kusto.data.ClientImpl.initHeaders(ClientImpl.java:153)
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:97)
at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.getSchema(KustoDataSourceUtils.scala:91)
at com.microsoft.kusto.spark.datasource.KustoRelation.getSchema(KustoRelation.scala:131)
at com.microsoft.kusto.spark.datasource.KustoRelation.schema(KustoRelation.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:492)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:373)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:373)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:258)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.azure.kusto.data.exceptions.DataClientException: Error in acquiring ApplicationAccessToken
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAadApplicationAccessToken(AadAuthenticationHelper.java:158)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireToken(AadAuthenticationHelper.java:247)
... 23 more
Caused by: java.util.concurrent.ExecutionException: com.microsoft.aad.adal4j.AuthenticationException: {"error_description":"AADSTS500011: The resource principal named null://ingest-ingest-XXXXXXX.northeurope.kusto.windows.net was not found in the tenant named . This can happen if the application has not been installed by the administrator of the tenant or consented to by any user in the tenant. You might have sent your authentication request to the wrong tenant.\r\nTrace ID: 46f1159e-8ad3-48df-913a-2bb61db03f00\r\nCorrelation ID: 35e45cb8-1a8d-433e-8bb1-2776e0f8c281\r\nTimestamp: 2021-03-03 15:17:57Z","error":"invalid_resource","error_uri":"https://login.microsoftonline.com/error?code=500011"}
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAadApplicationAccessToken(AadAuthenticationHelper.java:156)
... 24 more
Caused by: com.microsoft.aad.adal4j.AuthenticationException: {"error_description":"AADSTS500011: The resource principal named null://ingest-ingest-prddataplatform.northeurope.kusto.windows.net was not found in the tenant named . This can happen if the application has not been installed by the administrator of the tenant or consented to by any user in the tenant. You might have sent your authentication request to the wrong tenant.\r\nTrace ID: 46f1159e-8ad3-48df-913a-2bb61db03f00\r\nCorrelation ID: 35e45cb8-1a8d-433e-8bb1-2776e0f8c281\r\nTimestamp: 2021-03-03 15:17:57Z","error":"invalid_resource","error_uri":"https://login.microsoftonline.com/error?code=500011"}
at com.microsoft.aad.adal4j.AdalTokenRequest.executeOAuthRequestAndProcessResponse(AdalTokenRequest.java:129)
at com.microsoft.aad.adal4j.AuthenticationContext.acquireTokenCommon(AuthenticationContext.java:930)
at com.microsoft.aad.adal4j.AcquireTokenCallable.execute(AcquireTokenCallable.java:70)
at com.microsoft.aad.adal4j.AcquireTokenCallable.execute(AcquireTokenCallable.java:38)
at com.microsoft.aad.adal4j.AdalCallable.call(AdalCallable.java:47)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
During handling of the above exception, another exception occurred:
Py4JJavaError Traceback (most recent call last)
in
27 option("kustoAadAuthorityID",kustoOptions["kustoAadAuthorityID"]).
28 option("tableCreateOptions","CreateIfNotExist").
---> 29 mode("Append").
30 save()
31
/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
826 self.format(format)
827 if path is None:
--> 828 self._jwrite.save()
829 else:
830 self._jwrite.save(path)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in call(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
125 def deco(*a, **kw):
126 try:
--> 127 return f(*a, **kw)
128 except py4j.protocol.Py4JJavaError as e:
129 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o957.save.
: com.microsoft.azure.kusto.data.exceptions.DataServiceException: Error in acquiring ApplicationAccessToken
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireToken(AadAuthenticationHelper.java:263)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAccessToken(AadAuthenticationHelper.java:103)
at com.microsoft.azure.kusto.data.ClientImpl.initHeaders(ClientImpl.java:153)
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:97)
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:56)
at com.microsoft.kusto.spark.datasink.KustoWriter$.write(KustoWriter.scala:61)
at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:42)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:91)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:158)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:157)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:1018)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:841)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:198)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1018)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:439)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:423)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.azure.kusto.data.exceptions.DataClientException: Error in acquiring ApplicationAccessToken
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAadApplicationAccessToken(AadAuthenticationHelper.java:158)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireToken(AadAuthenticationHelper.java:247)
... 38 more
Caused by: java.util.concurrent.ExecutionException: com.microsoft.aad.adal4j.AuthenticationException: {"error_description":"AADSTS500011: The resource principal named null://ingest-ingest-XXXXXX.northeurope.kusto.windows.net was not found in the tenant named . This can happen if the application has not been installed by the administrator of the tenant or consented to by any user in the tenant. You might have sent your authentication request to the wrong tenant.\r\nTrace ID: 5c967829-5e7e-4b79-85b2-f85703204600\r\nCorrelation ID: 9e3c6953-25e0-4740-9437-466e489247d1\r\nTimestamp: 2021-03-03 15:17:58Z","error":"invalid_resource","error_uri":"https://login.microsoftonline.com/error?code=500011"}
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAadApplicationAccessToken(AadAuthenticationHelper.java:156)
... 39 more
Caused by: com.microsoft.aad.adal4j.AuthenticationException: {"error_description":"AADSTS500011: The resource principal named null://ingest-ingest-XXXXXX.northeurope.kusto.windows.net was not found in the tenant named . This can happen if the application has not been installed by the administrator of the tenant or consented to by any user in the tenant. You might have sent your authentication request to the wrong tenant.\r\nTrace ID: 5c967829-5e7e-4b79-85b2-f85703204600\r\nCorrelation ID: 9e3c6953-25e0-4740-9437-466e489247d1\r\nTimestamp: 2021-03-03 15:17:58Z","error":"invalid_resource","error_uri":"https://login.microsoftonline.com/error?code=500011"}
at com.microsoft.aad.adal4j.AdalTokenRequest.executeOAuthRequestAndProcessResponse(AdalTokenRequest.java:129)
at com.microsoft.aad.adal4j.AuthenticationContext.acquireTokenCommon(AuthenticationContext.java:930)
at com.microsoft.aad.adal4j.AcquireTokenCallable.execute(AcquireTokenCallable.java:70)
at com.microsoft.aad.adal4j.AcquireTokenCallable.execute(AcquireTokenCallable.java:38)
at com.microsoft.aad.adal4j.AdalCallable.call(AdalCallable.java:47)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Data loss for async write with checkpointing enabled

We have a data pipeline where we ingest data from Azure Event Hubs and, after processing, write the data to Kusto. We are using Spark Structured Streaming with Spark version 2.4 and Scala version 2.11. Also, we are writing to Kusto asynchronously and have checkpointing enabled for the SparkSession.

Our pom dependency for the kusto library used is:

<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>spark-kusto-connector</artifactId>
<version>1.1.2</version>
</dependency>

The issue we are having is that if the job fails and restarts, the Event Hub source uses checkpointing to resume from the offsets it had last processed, but on the Kusto side we still end up losing data. It would be helpful to get some insight into the process so that we can solve the data loss issue.

Spark Streaming

  • Validate current implementation of spark streaming support
  • Test performance with streaming load
  • Add streaming sample
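As a placeholder for the missing streaming sample, here is a hedged PySpark sketch; it assumes the streaming sink is exposed via com.microsoft.kusto.spark.datasink.KustoSinkProvider (the class visible in the stack trace of the Scala 2.12 issue above) and reuses the batch option keys, with all values and the checkpoint path as placeholders.

# Hypothetical streaming-write sketch; not a verified sample.
query = df.writeStream \
    .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") \
    .option("kustoCluster", "<cluster-name>") \
    .option("kustoDatabase", "<database>") \
    .option("kustoTable", "<target-table>") \
    .option("kustoAadAppId", "<aad-app-id>") \
    .option("kustoAadAppSecret", "<aad-app-secret>") \
    .option("kustoAadAuthorityID", "<aad-tenant-id>") \
    .option("checkpointLocation", "/tmp/kusto-checkpoint") \
    .start()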

com.microsoft.azure.kusto.data.exceptions.DataServiceException: Error in acquiring ApplicationAccessToken

Describe the bug
Executing the same Kusto query: if I take 1000 it works correctly (take 10 and take 100 also work correctly), but if I take 10000 it returns the error "DataServiceException: Error in acquiring ApplicationAccessToken".

To Reproduce
For example:
kustoDf = pyKusto.read. \
    format("com.microsoft.kusto.spark.datasource"). \
    option("kustoCluster", kustoOptions["kustoCluster"]). \
    option("kustoDatabase", kustoOptions["kustoDatabase"]). \
    option("kustoQuery", "set notruncation; tablename | where condition == True | take 1000"). \
    option("kustoAadAppId", kustoOptions["kustoAadAppId"]). \
    option("kustoAadAppSecret", kustoOptions["kustoAadAppSecret"]). \
    option("kustoAadAuthorityID", kustoOptions["kustoAadAuthorityID"]). \
    load()
print(kustoDf.count())

Out: 1000

kustoDf1 = pyKusto.read. \
    format("com.microsoft.kusto.spark.datasource"). \
    option("kustoCluster", kustoOptions["kustoCluster"]). \
    option("kustoDatabase", kustoOptions["kustoDatabase"]). \
    option("kustoQuery", "set notruncation; tablename | where condition == True | take 10000"). \
    option("kustoAadAppId", kustoOptions["kustoAadAppId"]). \
    option("kustoAadAppSecret", kustoOptions["kustoAadAppSecret"]). \
    option("kustoAadAuthorityID", kustoOptions["kustoAadAuthorityID"]). \
    load()
print(kustoDf1.count())

Out: com.microsoft.azure.kusto.data.exceptions.DataServiceException: Error in acquiring ApplicationAccessToken

Expected behavior
If I write take 10000 in the Kusto query, the function should return 10,000 records to the data frame.

Azure Data Bricks Cluster info:

  • Runtime: [6.6 (includes Apache Spark 2.4.5, Scala 2.11)]

azure-kusto-spark info
Install from Maven

  • com.microsoft.azure.kusto:kusto-spark_2.4_2.11:2.3.0

Detail Error Info:

Py4JJavaError Traceback (most recent call last)
in
8 option("kustoAadAuthorityID", kustoOptions["kustoAadAuthorityID"]).
9 load()
---> 10 print(kustoDf1.count())

/databricks/spark/python/pyspark/sql/dataframe.py in count(self)
528 2
529 """
--> 530 return int(self._jdf.count())
531
532 @ignore_unicode_prefix

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in call(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

Py4JJavaError: An error occurred while calling o9376.count.
: com.microsoft.azure.kusto.data.exceptions.DataServiceException: Error in acquiring ApplicationAccessToken
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireToken(AadAuthenticationHelper.java:263)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAccessToken(AadAuthenticationHelper.java:103)
at com.microsoft.azure.kusto.data.ClientImpl.initHeaders(ClientImpl.java:153)
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:97)
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:56)
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:51)
at com.microsoft.kusto.spark.utils.ContainerProvider.refresh(ContainerProvider.scala:35)
at com.microsoft.kusto.spark.utils.ContainerProvider.getAllContainers(ContainerProvider.scala:29)
at com.microsoft.kusto.spark.utils.KustoClient.getTempBlobsForExport(KustoClient.scala:82)
at com.microsoft.kusto.spark.datasource.KustoRelation.buildScan(KustoRelation.scala:104)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$11.apply(DataSourceStrategy.scala:373)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$11.apply(DataSourceStrategy.scala:373)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:407)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:406)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProjectRaw(DataSourceStrategy.scala:462)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProject(DataSourceStrategy.scala:402)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:369)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:64)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:102)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:77)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$3.apply(QueryPlanner.scala:87)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$3.apply(QueryPlanner.scala:84)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:84)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:76)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:102)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:77)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$3.apply(QueryPlanner.scala:87)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$3.apply(QueryPlanner.scala:84)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:84)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:76)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:102)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:77)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:385)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$sparkPlan$1.apply(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$sparkPlan$1.apply(QueryExecution.scala:100)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$executePhase$1.apply(QueryExecution.scala:229)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:835)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:228)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:99)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:99)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$executedPlan$1.apply(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$executedPlan$1.apply(QueryExecution.scala:106)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$executePhase$1.apply(QueryExecution.scala:229)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:835)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:228)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:106)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:106)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:254)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:254)
at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:254)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1$$anonfun$apply$1.apply(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:233)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:98)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:835)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:74)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:185)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3496)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2939)
at sun.reflect.GeneratedMethodAccessor444.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.azure.kusto.data.exceptions.DataClientException: Error in acquiring ApplicationAccessToken
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAadApplicationAccessToken(AadAuthenticationHelper.java:158)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireToken(AadAuthenticationHelper.java:247)
... 90 more
Caused by: java.util.concurrent.ExecutionException: com.microsoft.aad.adal4j.AuthenticationException: {"error_description":"AADSTS500011: The resource principal named https://ingest-kusto.aria.microsoft.com was not found in the tenant named 72f988bf-86f1-41af-91ab-2d7cd011db47. This can happen if the application has not been installed by the administrator of the tenant or consented to by any user in the tenant. You might have sent your authentication request to the wrong tenant.\r\nTrace ID: d3cb36a5-b274-49f6-b82f-5245685bdf00\r\nCorrelation ID: 052c9d2a-81ee-4f08-a570-ad42e73862fd\r\nTimestamp: 2020-11-24 08:13:53Z","error":"invalid_resource","error_uri":"https://login.microsoftonline.com/error?code=500011"}
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.microsoft.azure.kusto.data.AadAuthenticationHelper.acquireAadApplicationAccessToken(AadAuthenticationHelper.java:156)
... 91 more
Caused by: com.microsoft.aad.adal4j.AuthenticationException: {"error_description":"AADSTS500011: The resource principal named https://ingest-kusto.aria.microsoft.com was not found in the tenant named 72f988bf-86f1-41af-91ab-2d7cd011db47. This can happen if the application has not been installed by the administrator of the tenant or consented to by any user in the tenant. You might have sent your authentication request to the wrong tenant.\r\nTrace ID: d3cb36a5-b274-49f6-b82f-5245685bdf00\r\nCorrelation ID: 052c9d2a-81ee-4f08-a570-ad42e73862fd\r\nTimestamp: 2020-11-24 08:13:53Z","error":"invalid_resource","error_uri":"https://login.microsoftonline.com/error?code=500011"}
at com.microsoft.aad.adal4j.AdalTokenRequest.executeOAuthRequestAndProcessResponse(AdalTokenRequest.java:129)
at com.microsoft.aad.adal4j.AuthenticationContext.acquireTokenCommon(AuthenticationContext.java:930)
at com.microsoft.aad.adal4j.AcquireTokenCallable.execute(AcquireTokenCallable.java:70)
at com.microsoft.aad.adal4j.AcquireTokenCallable.execute(AcquireTokenCallable.java:38)
at com.microsoft.aad.adal4j.AdalCallable.call(AdalCallable.java:47)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

Null pointer exception while using PySpark to write to Kusto

Using PySpark on Databricks (runtime version 5 with Python 3) to write a DataFrame to a pre-existing Kusto table, I get a null pointer exception.

(dataset.write
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", cluster)
    .option("kustoDatabase", database)
    .option("kustoTable", table)
    .option("kustoAADClientID", clientId)
    .option("kustoClientAADClientPassword", clientPassword)
    .option("kustoAADAuthorityID", authorityId)
    .save())
Error Logs

Py4JJavaError Traceback (most recent call last)
in ()
----> 1 dataset.write. format("com.microsoft.kusto.spark.datasource"). option("kustoCluster",cluster). option("kustoDatabase",database). option("kustoTable", table). option("kustoAADClientID",clientId). option("kustoClientAADClientPassword",clientPassword). option("kustoAADAuthorityID",authorityId). save()

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
732 self.format(format)
733 if path is None:
--> 734 self._jwrite.save()
735 else:
736 self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in call(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

Py4JJavaError: An error occurred while calling o269.save.
: java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.getTaskInputMetrics$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.beginUpdateFilesystemSQLMetrics$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:631)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.foreach(WholeStageCodegenExec.scala:629)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.foldLeft(WholeStageCodegenExec.scala:629)
at com.microsoft.kusto.spark.datasink.KustoWriter$.serializeRows(KustoWriter.scala:285)
at com.microsoft.kusto.spark.datasink.KustoWriter$.ingestRowsIntoKusto(KustoWriter.scala:158)
at com.microsoft.kusto.spark.datasink.KustoWriter$$anonfun$ingestToTemporaryTableByWorkers$1.apply(KustoWriter.scala:224)
at com.microsoft.kusto.spark.datasink.KustoWriter$$anonfun$ingestToTemporaryTableByWorkers$1.apply(KustoWriter.scala:224)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Are there more options to support overwrite while saving records into Kusto?

Is your feature request related to a problem? Please describe.
We are running a Spark job which reads batches of data from blob storage, and we use the SaveMode enum when writing data into Kusto.
This works for transaction/log data, but for master data loading we would like to wipe out the existing data and refresh the table with a new set of master data. Currently only SaveMode.Append appears to be supported. Is there an option like SaveMode.Overwrite to replace the data? (A possible workaround is sketched after the code below.)

val async = false
val finalDF = spark.sql("SELECT * FROM workspaces")

val kustoQ = finalDF.write
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSinkOptions.KUSTO_CLUSTER, kustoCluster)
  .option(KustoSinkOptions.KUSTO_DATABASE, kustoDatabase)
  .option(KustoSinkOptions.KUSTO_TABLE, kustoTargetTable)
  .option(KustoSinkOptions.KUSTO_AAD_CLIENT_ID, kustoAADAppID)
  .option(KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD, kustoSecretKey)
  .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
  .option(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, async)
  .mode(SaveMode.Append)
  .save()
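Only SaveMode.Append appears in the examples here. Below is a possible workaround sketch for the wipe-and-refresh case; it assumes the azure-kusto-data Python client is available and that clearing the target table before the Append write is acceptable for your scenario. This is not a connector SaveMode, and the client calls belong to this sketch rather than to the Spark connector.

# Hypothetical workaround: clear existing rows with a control command, then
# let the connector append the refreshed master data (as in the example above).
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    "https://<cluster>.kusto.windows.net",   # placeholder cluster URL
    "<appId>", "<appSecret>", "<tenantId>")  # placeholder credentials
client = KustoClient(kcsb)

# ".clear table <table> data" drops all extents (rows) from the target table.
client.execute_mgmt("<database>", ".clear table <table> data")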

getClusterNameFromUrlIfNeeded fails on valid URL

Describe the bug
I am trying to connect to my PlayFab Data Explorer, which has the cluster URL "https://insights.playfab.com".
Per the directions here: https://docs.microsoft.com/en-us/gaming/playfab/features/insights/connectivity/connecting-kusto-explorer-to-insights, the URL should not be modified.

To Reproduce
from pyspark.sql import SparkSession

pyKusto = SparkSession.builder.appName("kustoPySpark").getOrCreate()

kustoOptions = {
    "kustoCluster": "https://insights.playfab.com",
    "kustoDatabase": "912A9",
    "kustoTable": "events.all",
    "kustoAadAppId": "",
    "kustoAadAppSecret": "",
    "kustoAadAuthorityID": ""
}

# Read the data from the kusto table with default reading mode
kustoDf = (pyKusto.read
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", kustoOptions["kustoCluster"])
    .option("kustoDatabase", kustoOptions["kustoDatabase"])
    .option("kustoQuery", kustoOptions["kustoTable"])
    .option("kustoAadAppId", kustoOptions["kustoAadAppId"])
    .option("kustoAadAppSecret", kustoOptions["kustoAadAppSecret"])
    .option("kustoAadAuthorityID", kustoOptions["kustoAadAuthorityID"])
    .load())

Py4JJavaError: An error occurred while calling o521.load.
: java.lang.StringIndexOutOfBoundsException: String index out of range: -1
at java.lang.String.substring(String.java:1967)
at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.getClusterNameFromUrlIfNeeded(KustoDataSourceUtils.scala:270)
at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.parseSourceParameters(KustoDataSourceUtils.scala:107)
at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)

Expected behavior
The cluster URL should be accepted as-is rather than being processed the way it currently is.


Desktop (please complete the following information):

  • OS: Ubuntu, Spark on Databricks

ingestIfNotExists doesn't work as expected

Describe the bug
I am trying to ingest data from Delta table format into Kusto (streaming). I am using the ingestByTags property to avoid duplication. If I re-run the pipeline on exactly the same data a second time, I get duplicates in the target Kusto table despite using the ingestIfNotExists property.

To Reproduce
Here is my Python code example; the behavior was reproduced with both Python and Scala.

import json


source_df = (
  spark
  .readStream
  .format("delta").load(DATA_PATH)
  .coalesce(1)
)

kusto_options = {
  "cluster": KUSTO_CLUSTER,
  "database" : KUSTO_DATABASE,
  "table" : KUSTO_TABLE,
  "app_id": KUSTO_APP_ID,
  "app_secret": KUSTO_APP_SECRET,
  "authority_id": KUSTO_AUTHORITY_ID,
}

ingestion_props = {
  "ingestByTags": ["myTag"],
  "ingestIfNotExists": ["myTag"],
}

kusto_q = (
  source_df
  .writeStream
  .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
  .option("checkpointLocation", KUSTO_CHECKPOINTS_PATH)
  .option("kustoCluster", kusto_options["cluster"])
  .option("kustoDatabase", kusto_options["database"])
  .option("kustoTable", kusto_options["table"])
  .option("kustoAadAppId", kusto_options["app_id"])
  .option("kustoAadAppSecret", kusto_options["app_secret"])
  .option("kustoAadAuthorityID", kusto_options["authority_id"])
  .option("writeEnableAsync", False)
  .option("sparkIngestionPropertiesJson", json.dumps(ingestion_props))  
  .trigger(once=True)
)

kusto_q.start().awaitTermination()

We assume here that the source data doesn't change at all. If I run exactly the same code twice, I get duplicates. One important thing: the uploaded extents do have the proper tag, ingest-by:myTag.

Expected behavior
I expected my data to be uploaded only once; the second attempt should be rejected because an extent with the tag myTag already exists.

Screenshots
Here is the result of two sequential attempts to upload the data (screenshot not reproduced here).

Desktop (please complete the following information):

  • OS: Linux 1029-141313-brunt80-10-200-2-5 4.15.0-1092-azure #102~16.04.1-Ubuntu SMP Tue Jul 14 20:28:23 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
  • Version: Apache Spark 2.4.5, Scala 2.11, Python 3.7.3
  • Package: com.microsoft.azure.kusto:kusto-spark_2.4_2.11:2.3.0
  • Platform: Azure Databricks

Error in creating application access token

I am trying to migrate data from the lake to Data Explorer using a Spark job, but I am getting a token issue.

val cluster = "xxx"
val database = "xxx"
val table = "xxx"
val authorityId = "xxx"
val appId = "xxx"
val appKey = "xxx"

import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
import com.microsoft.kusto.spark.sql.extension.SparkExtension._
import com.microsoft.kusto.spark.datasink.KustoSinkOptions

val str="select * from xxx"
val df = spark.sqlContext.sql(str)
val conf = Map(
  KustoSinkOptions.KUSTO_CLUSTER -> cluster,
  KustoSinkOptions.KUSTO_TABLE -> table,
  KustoSinkOptions.KUSTO_DATABASE -> database,
  KustoSinkOptions.KUSTO_AAD_CLIENT_ID -> appId,
  KustoSinkOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey,
  KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID -> authorityId)
val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)

evaluate estimate_rows_count() plugin is unsupported

Describe the bug
The evaluate estimate_rows_count() plugin is unsupported on many Data Explorer clusters.


Desktop (please complete the following information):

  • OS: Windows
  • Version 10


ApplicationAccessToken Error / Missing libraries (?)

I am trying to ingest data into Azure Data Explorer through PySpark with PyCharm IDE. However, I am having a lot of problems related to missing libraries when running my code.

According to the Azure Data Explorer connector's page, I need to install the connector's jar and the two dependency jars, kusto-ingest and kusto-data.

After downloading all three jars and importing them into PySpark, I still can't proceed with my data ingestion; it keeps returning missing-library errors. The first one is the azure-storage lib; after I install and import that jar, it asks for the adal4j lib, then the oauth2 lib, then the json lib, azure-client-authentication, javax mail, and so on.

I've installed more than 10 jars and I still can't run this ingestion. Am I doing something wrong? (An alternative approach is sketched after the code below.)

My PySpark version is 2.4. You can see my code below:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Teste") \
    .config('spark.jars', 'kusto-spark_2.4_2.11-2.5.2.jar,kusto-data-2.4.1.jar,kusto-ingest-2.4.1.jar,azure-storage-8.3.0.jar,json-20180813.jar,adal4j-1.6.5.jar') \
    .getOrCreate()

# loading a test csv file
df = spark.read.csv('MOCK_DATA.csv', header=True, sep=',')

df.write.format("com.microsoft.kusto.spark.datasource")\
  .option("kustoCluster", "myclustername")\
  .option("kustoDatabase", "mydatabase")\
  .option("kustoTable", "mytable")\
  .option("kustoAadAppId", "myappid")\
  .option("kustoAadAppSecret", "mysecret")\
  .option("kustoAadAuthorityID", "myautorityid")\
  .mode("Append")\
  .save()
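One way to avoid chasing transitive jars, sketched below under the assumption that the machine has access to Maven Central, is to let Spark resolve the connector coordinate and its dependencies via spark.jars.packages instead of listing individual jars with spark.jars. The coordinate matches the kusto-spark_2.4_2.11-2.5.2 jar used above.

from pyspark.sql import SparkSession

# Sketch: resolve the connector and its transitive dependencies from Maven
# instead of listing individual jar files; adjust the version as needed.
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Teste") \
    .config('spark.jars.packages',
            'com.microsoft.azure.kusto:kusto-spark_2.4_2.11:2.5.2') \
    .getOrCreate()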

Running into issue with .option("kustoAADAuthorityID","72f988bf-86f1-41af-91ab-2d7cd011db47")

We've had an Azure Databricks notebook which ran successfully for a while, but a couple of days ago it started failing at this point:

dfkusto_read_df=(spark.read.format("com.microsoft.kusto.spark.datasource")
.option("kustoCluster",<>)
.option("kustoDatabase",<>)
.option("kustoQuery", kustoquery)
.option("kustoAADClientID",<>)
.option("kustoClientAADClientPassword",dbutils.secrets.get(scope = <>, key = <>))
.option("kustoAADAuthorityID","72f988bf-86f1-41af-91ab-2d7cd011db47")
.option("clientRequestPropertiesJson", crp.toString())
.load())

The error I'm getting is this: com.microsoft.azure.kusto.data.exceptions.DataServiceException: Error in acquiring ApplicationAccessToken

I can work around this by:

  1. commenting out this line: .option("kustoAADAuthorityID","72f988bf-86f1-41af-91ab-2d7cd011db47")
  2. replacing "72f988bf-86f1-41af-91ab-2d7cd011db47" with "microsoft.com" in the same line
  3. replacing "kustoAADAuthorityID" with "KUSTO_AAD_AUTHORITY_ID" (the line would be .option("KUSTO_AAD_AUTHORITY_ID","72f988bf-86f1-41af-91ab-2d7cd011db47"))

There have been no library upgrades for our cluster. Other scripts use the same authentication and DO NOT fail.
I know that recently there were changes (screenshot not reproduced here).

We have this version installed on our cluster: com.microsoft.azure.kusto:spark-kusto-connector:1.0.2

Is there a way to explain these inconsistencies?

Ability to specify blob storage account on Spark ingestion

As I can specify my blob storage on reads, can I specify my storage account on writes to Kusto as well? That way, I have control over my data and can use it for further purposes if necessary.

Ex: I can have a retention policy of 30 days in Kusto, while the data still can sit in the blob storage.

Reading using scale mode in multiple notebooks fails for versions pre-1.1.6

If you encounter this:
"container {container} in account {account} not found, we can't create it using anonymous credentials, and no credentials found"

Please upgrade to a newer version (>= 1.1.6).

To solve without upgrading:
Either force single read mode (good for small queries) by setting the read option 'readMode' (KUSTO_READ_MODE) to 'ForceSingleMode', as sketched below,
or provide your own storage credentials:
https://github.com/Azure/azure-kusto-spark/blob/master/docs/KustoSource.md#transient-storage-parameters
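A minimal sketch of the single-mode workaround, assuming the plain string option name 'readMode' and the 'ForceSingleMode' value mentioned above, with placeholder connection options:

kustoDf = (spark.read
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", "<cluster>")
    .option("kustoDatabase", "<database>")
    .option("kustoQuery", "<query>")
    .option("kustoAadAppId", "<appId>")
    .option("kustoAadAppSecret", "<appSecret>")
    .option("kustoAadAuthorityID", "<tenantId>")
    .option("readMode", "ForceSingleMode")  # skip the transient-blob export path
    .load())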

Handling Transient Failures during ingestion.

Once in a while, we see transient failures during ingestion. Is there a retry performed for transient issues?

Reason for failure : The client could not finish the operation within specified timeout.: : : - Transient

.show ingestion failures| where OperationId == "56672f28-11ee-42a7-a13a-eaa2d75295a3"

Another issue: as per the current design, the Spark Kusto sink copies a batch of records into temporary blob storage in .csv.gz format, a temporary table is created pointing to the temporary blob location, and finally the extents are moved to the target table.

If the streaming job is stopped while there is pending ingestion (i.e., the temporary table has not yet been moved to the target table), the system currently stops copying data to the final Kusto table. If the Spark checkpoint is committed for a batch of records that was read by the streaming engine but was not (or failed to be) copied into the target table, there is a chance of data loss. How exactly is this scenario handled?

Read not working java.lang.NoClassDefFoundError: com/microsoft/azure/kusto/data/ConnectionStringBuilder

While reading using the beta-02 connector, I always get this error: java.lang.NoClassDefFoundError: com/microsoft/azure/kusto/data/ConnectionStringBuilder

I get this error with both Python and Scala in Databricks.
Python 3; Scala 2.11; Databricks runtime 5.0

from pyspark.sql import SparkSession
pyKusto = SparkSession.builder.appName("kustoPySpark").getOrCreate()

kustoDf = (pyKusto.read
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", kustoOptions["kustoCluster"])
    .option("kustoDatabase", kustoOptions["kustoDatabase"])
    .option("kustoQuery", query)
    .option("kustoAADClientID", kustoOptions["kustoAADClientID"])
    .option("kustoAADAuthorityID", kustoOptions["kustoAADAuthorityID"])
    .option("kustoClientAADClientPassword", kustoOptions["kustoClientAADClientPassword"])
    .option("readMode", "lean")
    .load())

Error encountered while inserting Spark dataframe from Azure Databricks into Azure Data Explorer table

I tried to insert a Spark DataFrame (written via a Scala program on an Azure Databricks instance) into an Azure Data Explorer table, and encountered the error below:

java.lang.NoSuchMethodError: com.microsoft.azure.kusto.data.Client.execute(Ljava/lang/String;)Lcom/microsoft/azure/kusto/data/Results;
at com.microsoft.kusto.spark.datasink.KustoWriter$$anonfun$cleanupTempTables$2.apply(KustoWriter.scala:111)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

java.lang.NoSuchMethodError: com.microsoft.azure.kusto.data.Client.execute(Ljava/lang/String;)Lcom/microsoft/azure/kusto/data/Results;
at com.microsoft.kusto.spark.utils.KustoClient.createTmpTableWithSameSchema(KustoClient.scala:46)
at com.microsoft.kusto.spark.datasink.KustoWriter$.write(KustoWriter.scala:64)
at com.microsoft.kusto.spark.datasource.DefaultSource.createRelation(DefaultSource.scala:36)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:147)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:188)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:135)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:118)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:113)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:242)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:172)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
at line9d9aeb3605f8407f9e652d740c75963a25.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command-1915893717692222:36)
at line9d9aeb3605f8407f9e652d740c75963a25.$read$$iw$$iw$$iw$$iw$$iw.(command-1915893717692222:94)
at line9d9aeb3605f8407f9e652d740c75963a25.$read$$iw$$iw$$iw$$iw.(command-1915893717692222:96)
at line9d9aeb3605f8407f9e652d740c75963a25.$read$$iw$$iw$$iw.(command-1915893717692222:98)
at line9d9aeb3605f8407f9e652d740c75963a25.$read$$iw$$iw.(command-1915893717692222:100)
at line9d9aeb3605f8407f9e652d740c75963a25.$read$$iw.(command-1915893717692222:102)
at line9d9aeb3605f8407f9e652d740c75963a25.$read.(command-1915893717692222:104)
at line9d9aeb3605f8407f9e652d740c75963a25.$read$.(command-1915893717692222:108)
at line9d9aeb3605f8407f9e652d740c75963a25.$read$.(command-1915893717692222)
at line9d9aeb3605f8407f9e652d740c75963a25.$eval$.$print$lzycompute(:7)
at line9d9aeb3605f8407f9e652d740c75963a25.$eval$.$print(:6)
at line9d9aeb3605f8407f9e652d740c75963a25.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at scala.util.Try$.apply(Try.scala:192)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
at java.lang.Thread.run(Thread.java:748)

Error in acquiring ApplicationAccessToken

Describe the bug
I need to read a Log Analytics workspace located in a subscription outside of microsoft.com

at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.microsoft.aad.adal4j.AuthenticationException: {"error_description":"AADSTS700016: Application with identifier 'xxxxxxx-0d79-41f2-b362-5c6408f7854d' was not found in the directory 'microsoft.com'. This can happen if the application has not been installed by the administrator of the tenant or consented to by any user in the tenant. You may have sent your authentication request to the wrong tenant.\r\nTrace ID: 66dbbf3d-2885-47fa-910a-5e591c916100\r\nCorrelation ID: af9f1363-64e4-4cb2-8d31-fd91fff3ab62\r\nTimestamp: 2021-03-12 19:24:22Z","error":"unauthorized_client","error_uri":"https://login.microsoftonline.com/error?code=700016"}

To Reproduce
Steps to reproduce the behavior:

val conf: Map[String, String] = Map(
  KustoSourceOptions.KEY_VAULT_URI -> keyVaultUri,
  KustoSourceOptions.KEY_VAULT_APP_ID -> keyVaultAppId,
  KustoSourceOptions.KEY_VAULT_APP_KEY -> keyVaultAppKey
)

val query = table
val dfResult = spark.read.kusto(cluster, database, query, conf)

Expected behavior
I assumed I could pass in the authorityId, but it appears to be available only in the SINK options:

I need this in READ:
.option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, "AAD Authority Id") // "microsoft.com"
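For reference, the read examples earlier in this document pass the tenant through the plain string option "kustoAadAuthorityID". A minimal PySpark sketch with placeholder values follows; whether the same applies when authenticating through the Key Vault options is not confirmed here.

dfResult = (spark.read
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", cluster)
    .option("kustoDatabase", database)
    .option("kustoQuery", query)
    .option("kustoAadAppId", "<appId>")
    .option("kustoAadAppSecret", "<appSecret>")
    .option("kustoAadAuthorityID", "<tenantId>")  # e.g. "microsoft.com" or a tenant GUID
    .load())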


Desktop (please complete the following information):

  • Databricks
  • Runtime 7.3


Count check on reading

The connector should not estimate the query size using count; instead, the appropriate Kusto function should be used for this.

Alter the table schema if it exists

The schema of my table might change, and I want to handle schema changes while ingesting data from Spark. I need the ability to alter the table schema if it exists, i.e. a create-merge when the table already exists.
