
pulsar-io-cloud-storage's Issues

[BUG] `subscriptionPosition` or `subsPosition` doesn't work when provided in yaml config file.

Describe the bug

When I create a sink using a YAML config file, specifying subscriptionPosition or subsPosition has no effect: the sink just defaults to "Latest" even though I set it to "Earliest" in the YAML.

If I actually want Earliest, I have to include the flag in the pulsar-admin command. Not a huge deal, but probably a bug somewhere:

./pulsar-admin.sh stg sinks create --sink-config-file /scripts/pulsar/storage-sink/stg-billing-sink.yaml --subs-position Earliest

Here's the sink config yaml:

tenant: "public"
namespace: "default"
name: "cloud-storage-sinktest"
inputSpecs:
  persistent://public/sinktest/.*:
    regexPattern: true
  persistent://public/sinktest2/.*:
    regexPattern: true
archive: "builtin://cloud-storage-test"
parallelism: 1
autoAck: true
cleanupSubscription: true
subscriptionPosition: "Earliest"

configs:
  role: "arn:aws:iam::$$$$$$$$$$$$$:role/pulsar-storage-connector-role"
  roleSessionName: "name"
  provider: "s3v2"
  bucket: "$$$$$$$$$$$$$"
  region: "us-west-2"
  formatType: "json"
  pathPrefix: "sinktest/"
  partitionerType: "GLOBAL_TIME"
  batchSize: 15000
  batchTimeMs: 15000
  withMetadata: true
  useHumanReadableMessageId: true

I also tried subsPosition instead of subscriptionPosition, with the same result.


[FEATURE] add configuration for including the original topic in the metadata when withMetadata is set to true

Is your feature request related to a problem? Please describe.

Would it be possible to add another configuration option that includes the original topic in the metadata when withMetadata is set to true? We would still use GLOBAL_TIME so that records end up in single files, but in certain cases consumers still need to know which topic a record came from. Having the topic in the metadata of the written-out JSON would be ideal, and it wouldn't require us to store it as a message property from the beginning.
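
A minimal sketch of what this could look like, assuming a hypothetical includeTopicInMetadata option (this is not the connector's actual implementation). The Pulsar IO Record API already exposes the source topic, so it could simply be copied into the metadata map that is written out when withMetadata is true:

import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.functions.api.Record;

public class MetadataTopicExample {
    // Hypothetical flag standing in for the requested config option.
    private static final boolean INCLUDE_TOPIC_IN_METADATA = true;

    static Map<String, Object> buildMetadata(Record<?> record) {
        Map<String, Object> metadata = new HashMap<>();
        record.getMessage().ifPresent(msg -> {
            metadata.put("messageId", msg.getMessageId().toString());
            metadata.put("properties", msg.getProperties());
        });
        if (INCLUDE_TOPIC_IN_METADATA) {
            // Record#getTopicName() is part of the Pulsar IO Record API.
            record.getTopicName().ifPresent(topic -> metadata.put("topic", topic));
        }
        return metadata;
    }
}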


[BUG] batching does not batch

Describe the bug
While configuring an S3 Parquet sink with batching, I found that the sink creates a separate file for each record.

To Reproduce
Steps to reproduce the behavior:

  1. create a parquet/avro s3 sink with batching
  2. send 2 messages
  3. see that there are 2 parquet files on s3

Expected behavior
One Parquet file containing both records.

Proposed solution
From the code, it seems that the batching is done incorrectly. We should buffer plain GenericRecords in the write function; later, in the flush function, we should group records by schema and partition path, change Format.recordWriter to take a list of records, and only then send the result to the blob store.
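
A rough sketch of the grouping step described above (an assumption about how it could be done, not the connector's actual code): buffer records in write(), then in flush() group them by schema and partition path so each group is written out as a single object.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;

public class FlushGroupingSketch {
    // Hypothetical helper; a real implementation would reuse the sink's partitioner.
    static String partitionPath(Record<GenericRecord> record) {
        return record.getTopicName().orElse("unknown-topic");
    }

    static Map<String, List<Record<GenericRecord>>> groupForFlush(List<Record<GenericRecord>> buffered) {
        // One blob per (schema, partition path) group instead of one blob per record.
        return buffered.stream()
                .collect(Collectors.groupingBy(r -> {
                    String schemaName = r.getSchema() == null
                            ? "no-schema" : r.getSchema().getSchemaInfo().getName();
                    return schemaName + "|" + partitionPath(r);
                }));
    }
}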

Side note:
In general, a lot of sinks need some kind of windowing. Is there any way to configure the window properties of a Pulsar Function to work with a Pulsar sink? It could be useful to have another sink API that receives a collection of records rather than one record at a time; that way we would not need to reimplement batching for each sink.

EDIT: is there a way to open a PR with a fix?

@codelipenghui @jianyun8023

[BUG] no docs about `pathPrefix`

Describe the bug
The docs do not include a description of pathPrefix.


GCS Sink Connector not able to write as Parquet when data is published using the Golang client for Pulsar; works fine with the Java client

@freeznet when I write data using the Pulsar Golang client (the program below publishes to a Pulsar topic, and I am able to consume the messages), the StreamNative GCS sink connector is not able to write it as Parquet on GCS and I get the exception below.

Slack thread in the Apache Pulsar community: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1640579795494300

Exception stack trace:

16:28:54.381 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/sadim-test-json3][/pulsar-poc/cloud-storage-sink] Subscribed to topic on pulsar-poc-broker-1.pulsar-poc-broker.pulsar-poc.svc.cluster.local/10.248.1.83:6650 -- consumer: 43
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Flushing [SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persistent://public/default/sadim-test-json3], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@23bbe054], schema=AUTO_CONSUME({schemaVersion=,schemaType=BYTES}{schemaVersion=org.apache.pulsar.common.protocol.schema.LatestVersion@2dd916ed,schemaType=JSON}), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$322/0x0000000840688840@14f36ab9, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$321/0x0000000840688440@495fc424), value=[B@50d82707)] buffered records to blob store
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:
org.apache.avro.SchemaParseException: Cannot parse schema
at org.apache.avro.Schema.parse(Schema.java:1633) ~[java-instance.jar:?]
at org.apache.avro.Schema$Parser.parse(Schema.java:1430) ~[java-instance.jar:?]
at org.apache.avro.Schema$Parser.parse(Schema.java:1418) ~[java-instance.jar:?]
at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertToAvroSchema(AvroRecordUtil.java:103) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.format.ParquetFormat.initSchema(ParquetFormat.java:63) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:239) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
16:28:55.788 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Skip flushing, because the pending flush queue is empty ...

accuknox-mysql-helm-temp/pulsar-test-dec22-test1/public/default/CiliumTelemetryDemo27Dec/2021-12-27

Golang client code:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	fmt.Println("hello world")
	for i := 0; i < 500; i++ {
		fmt.Println("sadim" + strconv.Itoa(i))
		writePulsar("ciliumjson", "sadim111")
		fmt.Println("sadimend" + strconv.Itoa(i))
	}
	fmt.Println("hello world 2")
}

type AccuknoxJsonWrapperObject struct {
	msg AccuknoxFinalJsonWrapperObject `msg:"json"`
}

type AccuknoxFinalJsonWrapperObject struct {
	inputjson string
}

var (
	exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example2\",\"namespace\":\"test2\",\"fields\":[{\"name\":\"inputjson\",\"type\":\"string\"}]}"
)

func PublishToPulsarTopic(inputjson string, topicName string) (result string) {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		fmt.Printf("Could not instantiate Pulsar client: %v\n", err)
	}

	defer client.Close()

	JsonWrapperObject2 := AccuknoxFinalJsonWrapperObject{inputjson}
	fmt.Println("sadim1")
	fmt.Println(JsonWrapperObject2)
	JsonWrapperObject := AccuknoxJsonWrapperObject{JsonWrapperObject2}
	fmt.Println("sadim2")
	fmt.Println(JsonWrapperObject)

	file, _ := json.Marshal(JsonWrapperObject2)
	fmt.Println("sadim3")
	fmt.Println(file)
	file2, _ := json.Marshal(JsonWrapperObject)
	fmt.Println("sadim4")
	fmt.Println(file2)

	properties := make(map[string]string)
	properties["pulsar"] = "hello"
	fmt.Println("sadim5")
	fmt.Println(properties)

	jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties)
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:  topicName,
		Schema: jsonSchemaWithProperties,
	})

	if err != nil {
		fmt.Println(err)
	}

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Value: JsonWrapperObject2,
	})

	defer producer.Close()

	if err != nil {
		fmt.Println("Failed to publish message", err)
	}
	fmt.Println("Published message")

	return ""
}

type testJSON struct {
	InputJson string
}

var (
	exampleSchemaDef5 = "{\"type\":\"record\",\"name\":\"testJSON\",\"namespace\":\"test2\"," +
		"\"fields\":[{\"name\":\"InputJson\",\"type\":\"testJSON\"}]}"
)

var (
	exSchema = "{\"type\":\"record\",\"schema\":\"\",\"properties\":{\"InputJson\":\"testJSON\"}}"
)

func writePulsar(inputjson string, topicName string) (result string) {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		fmt.Println(err)
	}
	defer client.Close()

	//a2 := testJSON{InputJson: inputjson}
	//properties := map[string]testJSON{"InputJson" : a2}
	properties2 := make(map[string]string)
	properties2["InputJson"] = "testJSON"
	jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef5, properties2)
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:  topicName,
		Schema: jsonSchemaWithProperties,
	})
	if err != nil {
		fmt.Println(err)
	}

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Value: &testJSON{
			InputJson: inputjson,
		},
	})
	if err != nil {
		fmt.Println(err)
	}
	producer.Close()
	return ""
}

Java working client that is able to produce messages which the sink writes as Parquet:

package accuknox.datalake;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;

public class App3 {
    public static void main(String[] args) throws InterruptedException, IOException {
        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
        /*
         * Producer producer = client.newProducer(JSONSchema.of(String.class))
         *     .topic("persistent://public/default/cilium-telemetry-4").create();
         */
        //Schema.JSON
        Producer producer = client.newProducer(JSONSchema.of(AccuknoxJsonObject.class))
                .topic("CiliumTelemetryDemo27Dec").create();
        //Producer<byte[]> producer = client.newProducer().topic("cilium-test1").create();
        FileReader fr = null;
        BufferedReader br = null;
        AccuknoxJsonObject jsonObject = null;
        for (int i = 0; i <= 90; i++) {
            try {
                File file = new File("./knoxfeedermockdata/flow.txt");
                //File file = new File("./knoxfeedermockdata/cilium_alerts_tableMockData.txt");
                //File file = new File("./knoxfeedermockdata/cilium_v3_All.txt");
                System.out.println(file.getAbsolutePath());
                fr = new FileReader(file);
                br = new BufferedReader(fr);
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println(line);
                    //for(int i = 0; i < 1000000; i++)
                    jsonObject = new AccuknoxJsonObject(line);
                    producer.send(jsonObject);
                }
                System.gc();
                //this.getRuntime().gc()
                fr.close(); // closes the stream and releases the resources
            } catch (IOException e) {
                System.err.print(e);
            } catch (NullPointerException e2) {
                System.err.print(e2);
            } finally {
                try {
                    if (br != null)
                        br.close();
                    if (fr != null)
                        fr.close();
                } catch (NullPointerException ex) {
                    System.err.print(ex);
                }
            }
            //Thread.sleep(200);
        }
        producer.close();
        client.close();
    }
}

cc: @sijie

topic name as a group of topic partitions in s3 path

We need to have the topic name in the S3 path, not only the partitions:

public/default/s3-test/s3-test-partition-0

Instead of

public/default/s3-test-partition-0

Not sure if this is something new or already possible. Either way, it should be a small change to the path.

Partition for cloud io connector

I am currently working with the Pulsar Cloud Storage Connector (S3 connector).
I am using the latest version of the Pulsar Cloud Storage Connector: https://github.com/streamnative/pulsar-io-cloud-storage/releases/tag/v2.5.1.
Between the two partitionerType values, partition (the default) and time, it is not possible to use partition.
This is because the code tests the parameter value: https://github.com/streamnative/pulsar-io-cloud-storage/blob/3c734a592598f04a012cf[…]n/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java
And the value used in that test is default instead of partition: https://github.com/streamnative/pulsar-io-cloud-storage/blob/3c734a592598f04a012cf[…]n/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java

[FEATURE] Add datetime string substitutions to prefix field

Is your feature request related to a problem? Please describe.
Now that the new partitioner doesn't add a datetime-based prefix to the file, it would be useful if we could provide the date/time as part of the prefix through placeholders while using partitioner: topic.

Describe the solution you'd like
Something like being able to specify %Y-%M-%D in the prefix field of the config and have the writer substitute those placeholders with the actual date values (it doesn't have to be those exact placeholders, but that's the idea).
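
A small sketch of how such datetime placeholders in the prefix could be expanded at write time. The %Y/%M/%D tokens and the expandPrefix helper are hypothetical names used only for illustration; this is not the connector's API.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class PrefixPlaceholderSketch {
    static String expandPrefix(String prefix, long messageTimestampMs) {
        ZonedDateTime ts = ZonedDateTime.ofInstant(Instant.ofEpochMilli(messageTimestampMs), ZoneOffset.UTC);
        return prefix
                .replace("%Y", ts.format(DateTimeFormatter.ofPattern("yyyy")))
                .replace("%M", ts.format(DateTimeFormatter.ofPattern("MM")))
                .replace("%D", ts.format(DateTimeFormatter.ofPattern("dd")));
    }

    public static void main(String[] args) {
        // e.g. "sinktest/%Y-%M-%D/" -> "sinktest/2023-01-15/"
        System.out.println(expandPrefix("sinktest/%Y-%M-%D/", System.currentTimeMillis()));
    }
}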

Describe alternatives you've considered
The alternative is to continue using the legacy partitioner along with the time settings, but since the legacy partitioner will eventually be deprecated, we figured this approach was more future-proof.


[BUG] failed to extract schema version for AVRO format and causing cast error

Describe the bug

When useHumanReadableSchemaVersion=false

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: null in field schemaVersion in field __message_metadata__
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:317) ~[java-instance.jar:?]
        at org.apache.pulsar.io.jcloud.format.AvroFormat.recordWriterBuf(AvroFormat.java:105) ~[q6MAPb11NcKFgJvec2mFsA/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:264) ~[q6MAPb11NcKFgJvec2mFsA/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:234) ~[q6MAPb11NcKFgJvec2mFsA/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[q6MAPb11NcKFgJvec2mFsA/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassCastException: null in field schemaVersion in field __message_metadata__
        at org.apache.avro.generic.GenericDatumWriter.addClassCastMsg(GenericDatumWriter.java:191) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:229) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73) ~[java-instance.jar:?]
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:314) ~[java-instance.jar:?]
        ... 10 more
Caused by: java.lang.ClassCastException


[BUG] extract schema info from messages without schema version

Describe the bug
If messages arrive without a schema version, the connector treats them as having the BYTES schema, even though the schema could be recovered from the topic's schema info.


[FEATURE] Azure Blob Storage support

It would be great to have official support for Azure Blob Storage.

Ideally, this would be done using the native SDK as was done for AWS with the s3v2 provider.

Release Cloud IO connector in streamnative-ci

According to https://github.com/streamnative/streamnative-ci/issues/374, the Cloud IO connector will be included in streamnative-ci and appear in the sn-pulsar and cloud Pulsar images. The release process of the Cloud IO connector can follow the plugin part (e.g. AoP and KoP) of the StreamNative Weekly Release Process. There are two things that need to be confirmed:

1. set-pulsar-version.sh and set-project-version.sh
Are there any other scripts (similar in function to set-pulsar-version.sh and set-project-version.sh) to adjust the pulsar version it depends on, and how to use them?

2. Create a project directory under the streamnative-ci/projects/
Please submit a project directory to streamnative-ci/projects. In general, the submitted project should contain three shell scripts: build.sh, get-version.sh, and set-version.sh. To add this directory, you can refer to this document and the directories submitted by other projects in streamnative-ci/projects.

Please contact me if you have any questions :)

[BUG] Pulsar cloud storage connector can't convert PROTO_NATIVE schema to Parquet schema

Describe the bug

The sink connector is not able to read topics with the PROTO_NATIVE schema.

The following exception is thrown:

19:04:35.029 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:    org.apache.avro.SchemaParseException: Cannot parse <null> schema
        at org.apache.avro.Schema.parse(Schema.java:1633) ~[java-instance.jar:?]
        at org.apache.avro.Schema$Parser.parse(Schema.java:1430) ~[java-instance.jar:?]
        at org.apache.avro.Schema$Parser.parse(Schema.java:1418) ~[java-instance.jar:?]
        at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertToAvroSchema(AvroRecordUtil.java:103) ~[qg2KQyXZ9-7y9GGaCCq0Hg/:?]
        at org.apache.pulsar.io.jcloud.format.ParquetFormat.initSchema(ParquetFormat.java:63) ~[qg2KQyXZ9-7y9GGaCCq0Hg/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:239) ~[qg2KQyXZ9-7y9GGaCCq0Hg/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[qg2KQyXZ9-7y9GGaCCq0Hg/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]

To Reproduce

Steps to reproduce the behavior:

  1. Create a topic with PROTO_NATIVE schema
  2. Create a sink connector to read from the topic
  3. The sink connector will fail

Expected behavior

The sink connector should be able to read from topics with the PROTOBUF or PROTOBUF_NATIVE schema.


[FEATURE] pendingQueueSize should default to the max batch size


Describe the solution you'd like
Currently pendingQueueSize defaults to 10x the max batch size; we should reduce the ratio to 1.


Update master to JDK17

To keep in sync with Pulsar master, we should upgrade the connector to use JDK 17 too.

[BUG] error while writing data in Parquet format using JsonSchema as schema format

Describe the bug
I was using the Cloud Storage Sink to collect data from Pulsar and write it to AWS S3 in Parquet. Messages were produced using a JsonSchema format. The Sink fails as soon as it tries to convert the collected data into org.apache.avro.generic.GenericRecord (within the convertGenericRecord function).

I tried to produce messages both from Python and from Java, and both fail but with different stack traces.

Note: if the formatType specified in the configuration is json everything works fine.

To Reproduce
Use this template configuration for the pulsar-io-cloud-storage v2.9.3.6:

tenant: "<theTenant>"
namespace: "schema-registry"
name: "cloud-storage-sink"
inputs: 
  - "persistent://<theTenant>/<theNamespace>/<theTopic>"
  - <otherTopicUrl>
archive: "connectors/pulsar-io-cloud-storage-2.9.3.6.nar"
parallelism: 1

configs:
  provider: "aws-s3"
  accessKeyId: "<yourAccessKeyId>"
  secretAccessKey: "<yourSecretAccessKey>"
  bucket: "<yourS3Bucket>"
  region: "<yourRegion>"
  pathPrefix: "cloud_storage_sink_parquet/"
  formatType: "parquet"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 100
  batchTimeMs: 600000
  withMetadata: false
  withTopicPartitionNumber: false

And produce messages in JSON schema format. Here is the code for a minimal Python producer:

import pulsar
from pulsar.schema import *

class YourMessageClass(Record):
    ...

def generate_message() -> YourMessageClass:
    ...

if __name__ == '__main__':
    client = pulsar.Client('pulsar://host.docker.internal:6650')

    producer = client.create_producer(
        topic='persistent://<theTenant>/<theNamespace>/<theTopic>',
        producer_name='python_producer',
        schema=pulsar.schema.JsonSchema(YourMessageClass)
    )

    for i in range(100):
        msg = generate_message()
        producer.send(msg)

    client.close()

Expected behavior
A chunk of data containing a list of collected messages, written to the specified AWS S3 prefix in Parquet format.


Additional context
The tests were done on my laptop, using an Apache Pulsar Docker container where the schema registry was properly configured (the schema definitions of the messages had been uploaded) and pulsar-io-cloud-storage-2.9.3.6.nar was loaded.

This is the error that occurred while writing data produced with the Python producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-02/180925500946.parquet
java.lang.NullPointerException: null
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:220) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

This is the error that occurred while writing data produced with the Java producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-08/64156074046.parquet
java.util.NoSuchElementException: No value present
	at java.util.Optional.get(Optional.java:148) ~[?:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:207) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

[BUG] NPE if partitionerType is not set

Describe the bug
If you don't set partitionerType in the config, an NPE is thrown.

java.lang.NullPointerException: null
    at org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig.validate(BlobStoreAbstractConfig.java:136) 
    at org.apache.pulsar.io.jcloud.sink.CloudStorageSinkConfig.validate(CloudStorageSinkConfig.java:141) 
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.open(BlobStoreAbstractSink.java:99) 
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:925) ~[?:?]

To Reproduce
See above

Expected behavior
A clear error message that lets the user correct the configuration.

[BUG] Messages are ignored with "Message not exist in record" exception

Describe the bug
I'm following the instructions in README.md to test the connector, but all messages are being ignored with a "Message not exist in record" exception and nothing is saved to S3.

To Reproduce
Steps to reproduce the behavior:
https://github.com/streamnative/pulsar-io-cloud-storage#usage
The only difference is that I use my own class instead of TestRecord:

public class Event {
    public String user_id;
    public String os;

    public Event(String user_id, String os) {
        this.user_id = user_id;
        this.os = os;
    }

    public Event() {
    }
}

Expected behavior
Messages are saved to S3

Additional context
Sink status:

{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 2,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ {
        "exceptionString" : "Message not exist in record",
        "timestampMs" : 1604410860088
      }, {
        "exceptionString" : "Message not exist in record",
        "timestampMs" : 1604410860095
      } ],
      "numWrittenToSink" : 0,
      "lastReceivedTime" : 1604410860093,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}

Topic schema:

{
  "version": 2,
  "schemaInfo": {
    "name": "my-topic-2-avro",
    "schema": {
      "type": "record",
      "name": "Event",
      "fields": [
        {
          "name": "user_id",
          "type": [
            "null",
            "string"
          ]
        },
        {
          "name": "os",
          "type": [
            "null",
            "string"
          ]
        }
      ]
    },
    "type": "AVRO",
    "properties": {
      "__alwaysAllowNull": "true",
      "__jsr310ConversionEnabled": "false"
    }
  }
}

The same topic works with the PostgreSQL sink or a CLI consumer. Tried with both JSON and AVRO schemas.

[BUG] Time Partitioner not working as expected for GCS

Data sent to the GCS bucket is not placed in the correct date folder.
I have configured timePartitionDuration: 1d. It was working fine, but after a few days new date folders stopped being created (after 2022-06-23, no new folders were created) and the data kept being written to the 2022-06-23 folder, even though data is still being sent to GCS.

To Reproduce
I am using
Function Mesh version: 0.1.11-rc3
Pulsar IO Cloud storage sink connector Version: 2.9.2.21

Expected behavior
Data sent to the GCS bucket should be placed in the respective date folder.


[BUG] Connector does not create output files for each topic when reading multiple topics

The connector creates only one output file shared by the records of all topics, instead of creating independent output files for each topic.

To Reproduce
Steps to reproduce the behavior:

  1. Consume from 3 topics
  2. Check output file(s)
  3. Single output file was created, containing records of all 3 topics

Expected behavior
Should create 3 output files, each containing records of only one topic

Additional context
The problem is that the sink only looks at the first record of each batch to calculate the file path. This is fine for single-topic consumption, but not when the sink reads from multiple topics.

https://github.com/streamnative/pulsar-io-cloud-storage/blob/master/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java#L242
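
A minimal sketch of the fix described above (an assumption, not the sink's actual code): group the buffered records by topic before deriving the output path, so each topic gets its own file instead of reusing the path of the batch's first record.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;

public class PerTopicFlushSketch {
    static Map<String, List<Record<GenericRecord>>> groupByTopic(List<Record<GenericRecord>> batch) {
        // One group, and therefore one output object, per source topic per flush.
        return batch.stream()
                .collect(Collectors.groupingBy(r -> r.getTopicName().orElse("unknown-topic")));
    }
    // The partitioner would then be applied to each topic's records separately.
}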

[BUG] `maxTimeoutMs` should apply at the topic level

Describe the bug
Similar to #493, maxTimeoutMs is currently applied to all topics at once, instead of having an independent timer for each topic.

Expected behavior
Each topic should independently measure the time passed.
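
A rough sketch of per-topic flush timers (an assumption, not the connector's implementation): track the last flush time for each topic and only flush the topics whose maxTimeoutMs has actually elapsed.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerTopicTimeoutSketch {
    private final long maxTimeoutMs;
    private final Map<String, Long> lastFlushMsByTopic = new ConcurrentHashMap<>();

    PerTopicTimeoutSketch(long maxTimeoutMs) {
        this.maxTimeoutMs = maxTimeoutMs;
    }

    // True once this topic's own timeout has elapsed since its last flush.
    boolean shouldFlush(String topic, long nowMs) {
        long last = lastFlushMsByTopic.getOrDefault(topic, nowMs);
        lastFlushMsByTopic.putIfAbsent(topic, nowMs);
        return nowMs - last >= maxTimeoutMs;
    }

    void markFlushed(String topic, long nowMs) {
        lastFlushMsByTopic.put(topic, nowMs);
    }
}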

[BUG] NPE when the schema does not exist with useHumanReadableSchemaVersion enabled

Describe the bug

java.lang.NullPointerException: Cannot read the array length because "array" is null
    at java.nio.ByteBuffer.wrap(ByteBuffer.java:437) ~[?:?]
    at org.apache.pulsar.io.jcloud.util.MetadataUtil.parseSchemaVersionFromBytes(MetadataUtil.java:169) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]
    at org.apache.pulsar.io.jcloud.util.MetadataUtil.extractedMetadata(MetadataUtil.java:88) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]
    at org.apache.pulsar.io.jcloud.format.JsonFormat.recordWriterBuf(JsonFormat.java:120) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:324) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:272) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:216) ~[vFdZUZqkBvBdIxBFOEzjGw/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]

To Reproduce
Steps to reproduce the behavior:

  1. Start a cloud storage sink connector with useHumanReadableSchemaVersion and withMetadata enabled
  2. Produce some messages without a schema:
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/test-s3-2")
                .create();

        for (int i = 0; i < 10; i++) {
            String message = "{\"test-message\": \"test-value\"}";
            MessageId msgID = producer.send(message.getBytes());
            System.out.println("Publish " + "my-message-" + i
                    + " and message ID " + msgID);
        }
  3. An NPE is thrown in the connector. The connector doesn't fail but hangs forever.

Expected behavior
The connector should skip parsing the schema version when the schema doesn't exist.
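
A minimal sketch of the expected behavior (an assumption, not the connector's actual code): skip the human-readable schema version when the message carries no schema version bytes, instead of calling ByteBuffer.wrap(null).

import java.nio.ByteBuffer;
import java.util.Optional;

public class SchemaVersionGuardSketch {
    static Optional<Long> parseSchemaVersion(byte[] schemaVersionBytes) {
        if (schemaVersionBytes == null || schemaVersionBytes.length == 0) {
            // Message was produced without a schema, so there is nothing to parse.
            return Optional.empty();
        }
        // Assumes the usual 8-byte (long) schema version encoding.
        return Optional.of(ByteBuffer.wrap(schemaVersionBytes).getLong());
    }
}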

[FEATURE] Handle raw data with the connector (topics without schema)

I would like to use the connector with topics that have no schema.
Currently we have to provide a schema, otherwise the connector fails to send the data from the topics to the S3 buckets.

As with the Kinesis sink, it would be great to be able to handle raw data. In addition to the existing format types (json, avro, and parquet) we could have a raw type. The field to update is formatType.

I am currently using the connector to sink JSON data without a schema, as I did with the Kinesis sink. My current configuration is formatType: "json". For each message produced, the connector raises an exception:

16:05:43.437 [public/default/s3connector-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/s3connector:0] Uncaught exception in Java Instance
java.lang.NullPointerException: null
    at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.generateSchema(AutoConsumeSchema.java:154) ~[org.apache.pulsar-pulsar-client-original-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:91) ~[org.apache.pulsar-pulsar-client-original-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:40) ~[org.apache.pulsar-pulsar-client-original-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.client.api.Schema.decode(Schema.java:103) ~[java-instance.jar:?]
    at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:299) ~[org.apache.pulsar-pulsar-client-original-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:76) ~[org.apache.pulsar-pulsar-functions-instance-2.7.0.3.jar:2.7.0.3]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:400) ~[org.apache.pulsar-pulsar-functions-instance-2.7.0.3.jar:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:237) [org.apache.pulsar-pulsar-functions-instance-2.7.0.3.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
16:05:43.443 [public/default/s3connector-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
16:05:43.447 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/s3connector] [auniquename] Closed consumer

[BUG] The connector fails to convert JSON array bytes

Describe the bug

When the source topic schema is BYTES and the message content is a JSON array:

[{"map1-key1":"map1-value1","map1-key2":"map1-value2"},{"map2-key1":"map2-value1","map2-key2":"map2-value2"}]

If the connector config sets formatType: "json", the conversion fails:

com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.util.LinkedHashMap<java.lang.String,java.lang.Object>` from Array value (token `JsonToken.START_ARRAY`)
 at [Source: (byte[])"[{"map1-key1":"map1-value1","map1-key2":"map1-value2"},{"map2-key1":"map2-value1","map2-key2":"map2-value2"}]"; line: 1, column: 1]
	at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1741) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1515) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromArray(StdDeserializer.java:222) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:447) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:32) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674) ~[java-instance.jar:?]
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3707) ~[java-instance.jar:?]
	at org.apache.pulsar.io.jcloud.format.JsonFormat.convertRecordToObject(JsonFormat.java:148) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at org.apache.pulsar.io.jcloud.format.JsonFormat.recordWriterBuf(JsonFormat.java:106) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:313) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:263) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:216) ~[5hMO4nytTcSR3B1qE35-Aw/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]
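
A short sketch of how the JSON payload could be parsed so that both objects and arrays are accepted (an assumption about the fix, not the connector's code): read the bytes into a generic JsonNode and branch on the root type instead of forcing a LinkedHashMap.

import java.io.IOException;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonArraySketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static Object convert(byte[] payload) throws IOException {
        JsonNode root = MAPPER.readTree(payload);
        if (root.isArray()) {
            // One row per array element instead of failing on JsonToken.START_ARRAY.
            return MAPPER.convertValue(root, new TypeReference<List<Map<String, Object>>>() {});
        }
        return MAPPER.convertValue(root, new TypeReference<Map<String, Object>>() {});
    }
}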

To Reproduce
Steps to reproduce the behavior:

  1. Create a producer using the BYTES schema and send JSON content as bytes:
        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic("test-io-cloud-storage").create();

        Map<String, String> map1 = new HashMap<>();
        map1.put("map1-key1", "map1-value1");
        map1.put("map1-key2", "map1-value2");

        Map<String, String> map2 = new HashMap<>();
        map2.put("map2-key1", "map2-value1");
        map2.put("map2-key2", "map2-value2");

        List<Map<String, String>> records = new ArrayList<>();
        records.add(map1);
        records.add(map2);

        ObjectMapper mapper = new ObjectMapper();
        byte[] jsonBytes = mapper.writeValueAsString(records).getBytes();

        producer.send(jsonBytes);
  2. Create the connector using the formatType: "json" config:
admin sinks create --tenant public --namespace default --sink-config-file cloud-storage-sink.yaml -i "test-io-cloud-storage"

The cloud-storage-sink.yaml

formatType: "json"

configs:
  provider: "aws-s3"
  accessKeyId: "<access_key>"
  secretAccessKey: "<secret_key>"
  bucket: ""
  region: ""
  endpoint: ""
  formatType: "json"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 10

Expected behavior
The payload can be converted normally and sent to S3.

Additional context
According to the documentation, this scenario is already supported.


[FEATURE] Add a new batch triggering mechanism that is based on the batch size


Describe the solution you'd like
Whenever the records inside a batch reach a certain total size (e.g., 64 MB), trigger a flush to send out the data.
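
A minimal sketch of the proposed size-based trigger (assumptions only; the field and method names are illustrative, not the connector's): track the byte size of the pending batch and flush once it crosses a configured threshold, in addition to the existing record-count and time-based triggers.

import java.util.concurrent.atomic.AtomicLong;

public class SizeBasedBatchTrigger {
    private final long maxBatchBytes;              // e.g. 64 * 1024 * 1024
    private final AtomicLong pendingBytes = new AtomicLong();

    SizeBasedBatchTrigger(long maxBatchBytes) {
        this.maxBatchBytes = maxBatchBytes;
    }

    // Returns true when adding this record should trigger a flush.
    boolean add(int recordSizeBytes) {
        return pendingBytes.addAndGet(recordSizeBytes) >= maxBatchBytes;
    }

    void reset() {
        pendingBytes.set(0);
    }
}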


[BUG] Verify the connector can interact with gcs and azure blob store correctly

Describe the bug
Currently the connector lacks verification of running against:

  • gcs
  • azure blob store


[BUG] Impossible to select the partition type

Between the two partitionerType values, partition (the default) and time, it is not possible to use partition.

This is due to a mismatch in the code. The partitionerType is defined in the code with the types default and time, and the validation condition requires the property to contain one of those two values.

So when partition is used, as documented, instead of default, it is rejected.

Two possibilities to resolve the mismatch:

  • Rename default to partition in the map, to align with the documentation.
  • Rename partition to default in the switch and update the documentation.

[BUG] formatType not working as expected

Describe the bug

We are using the pulsar-io-cloud-storage connector to push Pulsar topic data into S3.

It then fails with the following stack trace:

2022-11-08T13:14:48,012+0000 [pulsar-io-cloud-storage-sink-flush-0] INFO  org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Flushing 10 buffered records to blob store
2022-11-08T13:14:48,012+0000 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:
java.lang.NullPointerException: null
        at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.recoverGenericSchemaFromInternalSchema(AvroRecordUtil.java:98) ~[2Wt2FL2iQu7JDGE6nzDX_g/:?]
        at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.getPulsarSchema(AvroRecordUtil.java:82) ~[2Wt2FL2iQu7JDGE6nzDX_g/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:250) ~[2Wt2FL2iQu7JDGE6nzDX_g/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:216) ~[2Wt2FL2iQu7JDGE6nzDX_g/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]

To Reproduce

Steps to reproduce the behavior:

  1. pulsar-io-s3.yaml
tenant: "default"
namespace: "sample"
name: "pulsar-io-s3-test"
inputs:
  - "persistent://default/sample/topic-sink"
archive: "connectors/pulsar-io-cloud-storage-2.9.3.15.nar"
parallelism: 1
configs:
  provider: "aws-s3"
  accessKeyId: ""
  secretAccessKey: ""
  roleSessionName: "test"
  formatType: "bytes"
  bucket: "pulsar-bucket"
  pathPrefix: "pulsar/test/"
  region: "us-east-1"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 10
  batchTimeMs: 1000
  2. Local run is used to execute it on a Pulsar broker:
./bin/pulsar-admin sink localrun --sink-config-file conf/pulsar-io-s3.yaml

Expected behavior

pulsar-io-cloud-storage connector should be able to process the message and store it in the s3 bucket

Additional context

  1. Function context is used for producing events into persistent://default/sample/topic-sink topic.
context.publish(f"persistent://default/sample/topic-sink", item, compression_type=_pulsar.CompressionType.LZ4, message_conf={"partition_key": context.get_partition_key()})
  2. The messages in the persistent://default/sample/topic-sink topic are Python byte strings (b'...'); here is a sample message:
b'{"op":"u","ts_ms":1667798064000,"source":{"version":"0.8.0-SNAPSHOT","name":"sample"},"after":null}'

[BUG] Sink flushes only when batchTimeMs is exceeded for large batchSize

Describe the bug
BlobStoreAbstractSink is written incorrectly and contains a bug.
Every time the framework calls write() (records come from the consumer), it does a blocking add to a queue called pendingFlushQueue, whose maximum capacity is the user-configured pendingQueueSize (100,000 in our case).
After a successful write it runs flushIfNeeded, which checks whether we exceeded the maximum bytes to flush (configured to the default of 10 KB) or the maximum number of records (batchSize, configured to 10,000 records).
So after the pod starts, the queue is empty and quickly fills up. Once it hits 10 KB, it asynchronously launches a flush and keeps writing to the queue until the queue is full (100,000 records).
The flush takes up to 10 KB or 10,000 records, so in practice 10 KB, writes it to S3, and then finishes.
Since write() is blocked because the queue is full, nothing triggers another flush.
So only after maxBatchTime, currently configured to 5 minutes, does it run another flush, take 10 KB, flush, and repeat every 5 minutes.

Expected behavior
The queue should flush the pending records repeatedly, independent of the insertion by the write method
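
A condensed sketch of that expected behavior (assumptions only, not the actual fix): a dedicated flush loop keeps draining pendingFlushQueue whenever a batch worth of data is available, so flushing no longer depends on another write() call arriving after the queue has filled up.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FlushLoopSketch {
    private final BlockingQueue<byte[]> pendingFlushQueue = new LinkedBlockingQueue<>(100_000);
    private final int batchSize = 10_000;

    // Runs on its own executor, independent of write().
    void flushLoop() throws InterruptedException {
        List<byte[]> batch = new ArrayList<>(batchSize);
        while (!Thread.currentThread().isInterrupted()) {
            batch.add(pendingFlushQueue.take());                       // wait for at least one record
            pendingFlushQueue.drainTo(batch, batchSize - batch.size()); // then take up to a full batch
            writeToBlobStore(batch);                                   // hypothetical upload step
            batch.clear();                                             // and immediately keep draining
        }
    }

    void writeToBlobStore(List<byte[]> batch) {
        // Placeholder for the real upload.
    }
}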

[BUG] Java heap space error when trying to run pulsar sink

Describe the bug
When running part 2 in step 2 of https://streamnative.io/en/blog/tech/2020-10-20-cloud-storage-sink-connector-251, we encountered a Java heap space error.

To Reproduce
Steps to reproduce the behavior:

  1. copy pulsar-io-cloud-storage-2.5.1.nar to apache-pulsar-2.7.0/connectors
  2. restart broker
  3. upload cloud-storage-sink-config.yaml to apache-pulsar-2.7.0 folder
  4. bin/pulsar-admin sink localrun --sink-config-file cloud-storage-sink-config.yaml
  5. see java heap space error: [function-timer-thread-6-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - demotenant/demonamespace/Cloud-Storage-sink-java.lang.OutOfMemoryError: Java heap space Function Container is dead with exception.. restarting

Expected behavior
Should not see error

Screenshots
If applicable, add screenshots to help explain your problem.

Additional context
Our brokers are deployed in docker containers, so step 4 was run within the docker container.

Can anyone help with this? Thanks

Pulsar GCS Sink Connector not able to write as Parquet when a Pulsar function sends data to the output topic in a user-defined POJO format

For more details please refer to the Slack thread: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1643259234073739

@freeznet @sijie

Pulsar Function code:

package accuknox.pulsar.functions;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Arrays;

public class PulsarFunctionTest3 implements Function<String, String> {
    // This function is invoked every time a message is published to the input topic
    @Override
    public String process(String input, Context context) throws Exception {
        final String a;
        Arrays.asList(input.split(",")).forEach(word -> {
            String counterKey = word.toLowerCase();
            context.incrCounter(counterKey, 1);
        });
        return String.format("%s!", input).concat("tomwoo");
    }
}

When the above Pulsar function writes data to the output topic, the GCS sink connector is unable to write the data as Parquet.

The error stack trace is:

-dec22-test1/public/default/sadimtestoutput999/2022-01-27/04/51002737768.parquet
java.lang.RuntimeException: Null-value for required field: HostPID
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:200) ~[parquet-avro-1.12.0.jar:1.12.0]
        at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:171) ~[parquet-avro-1.12.0.jar:1.12.0]
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:138) ~[parquet-hadoop-1.12.0.jar:1.12.0]
        at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:310) ~[parquet-hadoop-1.12.0.jar:1.12.0]
        at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriter(ParquetFormat.java:92) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:279) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:244) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]

Also, the sink connector version is:

configMapRef:
  name: cluster-function-mesh-config
image: docker.io/streamnative/pulsar-io-cloud-storage:2.8.1.30
imagePullPolicy: IfNotPresent
name: pulsar-sink

FYI: even when the producer uses a byte array, it also throws the error below:

Producer<byte[]> producer = client.newProducer().topic("sadiuyuiu").create();

0c4)] buffered records to blob store
03:50:20.142 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:
org.apache.avro.SchemaParseException: Cannot parse <null> schema
        at org.apache.avro.Schema.parse(Schema.java:1633) ~[java-instance.jar:?]
        at org.apache.avro.Schema$Parser.parse(Schema.java:1430) ~[java-instance.jar:?]
        at org.apache.avro.Schema$Parser.parse(Schema.java:1418) ~[java-instance.jar:?]
        at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertToAvroSchema(AvroRecordUtil.java:103) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
        at org.apache.pulsar.io.jcloud.format.ParquetFormat.initSchema(ParquetFormat.java:63) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:239) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
        at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]

Also, if the producer object is created as below, the GCS sink connector is not able to write Parquet on GCS and throws an error that the schema is null:

producer = client.newProducer(Schema.STRING).topic(topicName).create();

The issue is that the GCS sink connector is only able to write Parquet when the producer on the input topic has been initialized as below:

Producer producer = client.newProducer(JSONSchema.of(AccuknoxJsonObject.class)).topic("sadim009").create();

Link shared by @freeznet to another reference issue: apache/pulsar#12886

Note: please check the Slack threads https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1643259234073739 and https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1643176282015089 for the complete Java classes.
