Git Product home page Git Product logo

sharppulsar's Introduction

Build Tests Nuget

SharpPulsar

SharpPulsar is an Apache Pulsar Client built on top Akka.net, which can handle millions of Apache Pulsar Producers/Consumers/Reader/Transaction/Table (in theory).

What Is Akka.NET?

Akka.NET is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on .NET & Mono that is able to support up to 50 million msg/sec on a single machine, with small memory footprint and ~2.5 million actors(or Apache Pulsar Producers/Consumers) per GB of heap.

What Is Apache Pulsar?

Apache Pulsar is a cloud-native, distributed messaging and streaming platform that is able to support millions of topics while delivering high-throughput and low-latency performance.

Supported features

Client

  • TLS
  • Authentication (token, tls, OAuth2)
  • Multi-Hosts Service URL
  • Proxy
  • SNI Routing
  • Transactions
  • Subscription(Durable, Non-durable)
  • Cluster-level Auto Failover

Producer

  • Exclusive Producer
  • Partitioned Topics
  • Batching
  • Compression (LZ4, ZLIB, ZSTD, SNAPPY)
  • Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
  • User-defined properties
  • Key-based batcher
  • Delayed/Scheduled messages
  • Interceptors
  • Message Router (RoundRobin, ConsistentHashing, Broadcast, Random)
  • End-to-end Encryption
  • Chunking
  • Transactions

Consumer

  • User-defined properties
  • HasMessageAvailable
  • Subscription Type (Exclusive, Failover, Shared, Key_Shared)
  • Subscription Mode (Durable, Non-durable)
  • Interceptors
  • Ack (Ack Individual, Ack Commulative, Batch-Index Ack)
  • Ack Timeout
  • Negative Ack
  • Dead Letter Policy
  • End-to-end Encryption
  • SubscriptionInitialPosition
  • Partitioned Topics
  • Batching
  • Compression (LZ4, ZLIB, ZSTD, SNAPPY)
  • Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
  • Compacted Topics
  • Multiple Topics
  • Regex Consumer
  • Broker Entry Metadata

Reader

  • User-defined properties
  • HasMessageAvailable
  • Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
  • Seek (MessageID, Timestamp)
  • Multiple Topics
  • End-to-end Encryption
  • Interceptors

TableView

  • Compacted Topics
  • Schema (All supported schema types)
  • Register Listener

Extras

  • Pulsar SQL
  • Pulsar Admin REST API
  • Function REST API
  • EventSource(Reader/SQL)
  • OpenTelemetry (ProducerOTelInterceptor, ConsumerOTelInterceptor)

Getting Started

Install the NuGet package SharpPulsar and follow the Tutorials.

//pulsar client settings builder
            var clientConfig = new PulsarClientConfigBuilder()
                .ServiceUrl("pulsar://localhost:6650");

            //pulsar actor system
            var pulsarSystem = PulsarSystem.GetInstance(clientConfig);

            var pulsarClient = pulsarSystem.NewClient();

            var consumer = pulsarClient.NewConsumer(new ConsumerConfigBuilder<sbyte[]>()
                .Topic(myTopic)
                .ForceTopicCreation(true)
                .SubscriptionName("myTopic-sub"));

            var producer = pulsarClient.NewProducer(new ProducerConfigBuilder<sbyte[]>()
                .Topic(myTopic));

            for (var i = 0; i < 10; i++)
            {
                var data = Encoding.UTF8.GetBytes($"tuts-{i}").ToSBytes();
                producer.NewMessage().Value(data).Send();
            }
			Thread.Sleep(TimeSpan.FromSeconds(5));
            for (var i = 0; i < 10; i++)
            {
                var message = (Message<sbyte[]>)consumer.Receive();
                consumer.Acknowledge(message);
                var res = Encoding.UTF8.GetString(message.Data.ToBytes());
                Console.WriteLine($"message '{res}' from topic: {message.TopicName}");
            }

Logical Types

Avro Logical Types are supported. Message object MUST implement ISpecificRecord

    AvroSchema<LogicalMessage> avroSchema = AvroSchema<LogicalMessage>.Of(ISchemaDefinition<LogicalMessage>.Builder().WithPojo(typeof(LogicalMessage)).WithJSR310ConversionEnabled(true).Build());

    public class LogicalMessage : ISpecificRecord
    {
        [LogicalType(LogicalTypeKind.Date)]
        public DateTime CreatedTime { get; set; }
		
        [LogicalType(LogicalTypeKind.TimestampMicrosecond)]
        public DateTime StampMicros { get; set; }

        [LogicalType(LogicalTypeKind.TimestampMillisecond)]
        public DateTime StampMillis { get; set; }
		
	[LogicalType(LogicalTypeKind.TimeMicrosecond)]
        public TimeSpan TimeMicros { get; set; }

        [LogicalType(LogicalTypeKind.TimeMillisecond)]
        public TimeSpan TimeMillis { get; set; }
        
        public AvroDecimal Size { get; set; }
		
        public string DayOfWeek { get; set; }

        [Ignore]
        public Avro.Schema Schema { get; set; }

        public object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return CreatedTime; 
	        case 1: return StampMicros;
                case 2: return StampMillis;
	        case 3: return TimeMicros;
                case 4: return TimeMillis;
                case 5: return Size;
                case 6: return DayOfWeek;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            };
        }

        public void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: CreatedTime = (DateTime)fieldValue; break;
		case 1: StampMicros = (DateTime)fieldValue; break;
                case 2: StampMillis = (DateTime)fieldValue; break;
	        case 3: TimeMicros = (TimeSpan)fieldValue; break;
                case 4: TimeMillis = (TimeSpan)fieldValue; break;
                case 5: Size = (AvroDecimal)fieldValue; break;
                case 6: DayOfWeek = (String)fieldValue; break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            };
        }
    }

KeyValue Schema ALERT!!!!

Because I have become lazy and a lover of "peace of mind":

  • For schema type of KEYVALUESCHEMA:
    producer.NewMessage().Value<TK, TV>(data).Send();  
    OR
    producer.Send<TK, TV>(data);

TK, TV represents the key and value types of the KEYVALUESCHEMA respectively.

TableView

var topic = $"persistent://public/default/tableview-{DateTime.Now.Ticks}";
var count = 20;
var keys = await PublishMessages(topic, count, false);

var tv = await _client.NewTableViewBuilder(ISchema<string>.Bytes)
.Topic(topic)
.AutoUpdatePartitionsInterval(TimeSpan.FromSeconds(60))
.CreateAsync();
 
 Console.WriteLine($"start tv size: {tv.Size()}");
 tv.ForEachAndListen((k, v) => Console.WriteLine($"{k} -> {Encoding.UTF8.GetString(v)}"));
 await Task.Delay(5000);
 Console.WriteLine($"Current tv size: {tv.Size()}");

 tv.ForEachAndListen((k, v) => Console.WriteLine($"checkpoint {k} -> {Encoding.UTF8.GetString(v)}"));

OpenTelemetry

var exportedItems = new List<Activity>();
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("producer", "consumer")
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("inmemory-test"))
.AddInMemoryExporter(exportedItems)
.Build();

 var producerBuilder = new ProducerConfigBuilder<byte[]>()
 .Intercept(new ProducerOTelInterceptor<byte[]>("producer", _client.Log))
 .Topic(topic);

 var consumerBuilder = new ConsumerConfigBuilder<byte[]>()
 .Intercept(new ConsumerOTelInterceptor<byte[]>("consumer", _client.Log))
 .Topic(topic);

Cluster-level Auto Failover

var config = new PulsarClientConfigBuilder();
var builder = AutoClusterFailover.Builder().Primary(serviceUrl)
.Secondary(new List<string> { secondary })
.FailoverDelay(TimeSpan.FromSeconds(failoverDelay))
.SwitchBackDelay(TimeSpan.FromSeconds(switchBackDelay))
.CheckInterval(TimeSpan.FromSeconds(checkInterval));
config.ServiceUrlProvider(new AutoClusterFailover((AutoClusterFailoverBuilder)builder));

[Experimental]Running SharpPulsar Tests in docker container (the issue I have faced is how to create container from within a container)

You can run SharpPulsar tests in docker container. A Dockerfile and docker-compose file is provided at the root folder to help you run these tests in a docker container. docker-compose.yml:

version: "2.4"

services:
  akka-test:
    image: sharp-pulsar-test
    build: 
      context: .
    cpu_count: 1
    mem_limit: 1g
    environment:
      run_count: 2
      # to filter tests, uncomment
      # test_filter: "--filter FullyQualifiedName=SharpPulsar.Test.MessageChunkingTest"
      test_file: Tests/SharpPulsar.Test/SharpPulsar.Test.csproj

Dockerfile:

FROM mcr.microsoft.com/dotnet/sdk:6.0 
ENV test_file="Tests/SharpPulsar.Test/SharpPulsar.Test.csproj"
ENV test_filter=""
ENV run_count=2
RUN mkdir sharppulsar
COPY . ./sharppulsar
RUN ls
WORKDIR /sharppulsar
CMD ["/bin/bash", "-c", "x=1; c=0; while [ $x -le 1 ] && [ $c -le ${run_count} ]; do dotnet test ${test_file} ${test_filter} --framework net6.0 --logger trx; c=$(( $c + 1 )); if [ $? -eq 0 ]; then x=1; else x=0; fi;  done"]

How to:

cd into the root directory and execute docker-compose up run-count is the number of times you want the test repeated. test_filter is used when you need to test a specific test instead of running all the tests in the test suite.

License

This project is licensed under the Apache License Version 2.0 - see the LICENSE file for details.

sharppulsar's People

Contributors

eaba avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

sharppulsar's Issues

TypeInitializationException

Result StackTrace:	
at SharpPulsar.Protocol.Commands.get_CurrentProtocolVersion() in C:\Users\Technical\source\repos\SharpPulsar\SharpPulsar\Protocol\Commands.cs:line 1621
   at SharpPulsar.Impl.ClientCnx..ctor(ClientConfigurationData conf, MultithreadEventLoopGroup eventLoopGroup) in C:\Users\Technical\source\repos\SharpPulsar\SharpPulsar\Impl\ClientCnx.cs:line 112
   at SharpPulsar.Test.Impl.ClientCnxTest.TestClientCnxTimeout() in C:\Users\Technical\source\repos\SharpPulsar\SharpPulsar.Test\Impl\ClientCnxTest.cs:line 44
----- Inner Stack Trace -----
   at SharpPulsar.Protocol.Proto.BaseCommand.Builder.Build() in C:\Users\Technical\source\repos\SharpPulsar\SharpPulsar\Protocol\Proto\Partial\BaseCommand.cs:line 831
   at SharpPulsar.Protocol.Commands.SerializeWithSize(Builder cmdBuilder) in C:\Users\Technical\source\repos\SharpPulsar\SharpPulsar\Protocol\Commands.cs:line 1285
   at SharpPulsar.Protocol.Commands..cctor() in C:\Users\Technical\source\repos\SharpPulsar\SharpPulsar\Protocol\Commands.cs:line 974
Result Message:	
System.TypeInitializationException : The type initializer for 'SharpPulsar.Protocol.Commands' threw an exception.
---- System.NullReferenceException : BaseCommand not initialized

Re-implement with FSM

The past few days, I have had the privilege of working on some of the @akkadotnet issues, and before then I always shied away from FSM(Finite State Machine) but after working on one specific issue on FSM, I have come to see how it can benefit SharpPulsar.

I want to humbly ask @Aaronontheweb for his view on this!

And @Aaronontheweb, I remember now, one of the challenges I experienced was that IActor is not typed yet :)

how to use as console application

hi,
I want to use sharpPulsar client in c# console application ,any public nuget dll available or doc so that i will follow and test besic consumer and producer for pulsar .i see intersting here support schema which i am quite intresting in tests

How to use admin client to get stats (interface / docs unclear)

I'd like to use the admin client to query topic stats and grab the value subscriptions[$subscriptionName].consumers[$consumerName].keyHashRanges. However, based on the interface IPulsarAdminRESTAPIClient, it appears that the GetStatsAsync call merely returns a Task - it is not clear how to consume the JSON results of this API call and extract the value I want. I don't see anything about it in the tutorials or README files either. Could you provide a very quick usage sample, please?

netcore compatibility

Hi, we discussed this briefly on twitter several weeks ago, but having just tried again, SharpPulsar doesn't seem to be compatible with dotnetcore:P

error NU1202: Package SharpPulsar 0.8.4 is not compatible with netcoreapp2.1 (.NETCoreApp,VersioTCoreApp,Version=v2.1). Package SharpPulsar 0.8.4 supports: netstandard2.1 (.NETStandard,Version=v2.1)

my project is a dead simple lambda function:

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <TargetFramework>netcoreapp2.1</TargetFramework>
    <GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Amazon.Lambda.Core" Version="1.1.0" />
    <PackageReference Include="Amazon.Lambda.APIGatewayEvents" Version="1.2.0" />
    <PackageReference Include="Amazon.Lambda.Serialization.Json" Version="1.5.0" />
    <PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
    <PackageReference Include="SharpPulsar" Version="0.8.4" />
  </ItemGroup>
  
</Project>


from our twitter chat I am pretty sure you said it should work with netcore?

Thanks

RackawareEnsemblePlacementPolicyImpl - Failed to find 1 bookies

https://github.com/apache/pulsar

2022-12-17T15:33:19,171+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] WARN  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to find 1 bookies : excludeBookies [<Bookie:172.17.0.2:35485>], allBookies [<Bookie:172.17.0.2:35485>].
2022-12-17T15:33:19,171+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 228
2022-12-17T15:33:19,171+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] WARN  org.apache.bookkeeper.client.BookieWatcherImpl - New ensemble: [172.17.0.2:35485] is not adhering to Placement Policy
2022-12-17T15:33:19,173+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/__transaction_buffer_snapshot] [__compaction] Closed connection [id: 0x7454e801, L:/127.0.0.1:35034 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
2022-12-17T15:33:19,221+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [172.17.0.2:35485] for ledger: 2308
2022-12-17T15:33:19,245+0000 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/__transaction_buffer_snapshot] Updated cursor __compaction with ledger id 2308 md-position=1948:163 rd-position=2259:12
2022-12-17T15:33:19,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/__transaction_buffer_snapshot] reset readPosition to 71:1 before current read readPosition 2259:12 on cursor __compaction
2022-12-17T15:33:19,248+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] [persistent://public/default/__transaction_buffer_snapshot][__compaction] Reset subscription to message id 71:1
2022-12-17T15:33:19,248+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Successfully reset subscription to the message 71:1:-1:-1
2022-12-17T15:33:19,255+0000 [BookKeeperClientWorker-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/__transaction_buffer_snapshot][__compaction] Successfully closed & deleted ledger 2026 in cursor
2022-12-17T15:33:19,277+0000 [broker-client-shared-timer-31-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/__transaction_buffer_snapshot] [__compaction] Reconnecting after timeout
2022-12-17T15:33:19,278+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Subscribing to topic on cnx [id: 0x7454e801, L:/127.0.0.1:35034 - R:localhost/127.0.0.1:6650], consumerId 228
2022-12-17T15:33:19,279+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / __compaction
2022-12-17T15:33:19,280+0000 [pulsar-io-27-5] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/__transaction_buffer_snapshot-__compaction] Rewind from 71:1 to 2259:0
2022-12-17T15:33:19,280+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Created subscription on topic persistent://public/default/__transaction_buffer_snapshot / __compaction
2022-12-17T15:33:19,280+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 228
2022-12-17T15:33:19,374+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Closing consumer: consumerId=228
2022-12-17T15:33:19,374+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/__transaction_buffer_snapshot, name=__compaction}, consumerId=228, consumerName=7364a, address=/127.0.0.1:35034}
2022-12-17T15:33:19,374+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Closed consumer, consumerId=228
2022-12-17T15:33:19,375+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot] [__compaction] Closed consumer
2022-12-17T15:33:24,154+0000 [ForkJoinPool.commonPool-worker-2] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [17/Dec/2022:15:33:24 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2409 "-" "Pulsar-Java-v2.11.0.0-rc2" 5
2022-12-17T15:33:24,165+0000 [ForkJoinPool.commonPool-worker-2] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [17/Dec/2022:15:33:24 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2409 "-" "Pulsar-Java-v2.11.0.0-rc2" 6
2022-12-17T15:33:24,202+0000 [ForkJoinPool.commonPool-worker-2] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [17/Dec/2022:15:33:24 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2409 "-" "Pulsar-Java-v2.11.0.0-rc2" 30
2022-12-17T15:33:54,155+0000 [ForkJoinPool.commonPool-worker-2] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [17/Dec/2022:15:33:54 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2409 "-" "Pulsar-Java-v2.11.0.0-rc2" 6
2022-12-17T15:33:54,166+0000 [ForkJoinPool.commonPool-worker-2] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [17/Dec/2022:15:33:54 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2409 "-" "Pulsar-Java-v2.11.0.0-rc2" 6
2022-12-17T15:34:18,991+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Subscribing to topic on cnx [id: 0x7454e801, L:/127.0.0.1:35034 - R:localhost/127.0.0.1:6650], consumerId 229
2022-12-17T15:34:18,991+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / __compaction
2022-12-17T15:34:18,991+0000 [pulsar-io-27-5] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/__transaction_buffer_snapshot-__compaction] Rewind from 2259:12 to 2259:12
2022-12-17T15:34:18,991+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Created subscription on topic persistent://public/default/__transaction_buffer_snapshot / __compaction
2022-12-17T15:34:18,992+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 229
2022-12-17T15:34:18,992+0000 [compaction-78-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Get topic last message Id
2022-12-17T15:34:19,005+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Successfully getLastMessageId 2259:22
2022-12-17T15:34:19,005+0000 [broker-client-shared-internal-executor-28-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Get topic last message Id
2022-12-17T15:34:19,039+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Successfully getLastMessageId 2259:22
2022-12-17T15:34:19,039+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of compaction for persistent://public/default/__transaction_buffer_snapshot, reading to 2259:22:-1
2022-12-17T15:34:19,063+0000 [compaction-78-1] WARN  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to find 1 bookies : excludeBookies [<Bookie:172.17.0.2:35485>], allBookies [<Bookie:172.17.0.2:35485>].
2022-12-17T15:34:19,064+0000 [compaction-78-1] WARN  org.apache.bookkeeper.client.BookieWatcherImpl - New ensemble: [172.17.0.2:35485] is not adhering to Placement Policy
2022-12-17T15:34:19,101+0000 [compaction-78-1] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [172.17.0.2:35485] for ledger: 2309
2022-12-17T15:34:19,101+0000 [compaction-78-1] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase two of compaction for persistent://public/default/__transaction_buffer_snapshot, from 71:1:-1:-1 to 2259:22:-1:-1, compacting 137 keys to ledger 2309
2022-12-17T15:34:19,101+0000 [compaction-78-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Seeking subscription to the message 71:1:-1:-1
2022-12-17T15:34:19,102+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://public/default/__transaction_buffer_snapshot, name=__compaction}, consumerId=229, consumerName=5d4e6, address=/127.0.0.1:35034}
2022-12-17T15:34:19,102+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/__transaction_buffer_snapshot, name=__compaction}, consumerId=229, consumerName=5d4e6, address=/127.0.0.1:35034}
2022-12-17T15:34:19,102+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
2022-12-17T15:34:19,102+0000 [BookKeeperClientWorker-OrderedExecutor-7-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/__transaction_buffer_snapshot] Initiate reset readPosition to 71:1 on cursor __compaction
2022-12-17T15:34:19,103+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 229
2022-12-17T15:34:19,103+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/__transaction_buffer_snapshot] [__compaction] Closed connection [id: 0x7454e801, L:/127.0.0.1:35034 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
2022-12-17T15:34:19,105+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/__transaction_buffer_snapshot] reset readPosition to 71:1 before current read readPosition 2259:23 on cursor __compaction
2022-12-17T15:34:19,105+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] [persistent://public/default/__transaction_buffer_snapshot][__compaction] Reset subscription to message id 71:1
2022-12-17T15:34:19,105+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Successfully reset subscription to the message 71:1:-1:-1
2022-12-17T15:34:19,205+0000 [broker-client-shared-timer-31-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/__transaction_buffer_snapshot] [__compaction] Reconnecting after timeout
2022-12-17T15:34:19,206+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Subscribing to topic on cnx [id: 0x7454e801, L:/127.0.0.1:35034 - R:localhost/127.0.0.1:6650], consumerId 229
2022-12-17T15:34:19,206+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / __compaction
2022-12-17T15:34:19,207+0000 [pulsar-io-27-5] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/__transaction_buffer_snapshot-__compaction] Rewind from 71:1 to 2259:0
2022-12-17T15:34:19,207+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Created subscription on topic persistent://public/default/__transaction_buffer_snapshot / __compaction
2022-12-17T15:34:19,207+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 229
2022-12-17T15:34:19,251+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Closing consumer: consumerId=229
2022-12-17T15:34:19,251+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/__transaction_buffer_snapshot, name=__compaction}, consumerId=229, consumerName=5d4e6, address=/127.0.0.1:35034}
2022-12-17T15:34:19,252+0000 [pulsar-io-27-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:35034] Closed consumer, consumerId=229
2022-12-17T15:34:19,252+0000 [pulsar-io-27-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__transaction_buffer_snapshot] [__compaction] Closed consumer
2022-12-17T15:34:24,151+0000 [ForkJoinPool.commonPool-worker-2] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [17/Dec/2022:15:34:24 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false&subscriptionBacklogSize=false&getEarliestTimeInBacklog=false HTTP/1.1" 200 2409 "-" "Pulsar-Java-v2.11.0.0-rc2" 3

Can SchemaGenerator be extracted?

Hi! I'm implementing schema for Pulsar.Client and came to the same solution as you - it's convenient to have schema generated on the fly. What do you think about extracting this file to separate repo/nuget? It can be improved as well, for example adding cache.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.