pulsar-io-cloud-storage's Introduction

Cloud Storage Sink Connector for Pulsar

The Cloud Storage sink connector exports data from Pulsar topics to cloud storage (such as AWS S3 and Google GCS) in Avro, JSON, Parquet, or other formats. Depending on your environment, the Cloud Storage sink connector can guarantee exactly-once support for exporting data to cloud storage.

Installation

There are two ways to install the Cloud Storage sink connector.

  • Install the Cloud Storage sink connector through the NAR file.
  • Install the Cloud Storage sink connector from the source code.

To build the Cloud Storage sink connector from the source code, follow these steps.

  1. Clone the project from GitHub to your local machine.

    git clone https://github.com/streamnative/pulsar-io-cloud-storage.git
    cd pulsar-io-cloud-storage
  2. Build the project.

    mvn clean install -DskipTests

    You can find the NAR file in the following directory.

    ./pulsar-io-cloud-storage/target/pulsar-io-cloud-storage-${version}.nar

Configuration

The Cloud Storage sink connector supports the following properties.

Cloud Storage sink connector configuration

Storage Provider: AWS S3

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| provider | String | True | null | The Cloud Storage type, such as aws-s3 or s3v2 (s3v2 uses the AWS client instead of the jclouds client). |
| accessKeyId | String | True | null | The Cloud Storage access key ID. It requires permission to write objects. |
| secretAccessKey | String | True | null | The Cloud Storage secret access key. |
| role | String | False | null | The Cloud Storage role. |
| roleSessionName | String | False | null | The Cloud Storage role session name. |
| endpoint | String | True | null | The Cloud Storage endpoint. |
| bucket | String | True | null | The Cloud Storage bucket. |
| formatType | String | False | "json" | The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON. |
| partitionerType | String | False | "partition" | The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions. |
| timePartitionPattern | String | False | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
| timePartitionDuration | String | False | "86400000" | The time interval for time-based partitioning. It supports formatted interval strings, such as 30d, 24h, 30m, and 10s, as well as numbers in milliseconds, such as 86400000 (24h or 1d). |
| partitionerUseIndexAsOffset | Boolean | False | false | Whether to use Pulsar's message index as the offset instead of the record sequence. It is recommended if the incoming messages may be batched. The brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See PIP-70 for more details. |
| batchSize | int | False | 10 | The number of records submitted in a batch. |
| batchTimeMs | long | False | 1000 | The interval for batch submission, in milliseconds. |
| maxBatchBytes | long | False | 10000000 | The maximum number of bytes in a batch. |
| sliceTopicPartitionPath | Boolean | False | false | When it is set to true, split the partitioned topic name into separate folders in the bucket path. |
| withMetadata | Boolean | False | false | Save message attributes to metadata. |
| useHumanReadableMessageId | Boolean | False | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a hex-encoded string. |
| withTopicPartitionNumber | Boolean | False | true | When it is set to true, include the topic partition number in the object path. |
| bytesFormatTypeSeparator | String | False | "0x10" | It is inserted between records for the formatType of bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object. |
| pendingQueueSize | int | False | 10 | The number of records buffered in the queue. By default, it is equal to batchSize. You can set it manually. |
| useHumanReadableSchemaVersion | Boolean | False | false | Use a human-readable format string for the schema version in the message metadata. If it is set to true, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
| skipFailedMessages | Boolean | False | false | Configure whether to skip a message that fails to be processed. If it is set to true, the connector skips failed messages by acknowledging them. Otherwise, the connector fails the message. |
| pathPrefix | String | False | false | If it is set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/. |
| avroCodec | String | False | snappy | Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
| parquetCodec | String | False | gzip | Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| jsonAllowNaN | Boolean | False | false | Recognize 'NaN', 'INF', '-INF' as legal floating-point number values when formatType=json. Because the JSON specification does not allow such values, this is a non-standard feature and is disabled by default. |
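
The timePartitionDuration property accepts either a formatted interval string or a plain number of milliseconds. The following Java sketch illustrates how both forms could map to the same millisecond value. The class name TimePartitionDurationSketch and the method toMillis are hypothetical, and the parsing rules are an assumption drawn from the examples in the table; this is not the connector's own parser.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the connector.
final class TimePartitionDurationSketch {

    // Converts a timePartitionDuration value such as "30d", "24h", "30m", "10s",
    // or a plain millisecond number such as "86400000" into milliseconds.
    static long toMillis(String value) {
        String v = value.trim();
        if (v.isEmpty()) {
            throw new IllegalArgumentException("empty duration");
        }
        char unit = v.charAt(v.length() - 1);
        if (Character.isDigit(unit)) {
            // No unit suffix: the value is assumed to be in milliseconds already.
            return Long.parseLong(v);
        }
        long amount = Long.parseLong(v.substring(0, v.length() - 1));
        switch (Character.toLowerCase(unit)) {
            case 'd': return TimeUnit.DAYS.toMillis(amount);
            case 'h': return TimeUnit.HOURS.toMillis(amount);
            case 'm': return TimeUnit.MINUTES.toMillis(amount);
            case 's': return TimeUnit.SECONDS.toMillis(amount);
            default: throw new IllegalArgumentException("unsupported unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(toMillis("24h"));      // 86400000
        System.out.println(toMillis("1d"));       // 86400000
        System.out.println(toMillis("86400000")); // 86400000
    }
}
```

Under these assumptions, "1d", "24h", and "86400000" all describe the same one-day partition interval.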

Storage Provider: Google Cloud Storage

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| provider | String | True | null | The Cloud Storage type. Google Cloud Storage only supports the google-cloud-storage provider. |
| gcsServiceAccountKeyFilePath | String | False | "" | Path to the GCS credentials file. If empty, the credentials file is read from the GOOGLE_APPLICATION_CREDENTIALS environment variable. |
| gcsServiceAccountKeyFileContent | String | False | "" | The contents of the JSON service key file. If empty, credentials are read from the gcsServiceAccountKeyFilePath file. |
| bucket | String | True | null | The Cloud Storage bucket. |
| formatType | String | False | "json" | The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON. |
| partitionerType | String | False | "partition" | The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions. |
| timePartitionPattern | String | False | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
| timePartitionDuration | String | False | "86400000" | The time interval for time-based partitioning. It supports formatted interval strings, such as 30d, 24h, 30m, and 10s, as well as numbers in milliseconds, such as 86400000 (24h or 1d). |
| partitionerUseIndexAsOffset | Boolean | False | false | Whether to use Pulsar's message index as the offset instead of the record sequence. It is recommended if the incoming messages may be batched. The brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See PIP-70 for more details. |
| batchSize | int | False | 10 | The number of records submitted in a batch. |
| batchTimeMs | long | False | 1000 | The interval for batch submission, in milliseconds. |
| maxBatchBytes | long | False | 10000000 | The maximum number of bytes in a batch. |
| sliceTopicPartitionPath | Boolean | False | false | When it is set to true, split the partitioned topic name into separate folders in the bucket path. |
| withMetadata | Boolean | False | false | Save message attributes to metadata. |
| useHumanReadableMessageId | Boolean | False | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a hex-encoded string. |
| withTopicPartitionNumber | Boolean | False | true | When it is set to true, include the topic partition number in the object path. |
| bytesFormatTypeSeparator | String | False | "0x10" | It is inserted between records for the formatType of bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object. |
| pendingQueueSize | int | False | 10 | The number of records buffered in the queue. By default, it is equal to batchSize. You can set it manually. |
| useHumanReadableSchemaVersion | Boolean | False | false | Use a human-readable format string for the schema version in the message metadata. If it is set to true, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
| skipFailedMessages | Boolean | False | false | Configure whether to skip a message that fails to be processed. If it is set to true, the connector skips failed messages by acknowledging them. Otherwise, the connector fails the message. |
| pathPrefix | String | False | false | If it is set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/. |
| avroCodec | String | False | snappy | Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
| parquetCodec | String | False | gzip | Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| jsonAllowNaN | Boolean | False | false | Recognize 'NaN', 'INF', '-INF' as legal floating-point number values when formatType=json. Because the JSON specification does not allow such values, this is a non-standard feature and is disabled by default. |

Storage Provider: Azure Blob Storage

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| provider | String | True | null | The Cloud Storage type. Azure Blob Storage only supports the azure-blob-storage provider. |
| azureStorageAccountSASToken | String | True | "" | The Azure Blob Storage account SAS token. Required when authenticating via SAS token. |
| azureStorageAccountName | String | True | "" | The Azure Blob Storage account name. Required when authenticating via account name and account key. |
| azureStorageAccountKey | String | True | "" | The Azure Blob Storage account key. Required when authenticating via account name and account key. |
| azureStorageAccountConnectionString | String | True | "" | The Azure Blob Storage connection string. Required when authenticating via connection string. |
| endpoint | String | True | null | The Azure Blob Storage endpoint. |
| bucket | String | True | null | The Cloud Storage bucket. |
| formatType | String | False | "json" | The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON. |
| partitionerType | String | False | "partition" | The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions. |
| timePartitionPattern | String | False | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
| timePartitionDuration | String | False | "86400000" | The time interval for time-based partitioning. It supports formatted interval strings, such as 30d, 24h, 30m, and 10s, as well as numbers in milliseconds, such as 86400000 (24h or 1d). |
| partitionerUseIndexAsOffset | Boolean | False | false | Whether to use Pulsar's message index as the offset instead of the record sequence. It is recommended if the incoming messages may be batched. The brokers may or may not expose the index metadata; if it is not present on the record, the sequence is used. See PIP-70 for more details. |
| batchSize | int | False | 10 | The number of records submitted in a batch. |
| batchTimeMs | long | False | 1000 | The interval for batch submission, in milliseconds. |
| maxBatchBytes | long | False | 10000000 | The maximum number of bytes in a batch. |
| sliceTopicPartitionPath | Boolean | False | false | When it is set to true, split the partitioned topic name into separate folders in the bucket path. |
| withMetadata | Boolean | False | false | Save message attributes to metadata. |
| useHumanReadableMessageId | Boolean | False | false | Use a human-readable format string for messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a hex-encoded string. |
| withTopicPartitionNumber | Boolean | False | true | When it is set to true, include the topic partition number in the object path. |
| bytesFormatTypeSeparator | String | False | "0x10" | It is inserted between records for the formatType of bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object. |
| pendingQueueSize | int | False | 10 | The number of records buffered in the queue. By default, it is equal to batchSize. You can set it manually. |
| useHumanReadableSchemaVersion | Boolean | False | false | Use a human-readable format string for the schema version in the message metadata. If it is set to true, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
| skipFailedMessages | Boolean | False | false | Configure whether to skip a message that fails to be processed. If it is set to true, the connector skips failed messages by acknowledging them. Otherwise, the connector fails the message. |
| pathPrefix | String | False | false | If it is set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/. |
| avroCodec | String | False | snappy | Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
| parquetCodec | String | False | gzip | Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| jsonAllowNaN | Boolean | False | false | Recognize 'NaN', 'INF', '-INF' as legal floating-point number values when formatType=json. Because the JSON specification does not allow such values, this is a non-standard feature and is disabled by default. |

Configure Cloud Storage sink connector

Before using the Cloud Storage sink connector, you need to create a configuration file in one of the following formats.

  • JSON

    {
       "tenant": "public",
       "namespace": "default",
       "name": "cloud-storage-sink",
       "inputs": [
          "user-avro-topic"
       ],
       "archive": "connectors/pulsar-io-cloud-storage-0.0.1.nar",
       "parallelism": 1,
       "configs": {
          "provider": "aws-s3",
          "accessKeyId": "accessKeyId",
          "secretAccessKey": "secretAccessKey",
          "role": "none",
          "roleSessionName": "none",
          "bucket": "testBucket",
          "region": "local",
          "endpoint": "us-standard",
          "formatType": "parquet",
          "partitionerType": "time",
          "timePartitionPattern": "yyyy-MM-dd",
          "timePartitionDuration": "1d",
          "batchSize": 10,
          "batchTimeMs": 1000
       }
    }
  • YAML

    tenant: "public"
    namespace: "default"
    name: "cloud-storage-sink"
    inputs: 
      - "user-avro-topic"
    archive: "connectors/pulsar-io-cloud-storage-0.0.1.nar"
    parallelism: 1
    
    configs:
      provider: "aws-s3"
      accessKeyId: "accessKeyId"
      secretAccessKey: "secretAccessKey"
      role: "none"
      roleSessionName: "none"
      bucket: "testBucket"
      region: "local"
      endpoint: "us-standard"
      formatType: "parquet"
      partitionerType: "time"
      timePartitionPattern: "yyyy-MM-dd"
      timePartitionDuration: "1d"
      batchSize: 10
      batchTimeMs: 1000

Data format types

The Cloud Storage sink connector provides multiple output format options, including JSON, Avro, Bytes, and Parquet. The default format is JSON. With the current implementation, each format has some limitations.

This table lists the Pulsar Schema types supported by the writers.

Pulsar Schema Writer: Avro Writer: JSON Writer: Parquet Writer: Bytes
Primitive *
Avro
Json
Protobuf **
ProtobufNative ***

*: The JSON writer tries to convert data with a String or Bytes schema to JSON-format data if it is convertible.

**: The Protobuf schema is based on the Avro schema. It uses Avro as an intermediate format, so the conversion may not be the best possible.

***: The ProtobufNative record holds the Protobuf descriptor and the message. When writing to Avro format, the connector uses avro-protobuf to do the conversion.

This table lists the support of withMetadata configurations for different writer formats:

Writer Format withMetadata
Avro
JSON
Parquet *
Bytes

*: When using Parquet with PROTOBUF_NATIVE format, the connector will write the messages with DynamicMessage format. When withMetadata is set to true, the connector will add __message_metadata__ to the messages with PulsarIOCSCProtobufMessageMetadata format.

For example, if a message User has the following schema:

syntax = "proto3";
message User {
 string name = 1;
 int32 age = 2;
}

When withMetadata is set to true, the connector will write the message DynamicMessage with the following schema:

syntax = "proto3";
message PulsarIOCSCProtobufMessageMetadata {
 map<string, string> properties = 1;
 string schema_version = 2;
 string message_id = 3;
}
message User {
 string name = 1;
 int32 age = 2;
 PulsarIOCSCProtobufMessageMetadata __message_metadata__ = 3;
}

By default, when the connector receives a message with an unsupported schema type, the connector fails the message. If you want to skip unsupported messages, set skipFailedMessages to true.

Dead-letter topics

To use a dead-letter topic, set skipFailedMessages to false, and set --max-redeliver-count and --dead-letter-topic when submitting the connector with the pulsar-admin CLI tool. For more information about dead-letter topics, see the Pulsar documentation. If a message fails to be sent to cloud storage and a dead-letter topic is configured, the connector sends the message to the dead-letter topic.

Usage

  1. Prepare the AWS S3 service. This example uses s3mock.

    docker pull apachepulsar/s3mock:latest
    docker run -p 9090:9090 -e initialBuckets=pulsar-integtest apachepulsar/s3mock:latest
    
  2. Put the pulsar-io-cloud-storage-2.5.1.nar in the Pulsar connector catalog.

    cp pulsar-io-cloud-storage-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-cloud-storage-2.5.1.nar
    
  3. Start Pulsar in the standalone mode.

    $PULSAR_HOME/bin/pulsar standalone
    
  4. Run the Cloud Storage sink connector locally.

    $PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file cloud-storage-sink-config.yaml
    
  5. Send Pulsar messages. Currently, topics with an Avro or JSON schema support the json, avro, and parquet format types. Topics without a schema can only use the bytes format type.

    // TestRecord is a user-defined class that serves as the Avro schema for the topic.
    try (
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Producer<TestRecord> producer = pulsarClient.newProducer(Schema.AVRO(TestRecord.class))
                .topic("public/default/test-parquet-avro")
                .create()
    ) {
        List<TestRecord> testRecords = Arrays.asList(
                new TestRecord("key1", 1, null),
                new TestRecord("key2", 1, new TestRecord.TestSubRecord("aaa"))
        );
        // Send each record synchronously; the client and producer are closed automatically.
        for (TestRecord record : testRecords) {
            producer.send(record);
        }
    }
  6. Validate Cloud Storage data.

    To check the path, you can use jclouds to verify that the file exists, as shown below.

    Properties overrides = new Properties();
    overrides.put("jclouds.s3.virtual-host-buckets", "false");
    BlobStoreContext blobStoreContext = ContextBuilder.newBuilder("aws-s3")
            .credentials(
                    "accessKeyId",
                    "secretAccessKey"
            )
            .endpoint("http://localhost:9090") // replace with the s3mock URL
            .overrides(overrides)
            .buildView(BlobStoreContext.class);
    BlobStore blobStore = blobStoreContext.getBlobStore();
    final long sequenceId = FunctionCommon.getSequenceId(message.getMessageId());
    final String path = "public/default/test-parquet-avro" + File.separator + "2020-09-14" + File.separator + sequenceId + ".parquet";
    final boolean blobExists = blobStore.blobExists("testBucket", path);
    Assert.assertTrue("the sink record does not exist", blobExists);

    You can find the data in your testBucket bucket. The path is something like public/default/test-parquet-avro/2020-09-14/1234.parquet. The path consists of three parts: the basic part of the topic, the partition information, and the format suffix.

    • Basic part of topic: public/default/test-parquet-avro/ This part consists of the name of the tenant, namespace, and the input topic.
    • Partition information: 2020-09-14/${messageSequenceId} The date is generated based on the partitionerType parameter in the configuration. And the ${messageSequenceId} is generated by FunctionCommon.getSequenceId(message.getMessageId()).
    • Format suffix: .parquet This part is generated based on the formatType parameter in the configuration.
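
    The three parts above can be sketched in Java as follows. This is only an illustration of how the example path is assembled, not the connector's partitioner code. The class ObjectPathSketch and the method buildPath are hypothetical, and formatting the date in UTC from a publish timestamp is an assumption.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the connector.
final class ObjectPathSketch {

    // Joins the topic part, the time partition, the sequence ID, and the format suffix.
    static String buildPath(String tenant, String namespace, String topic,
                            long publishTimeMs, String timePartitionPattern,
                            long sequenceId, String formatType) {
        // Assumption: the date is rendered in UTC using timePartitionPattern.
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern(timePartitionPattern).withZone(ZoneOffset.UTC);
        String datePart = formatter.format(Instant.ofEpochMilli(publishTimeMs));
        return tenant + "/" + namespace + "/" + topic + "/"
                + datePart + "/" + sequenceId + "." + formatType;
    }

    public static void main(String[] args) {
        long publishTime = Instant.parse("2020-09-14T10:00:00Z").toEpochMilli();
        // Prints public/default/test-parquet-avro/2020-09-14/1234.parquet
        System.out.println(buildPath("public", "default", "test-parquet-avro",
                publishTime, "yyyy-MM-dd", 1234L, "parquet"));
    }
}
```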

Permissions

AWS S3 permission policies

The suggested permission policies for AWS S3 are:

  • s3:AbortMultipartUpload
  • s3:GetObject*
  • s3:PutObject*
  • s3:List*

If you do not want to provide region in the configuration, you should enable s3:GetBucketLocation permission policy as well.

Troubleshooting

Sink flushing only after batchTimeMs elapses

In some scenarios, the sink flushes only when batchTimeMs has elapsed, even though many messages are waiting to be processed. The reason is that the sink acknowledges messages only after they are flushed to cloud storage, while the broker stops sending messages once it reaches a certain limit of unacknowledged messages. If this limit is lower than or close to batchSize, the sink never receives enough messages to trigger a flush based on the number of messages. In this case, ensure that the maxUnackedMessagesPerConsumer value set in the broker configuration is sufficiently larger than the batchSize setting of the sink.
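
The interaction between the two limits can be illustrated with a deliberately simplified Java simulation. It is a sketch under stated assumptions, not the broker's or the sink's actual logic: the class FlushStallSketch and the method countTriggeredFlushes are hypothetical, every flush is assumed to acknowledge all buffered messages at once, and time-based flushes are ignored.

```java
// Hypothetical simulation, not part of the connector or the broker.
final class FlushStallSketch {

    // Returns how many flushes are triggered by batchSize alone while the broker
    // delivers totalMessages, given a cap on unacknowledged messages per consumer.
    static int countTriggeredFlushes(int totalMessages, int batchSize, int maxUnacked) {
        int delivered = 0;
        int buffered = 0; // messages received by the sink but not yet flushed (unacked)
        int flushes = 0;
        while (delivered < totalMessages) {
            if (buffered >= maxUnacked) {
                // The broker stops dispatching; only a time-based flush could free it.
                break;
            }
            delivered++;
            buffered++;
            if (buffered >= batchSize) {
                flushes++;    // count-based flush
                buffered = 0; // flushing acknowledges the buffered messages
            }
        }
        return flushes;
    }

    public static void main(String[] args) {
        System.out.println(countTriggeredFlushes(100, 10, 50)); // 10
        System.out.println(countTriggeredFlushes(100, 10, 5));  // 0
    }
}
```

In the second call the unacknowledged limit is below batchSize, so the simulated sink never collects a full batch and would depend entirely on batchTimeMs.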

pulsar-io-cloud-storage's People

Contributors

addisonj, codelipenghui, danpi, dependabot[bot], freeznet, huanli-meng, jianyun8023, liangyepianzhou, nicoloboschi, shibd, sijie, streamnativebot, tuteng, yaalsn, zzzming

pulsar-io-cloud-storage's Issues

[BUG] Java heap space error when trying to run pulsar sink

Describe the bug
https://streamnative.io/en/blog/tech/2020-10-20-cloud-storage-sink-connector-251 When running part 2 in step 2, we encountered java heap space error

To Reproduce
Steps to reproduce the behavior:

  1. copy pulsar-io-cloud-storage-2.5.1.nar to apache-pulsar-2.7.0/connectors
  2. restart broker
  3. upload cloud-storage-sink-config.yaml to apache-pulsar-2.7.0 folder
  4. bin/pulsar-admin sink localrun --sink-config-file cloud-storage-sink-config.yaml
  5. see java heap space error: [function-timer-thread-6-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - demotenant/demonamespace/Cloud-Storage-sink-java.lang.OutOfMemoryError: Java heap space Function Container is dead with exception.. restarting

Expected behavior
Should not see error

Additional context
Our brokers are deployed in docker containers, so step 4 was run within the docker container.

Can anyone help with this? Thanks

Update master to JDK17

To keep in sync with Pulsar master, we should upgrade the connector to use JDK 17 too.

GCS Sink Connector not able to write as parquet when published data using golang client for pulsar.. works fine with java client

@freeznet When I write data using the Pulsar Golang client with the program below, it publishes to the Pulsar topic successfully, and I am also able to consume the messages. However, the StreamNative GCS sink connector is not able to write them as Parquet to GCS, and I get the exception below.

Slack Thread on Apache Pulsar Community : https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1640579795494300

Exception Stack Trace : `16:28:54.381 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/sadim-test-json3][/pulsar-poc/cloud-storage-sink] Subscribed to topic on pulsar-poc-broker-1.pulsar-poc-broker.pulsar-poc.svc.cluster.local/10.248.1.83:6650 -- consumer: 43
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Flushing [SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persistent://public/default/sadim-test-json3], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@23bbe054], schema=AUTO_CONSUME({schemaVersion=,schemaType=BYTES}{schemaVersion=org.apache.pulsar.common.protocol.schema.LatestVersion@2dd916ed,schemaType=JSON}), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$322/0x0000000840688840@14f36ab9, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$321/0x0000000840688440@495fc424), value=[B@50d82707)] buffered records to blob store
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:
org.apache.avro.SchemaParseException: Cannot parse schema
at org.apache.avro.Schema.parse(Schema.java:1633) ~[java-instance.jar:?]
at org.apache.avro.Schema$Parser.parse(Schema.java:1430) ~[java-instance.jar:?]
at org.apache.avro.Schema$Parser.parse(Schema.java:1418) ~[java-instance.jar:?]
at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertToAvroSchema(AvroRecordUtil.java:103) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.format.ParquetFormat.initSchema(ParquetFormat.java:63) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:239) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
16:28:55.788 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Skip flushing, because the pending flush queue is empty ...

accuknox-mysql-helm-temp/pulsar-test-dec22-test1/public/default/CiliumTelemetryDemo27Dec/2021-12-27`

Golang Client Code : `package main
import (
"context"
"time"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"encoding/json"
"strconv"
)

func main() {
fmt.Println("hello world")
for i := 0; i < 500; i++ {
fmt.Println("sadim"+ strconv.Itoa(i))
writePulsar("ciliumjson","sadim111")
fmt.Println("sadimend"+ strconv.Itoa(i))
}
fmt.Println("hello world 2")
}

type AccuknoxJsonWrapperObject struct {
msg AccuknoxFinalJsonWrapperObject `msg:"json"`
}

type AccuknoxFinalJsonWrapperObject struct {
inputjson string
}

var (
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example2\",\"namespace\":\"test2\",\"fields\":[{\"name\":\"inputjson\",\"type\":\"string\"}]}"
)

func PublishToPulsarTopic(inputjson string, topicName string) (result string) {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
fmt.Println("Could not instantiate Pulsar client: %v")
}

defer client.Close()

	JsonWrapperObject2 := AccuknoxFinalJsonWrapperObject{inputjson}
	fmt.Println("sadim1")
	fmt.Println(JsonWrapperObject2)
	JsonWrapperObject := AccuknoxJsonWrapperObject{JsonWrapperObject2}
	fmt.Println("sadim2")
	fmt.Println(JsonWrapperObject)
	
	file, _ := json.Marshal(JsonWrapperObject2)
	fmt.Println("sadim3")
	fmt.Println(file)
	file2, _ := json.Marshal(JsonWrapperObject)
	fmt.Println("sadim4")
	fmt.Println(file2)

properties := make(map[string]string)
properties["pulsar"] = "hello"
fmt.Println("sadim5")
fmt.Println(properties)

jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
Schema: jsonSchemaWithProperties,
})

if err != nil {
fmt.Println(err)
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: (JsonWrapperObject2),
})

defer producer.Close()

if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")

return ""

}
type testJSON struct {
InputJson string
}
var (
exampleSchemaDef5 = "{\"type\":\"record\",\"name\":\"testJSON\",\"namespace\":\"test2\"," +
"\"fields\":[{\"name\":\"InputJson\",\"type\":\"testJSON\"}]}"
)

var (
exSchema = "{\"type\":\"record\",\"schema\":\"\",\"properties\":{\"InputJson\":\"testJSON\"}}"
)
func writePulsar(inputjson string, topicName string) (result string) {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
fmt.Println(err)
}
defer client.Close()

//a2 := testJSON{InputJson: inputjson}
//properties := map[string]testJSON{"InputJson" : a2}
properties2 := make(map[string]string)
properties2["InputJson"] = "testJSON"
jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef5, properties2)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
Schema: jsonSchemaWithProperties,
})
if err != nil {
fmt.Println(err)
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: &testJSON{
InputJson: inputjson,
},
})
if err != nil {
fmt.Println(err)
}
producer.Close()
return ""
}`

Java Working Client which is able to write as parquet on pulsar code: `package accuknox.datalake;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;

public class App3 {
public static void main(String[] args) throws InterruptedException, IOException {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
/*
* Producer producer = client.newProducer(JSONSchema.of(String.class))
* .topic("persistent://public/default/cilium-telemetry-4") .create();
*/
//Schema.JSON
Producer producer = client.newProducer(JSONSchema.of(AccuknoxJsonObject.class)).topic("CiliumTelemetryDemo27Dec").create();
//Producer<byte[]> producer = client.newProducer().topic("cilium-test1").create();
FileReader fr = null;
BufferedReader br = null;
AccuknoxJsonObject jsonObject = null;
for (int i=0;i<=90;i++) {
try
{
File file = new File("./knoxfeedermockdata/flow.txt");
//File file = new File("./knoxfeedermockdata/cilium_alerts_tableMockData.txt");
//File file = new File("./knoxfeedermockdata/cilium_v3_All.txt");
System.out.println(file.getAbsolutePath());
fr=new FileReader(file);
br=new BufferedReader(fr);
String line;
while((line=br.readLine())!=null)
{
System.out.println(line);
//for(int i = 0 ; i< 1000000; i++)
jsonObject = new AccuknoxJsonObject(line);
producer.send(jsonObject);
}
System.gc();
//this.getRuntime().gc()
fr.close(); //closes the stream and release the resources
}
catch(IOException e)
{
System.err.print(e);
} catch(NullPointerException e2) {
System.err.print(e2);
} finally {
try {
if(br != null)
br.close();
if(fr != null)
fr.close();
} catch(NullPointerException ex) {
System.err.print(ex);
}
}
//Thread.sleep(200);
}
producer.close();
client.close();
}
}
`

cc: @sijie

[BUG] Sink flushes only when batchTimeMs is exceeded for large batchSize

Describe the bug
BlobStoreAbstractSink contains a bug in its flush logic.
Every time the framework calls write() (with records from the consumer), it does a blocking add to a queue called pendingFlushQueue, whose maximum capacity is set by the sink's pendingQueueSize (100,000 in the user's configuration).
After a successful write, it runs flushIfNeeded, which checks whether the queue has exceeded the maximum bytes needed to flush (default of 10kb) or the maximum number of records (batchSize, configured to 10,000 records).
So after the pod starts, the queue is empty and fills up quickly. Once it hits 10kb, the sink launches an asynchronous flush and continues to write to the queue until the queue is full (100,000 records).
The flush takes up to 10kb or 10,000 records, so in this case 10kb, and writes it to S3.
Then it finishes.
Because write() is blocked on the full queue, nothing triggers another flush.
Only after maxBatchTime (currently configured to 5 min) elapses does the sink run another flush, which again takes 10kb; this repeats every 5 min.

Expected behavior
The sink should flush the pending records repeatedly, independently of insertions by the write method.
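
The expected behavior could be sketched as a flush routine that keeps draining while a full batch is still pending, rather than flushing once per trigger. This is a minimal illustration and not the connector's code: `PendingQueueDrainer`, `drainFullBatches`, and the `uploader` callback are hypothetical names, only the record-count threshold is modeled (the byte threshold is left out), and a single flush thread is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: drains a pending queue in batches, independent of write().
final class PendingQueueDrainer {
    // Flushes full batches for as long as at least batchSize records are pending.
    // Returns the number of batches handed to the uploader.
    static <T> int drainFullBatches(BlockingQueue<T> pending, int batchSize,
                                    Consumer<List<T>> uploader) {
        int batches = 0;
        while (pending.size() >= batchSize) {
            List<T> batch = new ArrayList<>(batchSize);
            // drainTo frees queue capacity, so a blocked write() can proceed.
            if (pending.drainTo(batch, batchSize) == 0) {
                break; // nothing drained (e.g. another consumer emptied the queue)
            }
            uploader.accept(batch);
            batches++;
        }
        return batches;
    }
}
```

With 25 pending records and a batch size of 10, this loop uploads two batches and leaves 5 records for the time-based flush.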

[BUG] Verify the connector can interact with gcs and azure blob store correctly

Describe the bug
Currently, the connector has not been verified to run correctly with

  • gcs
  • azure blob store

[BUG] extract schema info from messages without schema version

Describe the bug
If a message has no schema version, the connector treats it as having a bytes schema, even though the schema could be recovered from the schema info.

[BUG] NPE if partitionerType is not set

Describe the bug
If you don't set partitionerType in the config, an NPE is thrown.

java.lang.NullPointerException: null
    at org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig.validate(BlobStoreAbstractConfig.java:136) 
    at org.apache.pulsar.io.jcloud.sink.CloudStorageSinkConfig.validate(CloudStorageSinkConfig.java:141) 
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.open(BlobStoreAbstractSink.java:99) 
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:925) ~[?:?]

To Reproduce
See above

Expected behavior
A clear error message that lets the user correct the configuration.
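
A null-safe check along these lines would give the user an actionable message instead of an NPE. This is only a sketch under assumptions: `PartitionerTypeCheck` and its `validate` method are hypothetical and do not mirror `BlobStoreAbstractConfig.validate`; the allowed values and the fallback to "partition" are taken from the README's configuration table.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a null-safe check; not the connector's actual validate().
final class PartitionerTypeCheck {
    // Values documented in the README: partition by topic partition, or by time.
    private static final List<String> ALLOWED = Arrays.asList("partition", "time");

    // Returns the normalized value, falling back to the documented default when unset.
    static String validate(String partitionerType) {
        if (partitionerType == null || partitionerType.trim().isEmpty()) {
            return "partition"; // documented default
        }
        String normalized = partitionerType.trim().toLowerCase();
        if (!ALLOWED.contains(normalized)) {
            throw new IllegalArgumentException(
                "partitionerType must be one of " + ALLOWED
                    + " but was '" + partitionerType + "'");
        }
        return normalized;
    }
}
```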

[FEATURE] Add a new batch triggering mechanism that is based on the batch size

Describe the solution you'd like
Whenever the records inside a batch reach a certain total size (e.g., 64MB), trigger a new batch to send out the data.
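
The requested trigger could look roughly like the counter below. It is a sketch, not a proposal for the actual implementation: `ByteSizeBatchTrigger` and `addAndCheck` are hypothetical names, and the threshold is whatever the caller passes in (for example 64 * 1024 * 1024 for the 64MB mentioned above).

```java
// Hypothetical sketch of a size-based batch trigger; names are illustrative only.
final class ByteSizeBatchTrigger {
    private final long maxBatchBytes;
    private long currentBytes;

    ByteSizeBatchTrigger(long maxBatchBytes) {
        this.maxBatchBytes = maxBatchBytes;
    }

    // Adds the payload size of one record and reports whether the accumulated
    // batch has reached the threshold. The counter resets when it has.
    boolean addAndCheck(int recordBytes) {
        currentBytes += recordBytes;
        if (currentBytes >= maxBatchBytes) {
            currentBytes = 0;
            return true;
        }
        return false;
    }
}
```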

Pulsar GCS sink connector is unable to write Parquet when a Pulsar function sends data to the output topic in a user-defined POJO format

For more details, please refer to the Slack thread: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1643259234073739

@freeznet @sijie

Pulsar Function code:=
`package accuknox.pulsar.functions;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Arrays;

public class PulsarFunctionTest3 implements Function<String, String> {
// This function is invoked every time a message is published to the input topic
@Override
public String process(String input, Context context) throws Exception {
final String a;
Arrays.asList(input.split(",")).forEach(word -> {
String counterKey = word.toLowerCase();
context.incrCounter(counterKey, 1);
});
return String.format("%s!", input).concat("tomwoo");
}
}`

When the Pulsar function above publishes data to the output topic, the GCS sink connector is unable to write the data as Parquet.

The error stack trace is:

-dec22-test1/public/default/sadimtestoutput999/2022-01-27/04/51002737768.parquet java.lang.RuntimeException: Null-value for required field: HostPID
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:200) ~[parquet-avro-1.12.0.jar:1.12.0]
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:171) ~[parquet-avro-1.12.0.jar:1.12.0]
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:138) ~[parquet-hadoop-1.12.0.jar:1.12.0]
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:310) ~[parquet-hadoop-1.12.0.jar:1.12.0]
    at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriter(ParquetFormat.java:92) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:279) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:244) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]

Also, the sink connector version is:
configMapRef:
name: cluster-function-mesh-config
image: docker.io/streamnative/pulsar-io-cloud-storage:2.8.1.30
imagePullPolicy: IfNotPresent
name: pulsar-sink

FYR:
Even when the producer uses a byte array, the connector throws the error below:
Producer<byte[]> producer = client.newProducer().topic("sadiuyuiu").create();

0c4)] buffered records to blob store
03:50:20.142 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:
org.apache.avro.SchemaParseException: Cannot parse <null> schema
    at org.apache.avro.Schema.parse(Schema.java:1633) ~[java-instance.jar:?]
    at org.apache.avro.Schema$Parser.parse(Schema.java:1430) ~[java-instance.jar:?]
    at org.apache.avro.Schema$Parser.parse(Schema.java:1418) ~[java-instance.jar:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertToAvroSchema(AvroRecordUtil.java:103) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
    at org.apache.pulsar.io.jcloud.format.ParquetFormat.initSchema(ParquetFormat.java:63) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:239) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]

Also, if the producer object is created as below, the GCS sink connector is not able to write Parquet to GCS and throws an error saying that the schema is null:
producer = client.newProducer(Schema.STRING).topic(topicName).create();

The issue is that the GCS sink connector is able to write Parquet only when the producer on the input topic has been initialized as below:
Producer producer = client.newProducer(JSONSchema.of(AccuknoxJsonObject.class)).topic("sadim009").create();

Link shared by @freeznet to a related issue: apache/pulsar#12886

Note: please check the Slack threads https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1643259234073739 and https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1643176282015089 for the complete Java classes.

topic name as a group of topic partitions in s3 path

We need the topic name in the S3 path, not only the partitions:

public/default/s3-test/s3-test-partition-0

Instead of

public/default/s3-test-partition-0

Not sure whether this is something new or already possible. Either way, it should be a small change in the path.
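
The path change requested here could be sketched as a small string transformation. This is an illustration only: `TopicPathSketch` and `withTopicGroup` are hypothetical names, the connector's real partitioner classes may build paths differently, and the sketch assumes partitioned topics end in `-partition-<n>`.

```java
// Hypothetical sketch; the connector's real partitioner classes may differ.
final class TopicPathSketch {
    private static final String PARTITION_MARKER = "-partition-";

    // Inserts the base topic name as a parent directory of the partition name.
    // Paths whose last segment has no "-partition-" marker are returned unchanged.
    static String withTopicGroup(String path) {
        int slash = path.lastIndexOf('/');
        String parent = slash < 0 ? "" : path.substring(0, slash + 1);
        String leaf = path.substring(slash + 1);
        int idx = leaf.lastIndexOf(PARTITION_MARKER);
        if (idx <= 0) {
            return path;
        }
        return parent + leaf.substring(0, idx) + "/" + leaf;
    }
}
```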

[BUG] batching does not batch

Describe the bug
While configuring an S3 Parquet sink with batching, I found that the sink creates a separate file for each record.

To Reproduce
Steps to reproduce the behavior:

  1. create a parquet/avro s3 sink with batching
  2. send 2 messages
  3. see that there are 2 parquet files on s3

Expected behavior
One Parquet file containing 2 records

Proposed solution
From the code, it seems that the batching is done incorrectly. We should batch 'plain' GenericRecords in the 'write' function, and later, in the 'flush' function, group the records by schema and partition path, change 'Format.recordWriter' to take a list of records, and only then send the result to the blob store.

Side note:
In general, a lot of sinks need some kind of windowing. Is there any way to configure the window properties of a Pulsar function to work with a Pulsar sink? I think it would be useful to have another API for the sink that receives a collection of records rather than one record at a time. This way, we would not need to reimplement batching for each sink.

EDIT: Is there a way to open a PR with a fix?

@codelipenghui @jianyun8023

[BUG] Time Partitioner not working as expected for GCS

Data sent to the GCS bucket is not in the correct date folder.
I have configured timePartitionDuration: 1d. It was working fine, but after a few days the date folders stopped being created (say, no folders were created after 2022-06-23) and the data was being sent to the 2022-06-23 folder.
Data is still being sent to GCS.

To Reproduce
I am using
Function Mesh version: 0.1.11-rc3
Pulsar IO Cloud storage sink connector Version: 2.9.2.21

Expected behavior
Data sent to the GCS bucket should be in the respective date folder.
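
One way to read the expected behavior is that the folder is derived from each record's own timestamp, floored to the partition duration, so a new day always yields a new folder. The sketch below illustrates that idea only; `TimePartitionSketch` and `folderFor` are hypothetical, UTC is an assumption, and it is not a description of how the connector's time partitioner actually works.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: derives the date folder from a record timestamp.
final class TimePartitionSketch {
    // Floors the timestamp to the start of its duration bucket and formats it
    // with the given pattern (e.g. "yyyy-MM-dd") in UTC.
    static String folderFor(long timestampMs, long durationMs, String pattern) {
        long bucketStart = timestampMs - Math.floorMod(timestampMs, durationMs);
        return DateTimeFormatter.ofPattern(pattern)
            .withZone(ZoneOffset.UTC)
            .format(Instant.ofEpochMilli(bucketStart));
    }
}
```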

Screenshots
Screenshot 2022-06-28 204511

[FEATURE] Pending Queuesize default to the maxbatch size

Describe the solution you'd like
Currently, pendingQueueSize defaults to 10x the max batch size; we should reduce the ratio to 1.

[BUG] Connector does not create output files for each topic when reading multiple topics

The connector creates only one output file shared by the records of all topics, instead of creating independent output files for each topic.

To Reproduce
Steps to reproduce the behavior:

  1. Consume from 3 topics
  2. Check output file(s)
  3. Single output file was created, containing records of all 3 topics

Expected behavior
Should create 3 output files, each containing records of only one topic

Additional context
The problem is that the sink only looks at the first record of each batch to calculate the file path. This is fine for single-topic consumption, but not when the sink reads from multiple topics.

https://github.com/streamnative/pulsar-io-cloud-storage/blob/master/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java#L242
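
A possible direction is to split each flushed batch by topic before computing any file path, so that the first record of one topic no longer decides the path for all of them. The sketch below is illustrative only: `BatchGrouping` and `groupByTopic` are hypothetical names, and the record type is left generic with a caller-supplied function that extracts the topic name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: splits one flushed batch into per-topic groups so that
// each group can get its own file path. Not the connector's actual code.
final class BatchGrouping {
    static <R> Map<String, List<R>> groupByTopic(List<R> batch,
                                                 Function<R, String> topicOf) {
        // LinkedHashMap keeps topics in first-seen order.
        Map<String, List<R>> groups = new LinkedHashMap<>();
        for (R record : batch) {
            groups.computeIfAbsent(topicOf.apply(record), k -> new ArrayList<>())
                  .add(record);
        }
        return groups;
    }
}
```

Each resulting group could then be written to a path computed from its own first record.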

[BUG] Impossible to select the partition type

Of the two partitionerType values, partition (default) and time, it is not possible to use partition.

This is due to a mismatch in the code. The partitionerType is defined with the types default and time. With the condition below, the property has to contain one of these two defined values.

However, the type partition is then used instead of default, so it cannot be selected.

Two possibilities to resolve the mismatch:

  • Rename default to partition in the Map, to align with the documentation.
  • Rename partition to default in the switch and update the documentation.

[BUG] error while writing data in Parquet format using JsonSchema as schema format

Describe the bug
I was using the Cloud Storage Sink to collect data from Pulsar and write it to AWS S3 in Parquet. Messages were produced using a JsonSchema format. The Sink fails as soon as it tries to convert the collected data into org.apache.avro.generic.GenericRecord (within the convertGenericRecord function).

I tried producing messages from both Python and Java; both fail, but with different stack traces.

Note: if the formatType specified in the configuration is json, everything works fine.

To Reproduce
Use this template configuration for the pulsar-io-cloud-storage v2.9.3.6:

tenant: "<theTenant>"
namespace: "schema-registry"
name: "cloud-storage-sink"
inputs: 
  - "persistent://<theTenant>/<theNamespace>/<theTopic>"
  - <otherTopicUrl>
archive: "connectors/pulsar-io-cloud-storage-2.9.3.6.nar"
parallelism: 1

configs:
  provider: "aws-s3"
  accessKeyId: "<yourAccessKeyId>"
  secretAccessKey: "<yourSecretAccessKey>"
  bucket: "<yourS3Bucket>"
  region: "<yourRegion>"
  pathPrefix: "cloud_storage_sink_parquet/"
  formatType: "parquet"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 100
  batchTimeMs: 600000
  withMetadata: false
  withTopicPartitionNumber: false

Then produce messages in JsonSchema format. Here is the code for a minimal Python producer:

import pulsar
from pulsar.schema import *

class YourMessageClass(Record):
    ...

def generate_message() -> YourMessageClass:
    ...

if __name__ == '__main__':
    client = pulsar.Client('pulsar://host.docker.internal:6650')

    producer = client.create_producer(
        topic='persistent://<theTenant>/<theNamespace>/<theTopic>',
        producer_name='python_producer',
        schema=pulsar.schema.JsonSchema(YourMessageClass)
    )

    for i in range(100):
        msg = generate_message()
        producer.send(msg)

    client.close()

Expected behavior
A chunk of data containing a list of collected messages, written to the specified AWS S3 prefix in Parquet format.

Screenshots
None

Additional context
The tests were done on my laptop, using an Apache Pulsar Docker container where the schema-registry was properly configured (the schema definition of the messages has been uploaded) and the version pulsar-io-cloud-storage-2.9.3.6.nar was loaded.

This is the error that occurred while writing data produced with the Python producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-02/180925500946.parquet
java.lang.NullPointerException: null
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:220) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

This is the error that occurred while writing data produced with the Java producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-08/64156074046.parquet
java.util.NoSuchElementException: No value present
	at java.util.Optional.get(Optional.java:148) ~[?:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:207) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

Partition for cloud io connector

I am currently working with the Pulsar Cloud Storage connector (S3 connector).
I am using the latest version of the Pulsar Cloud Storage connector: https://github.com/streamnative/pulsar-io-cloud-storage/releases/tag/v2.5.1.
Of the two partitionerType values, partition (default) and time, it is not possible to use partition.
This is because the code tests the parameter value: https://github.com/streamnative/pulsar-io-cloud-storage/blob/3c734a592598f04a012cf[…]n/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java
And the value used in the test is default instead of partition: https://github.com/streamnative/pulsar-io-cloud-storage/blob/3c734a592598f04a012cf[…]n/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java

[BUG] failed to extract schema version for AVRO format and causing cast error

Describe the bug

When useHumanReadableSchemaVersion=false

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: null in field schemaVersion in field __message_metadata__
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:317) ~[java-instance.jar:?]
        at org.apache.pulsar.io.jcloud.format.AvroFormat.recordWriterBuf(AvroFormat.java:105) ~[q6MAPb11NcKFgJvec2mFsA/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:264) ~[q6MAPb11NcKFgJvec2mFsA/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:234) ~[q6MAPb11NcKFgJvec2mFsA/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[q6MAPb11NcKFgJvec2mFsA/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassCastException: null in field schemaVersion in field __message_metadata__
        at org.apache.avro.generic.GenericDatumWriter.addClassCastMsg(GenericDatumWriter.java:191) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:229) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73) ~[java-instance.jar:?]
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:314) ~[java-instance.jar:?]
        ... 10 more
Caused by: java.lang.ClassCastException

[BUG] no docs about `pathPrefix`

Describe the bug
The docs do not describe pathPrefix.

[BUG] `maxTimeoutMs` should apply at the topic level

Describe the bug
Similar to #493, currently the maxTimeoutMs is applied for all topics at once, instead of having an independent timer for each topic.

Expected behavior
Each topic should independently measure the time passed.
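
Independent per-topic timing could be sketched with one timestamp per topic, as below. This is a hypothetical illustration rather than the connector's implementation: `PerTopicFlushTimer` and `shouldFlush` are invented names, and the caller is assumed to pass in the current time in milliseconds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-topic flush timers; not the connector's implementation.
final class PerTopicFlushTimer {
    private final long maxTimeoutMs;
    private final Map<String, Long> timerStartMs = new HashMap<>();

    PerTopicFlushTimer(long maxTimeoutMs) {
        this.maxTimeoutMs = maxTimeoutMs;
    }

    // Returns true when this topic has waited at least maxTimeoutMs since its own
    // timer started, and restarts that topic's timer. Other topics are unaffected.
    boolean shouldFlush(String topic, long nowMs) {
        Long start = timerStartMs.putIfAbsent(topic, nowMs);
        if (start == null) {
            return false; // first sighting of the topic starts its timer
        }
        if (nowMs - start >= maxTimeoutMs) {
            timerStartMs.put(topic, nowMs);
            return true;
        }
        return false;
    }
}
```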

[BUG] Messages are ignored with "Message not exist in record" exception

Describe the bug
I'm following the instructions in README.md to test the connector, but all messages are being ignored with a "Message not exist in record" exception and nothing is saved to S3.

To Reproduce
Steps to reproduce the behavior:
https://github.com/streamnative/pulsar-io-cloud-storage#usage
The only difference is that I use my own class instead of TestRecord.

public class Event {
    public String user_id;
    public String os;

    public Event(String user_id, String os) {
        this.user_id = user_id;
        this.os = os;
    }

    public Event() {
    }
}

Expected behavior
Messages are saved to S3

Additional context
Sink status:

{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 2,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ {
        "exceptionString" : "Message not exist in record",
        "timestampMs" : 1604410860088
      }, {
        "exceptionString" : "Message not exist in record",
        "timestampMs" : 1604410860095
      } ],
      "numWrittenToSink" : 0,
      "lastReceivedTime" : 1604410860093,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}

Topic schema:

{
  "version": 2,
  "schemaInfo": {
    "name": "my-topic-2-avro",
    "schema": {
      "type": "record",
      "name": "Event",
      "fields": [
        {
          "name": "user_id",
          "type": [
            "null",
            "string"
          ]
        },
        {
          "name": "os",
          "type": [
            "null",
            "string"
          ]
        }
      ]
    },
    "type": "AVRO",
    "properties": {
      "__alwaysAllowNull": "true",
      "__jsr310ConversionEnabled": "false"
    }
  }
}

The same topics work with a PostgreSQL sink or a CLI consumer. Tried with both JSON and AVRO schemas.

[BUG] formatType not working as expected

Describe the bug

We are using the pulsar-io-cloud-storage connector to push Pulsar topic data into S3.

It then fails with the following stack trace:

2022-11-08T13:14:48,012+0000 [pulsar-io-cloud-storage-sink-flush-0] INFO  org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Flushing 10 buffered records to blob store
2022-11-08T13:14:48,012+0000 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:
java.lang.NullPointerException: null
        at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.recoverGenericSchemaFromInternalSchema(AvroRecordUtil.java:98) ~[2Wt2FL2iQu7JDGE6nzDX_g/:?]
        at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.getPulsarSchema(AvroRecordUtil.java:82) ~[2Wt2FL2iQu7JDGE6nzDX_g/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:250) ~[2Wt2FL2iQu7JDGE6nzDX_g/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:216) ~[2Wt2FL2iQu7JDGE6nzDX_g/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]

To Reproduce

Steps to reproduce the behavior:

  1. pulsar-io-s3.yaml
tenant: "default"
namespace: "sample"
name: "pulsar-io-s3-test"
inputs:
  - "persistent://default/sample/topic-sink"
archive: "connectors/pulsar-io-cloud-storage-2.9.3.15.nar"
parallelism: 1
configs:
  provider: "aws-s3"
  accessKeyId: ""
  secretAccessKey: ""
  roleSessionName: "test"
  formatType: "bytes"
  bucket: "pulsar-bucket"
  pathPrefix: "pulsar/test/"
  region: "us-east-1"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 10
  batchTimeMs: 1000
  2. Local run is used for executing it on a Pulsar broker
./bin/pulsar-admin sink localrun --sink-config-file conf/pulsar-io-s3.yaml

Expected behavior

The pulsar-io-cloud-storage connector should be able to process the messages and store them in the S3 bucket.

Additional context

  1. Function context is used for producing events into persistent://default/sample/topic-sink topic.
context.publish(f"persistent://default/sample/topic-sink", item, compression_type=_pulsar.CompressionType.LZ4, message_conf={"partition_key": context.get_partition_key()})
  2. The messages in the persistent://default/sample/topic-sink topic are in bytes format (b'...'). Here is a sample message:
b'{"op":"u","ts_ms":1667798064000,"source":{"version":"0.8.0-SNAPSHOT","name":"sample"},"after":null}'

[BUG] The connector fails to convert JSON array bytes

Describe the bug

When the source topic schema is bytes and the message content is a JSON array:

[{"map1-key1":"map1-value1","map1-key2":"map1-value2"},{"map2-key1":"map2-value1","map2-key2":"map2-value2"}]

If the connector config sets formatType: "json", the conversion fails.

com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.util.LinkedHashMap<java.lang.String,java.lang.Object>` from Array value (token `JsonToken.START_ARRAY`)
 at [Source: (byte[])"[{"map1-key1":"map1-value1","map1-key2":"map1-value2"},{"map2-key1":"map2-value1","map2-key2":"map2-value2"}]"; line: 1, column: 1]
	at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1741) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1515) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromArray(StdDeserializer.java:222) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:447) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3707) ~[java-instance.jar:?]
	at org.apache.pulsar.io.jcloud.format.JsonFormat.convertRecordToObject(JsonFormat.java:148) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at org.apache.pulsar.io.jcloud.format.JsonFormat.recordWriterBuf(JsonFormat.java:106) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:313) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:263) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:216) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]

To Reproduce
Steps to reproduce the behavior:

  1. Create a producer with the BYTES schema and send JSON-encoded bytes.
        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic("test-io-cloud-storage").create();

        Map<String, String> map1 = new HashMap<>();
        map1.put("map1-key1", "map1-value1");
        map1.put("map1-key2", "map1-value2");

        Map<String, String> map2 = new HashMap<>();
        map2.put("map2-key1", "map2-value1");
        map2.put("map2-key2", "map2-value2");

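        // The payload is a JSON array (a list of two maps), not a single JSON object.
        List<Map<String, String>> records = new ArrayList<>();
        records.add(map1);
        records.add(map2);

        ObjectMapper mapper = new ObjectMapper();
        // writeValueAsBytes always encodes as UTF-8, independent of the platform charset.
        byte[] jsonBytes = mapper.writeValueAsBytes(records);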

        producer.send(jsonBytes);
  2. Create the connector with formatType: "json" in its config.
admin sinks create --tenant public --namespace default --sink-config-file cloud-storage-sink.yaml -i "test-io-cloud-storage"

Contents of cloud-storage-sink.yaml:

formatType: "json"

configs:
  provider: "aws-s3"
  accessKeyId: "<access_key>"
  secretAccessKey: "<secret_key>"
  bucket: ""
  region: ""
  endpoint: ""
  formatType: "json"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 10

Expected behavior
The JSON bytes should be converted and written to S3 without errors.
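
The stack trace above shows Jackson's MapDeserializer rejecting an array token, which suggests the connector asks for a Map while the payload from step 1 is a top-level JSON array. As a minimal sketch of how to tell the two shapes apart before sending, the helper below inspects the first non-whitespace byte of a payload. The class and method names (JsonTopLevelCheck, topLevelKind) are hypothetical and not part of the connector; the check also does not validate the JSON or handle a byte-order mark.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the connector: classifies the top-level
// JSON value of a payload by its first non-whitespace byte.
final class JsonTopLevelCheck {

    enum Kind { OBJECT, ARRAY, OTHER }

    static Kind topLevelKind(byte[] payload) {
        for (byte b : payload) {
            // Skip the whitespace characters JSON allows before a value.
            if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
                continue;
            }
            if (b == '{') {
                return Kind.OBJECT;
            }
            if (b == '[') {
                return Kind.ARRAY;
            }
            return Kind.OTHER;
        }
        return Kind.OTHER; // empty or whitespace-only payload
    }

    public static void main(String[] args) {
        // Same shape as the payload in the reproduction: a list of two maps.
        byte[] listOfMaps = "[{\"map1-key1\":\"map1-value1\"},{\"map2-key1\":\"map2-value1\"}]"
                .getBytes(StandardCharsets.UTF_8);
        byte[] singleMap = "{\"map1-key1\":\"map1-value1\"}".getBytes(StandardCharsets.UTF_8);

        System.out.println(topLevelKind(listOfMaps)); // ARRAY
        System.out.println(topLevelKind(singleMap));  // OBJECT
    }
}
```

If the connector does expect one JSON object per message, sending each map as its own message may be a workaround, though that is an assumption drawn from the stack trace rather than from the connector's documentation.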

Screenshots

Additional context
The documentation states that this scenario is already supported.

Release Cloud IO connector in streamnative-ci

According to https://github.com/streamnative/streamnative-ci/issues/374, the Cloud IO connector will be included in streamnative-ci and will appear in the sn-pulsar and cloud pulsar images. For the release process of the Cloud IO connector, refer to the plugin part (e.g. Aop and Kop) of the StreamNative Weekly Release Process. Two things need to be confirmed:

1. set-pulsar-version.sh and set-project-version.sh
Are there any scripts, similar in function to set-pulsar-version.sh and set-project-version.sh, that adjust the Pulsar version the connector depends on? If so, how are they used?

2. Create a project directory under the streamnative-ci/projects/
Please submit a project directory to streamnative-ci/projects. In general, the submitted project directory should contain three shell scripts: build.sh, get-version.sh and set-version.sh. When adding this directory, you can refer to this document and to the directories submitted by other projects in streamnative-ci/projects.

Please contact me if you have any questions :)

[FEATURE] Azure Blob Storage support

It would be great to have official support for Azure Blob Storage.

Ideally, this would be done using the native SDK as was done for AWS with the s3v2 provider.

[FEATURE] Handle raw data with the connector (topics without schema)

I would like to use the connector with topics that have no schema.
Currently we have to provide a schema; otherwise the connector fails to send the data from the topics to S3 buckets.

As with the Kinesis Sink, it would be great to be able to handle raw data. A raw type could be added alongside the existing format types (json, avro and parquet). The field to update is formatType.

I am currently using the connector to sink JSON data without a schema, as I did with the Kinesis Sink. My current configuration is formatType: "json". For each message produced, the connector raises an exception:

16:05:43.437 [public/default/s3connector-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/s3connector:0] Uncaught exception in Java Instance
java.lang.NullPointerException: null
    at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.generateSchema(AutoConsumeSchema.java:154) ~[org.apache.pulsar-pulsar-client-original-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:91) ~[org.apache.pulsar-pulsar-client-original-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:40) ~[org.apache.pulsar-pulsar-client-original-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.client.api.Schema.decode(Schema.java:103) ~[java-instance.jar:?]
    at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:299) ~[org.apache.pulsar-pulsar-client-original-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:76) ~[org.apache.pulsar-pulsar-functions-instance-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:400) ~[org.apache.pulsar-pulsar-functions-instance-2.7.0.3.jar:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:237) [org.apache.pulsar-pulsar-functions-instance-2.7.0.3.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
16:05:43.443 [public/default/s3connector-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
16:05:43.447 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/s3connector] [auniquename] Closed consumer
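
To make the request concrete, here is a rough sketch of what a raw format could do: copy each message payload unchanged into the output buffer, without consulting any schema. Everything here is hypothetical (the class name RawFormatSketch, the method joinRaw, and the newline delimiter are assumptions for illustration), and it does not reflect how the connector implements its formats.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a "raw" format: payloads are copied unchanged,
// one per line, and no schema is needed to decode them.
final class RawFormatSketch {

    static byte[] joinRaw(List<byte[]> payloads) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] payload : payloads) {
            out.write(payload, 0, payload.length);
            out.write('\n'); // assumed delimiter; ambiguous if a payload contains newlines
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<byte[]> batch = Arrays.asList(
                "{\"id\":1}".getBytes(StandardCharsets.UTF_8),
                "{\"id\":2}".getBytes(StandardCharsets.UTF_8));
        // Prints the two payloads, each on its own line.
        System.out.print(new String(joinRaw(batch), StandardCharsets.UTF_8));
    }
}
```

A newline delimiter only suits text payloads such as single-line JSON; arbitrary binary data would need a length prefix or one object per message instead.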

[BUG] Pulsar cloud storage connector can't convert PROTO_NATIVE schema to Parquet schema

Describe the bug

The sink connector cannot read topics with the PROTO_NATIVE schema.

The following exception is thrown:

19:04:35.029 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:    org.apache.avro.SchemaParseException: Cannot parse <null> schema
        at org.apache.avro.Schema.parse(Schema.java:1633) ~[java-instance.jar:?]
        at org.apache.avro.Schema$Parser.parse(Schema.java:1430) ~[java-instance.jar:?]
        at org.apache.avro.Schema$Parser.parse(Schema.java:1418) ~[java-instance.jar:?]
        at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertToAvroSchema(AvroRecordUtil.java:103) ~[qg2KQyXZ9-7y9GGaCCq0Hg/:?]
        at org.apache.pulsar.io.jcloud.format.ParquetFormat.initSchema(ParquetFormat.java:63) ~[qg2KQyXZ9-7y9GGaCCq0Hg/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:239) ~[qg2KQyXZ9-7y9GGaCCq0Hg/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[qg2KQyXZ9-7y9GGaCCq0Hg/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]

To Reproduce

Steps to reproduce the behavior:

  1. Create a topic with PROTO_NATIVE schema
  2. Create a sink connector to read from the topic
  3. The sink connector will fail

Expected behavior

The sink connector should be able to read from topics with a protobuf or protobuf_native schema.

Screenshots
N/A

Additional context
N/A
