Git Product home page Git Product logo

pulsar-client-go's Introduction

pulsar-client-go

A Go client library for the Apache Pulsar project.

GoDoc

Alternatives

The Pulsar project contains a Go client library that is a wrapper for the Pulsar C++ client library.

In comparison, this library is 100% Go (no cgo required). Outside the Go standard library, it has a single dependency on the golang/protbuf library.

Status & Goals

Status

This client is a work-in-progress and as such does not support all Pulsar features. It supports Pulsar 2.0 along with 1.22.

The following is an incomplete list of features that are not yet implemented:

  • Batch frame support
  • Payload compression support
  • Partitioned topics support
  • Athenz authentication support
  • Encryption support

Goals

  • 100% Go
  • Simplicity

Installation

go get -u github.com/Comcast/pulsar-client-go

Note: The package name is pulsar

Example

An example of a producer and consumer can be seen in the included cli application.

Contributions

Contributions are welcome. Please create an issue before beginning work on major contributions. Refer to the CONTRIBUTING.md doc for more information.

Local Development

Integration Tests

Integration tests are provided that connect to a Pulsar sever. They are best run against a local instance of Pulsar, since they expect the standalone properties to exist. See below for instructions on installing Pulsar locally.

Integration tests will be run when provided the pulsar flag with the address of the Pulsar server to connect to. Example:

go test -v -pulsar "localhost:6650"

Protobuf

The Makefile target api/PulsarApi.pb.go will generate the required .go files using the Pulsar source's .proto files.

Usage:

$ make api/PulsarApi.pb.go

Local Pulsar

Notes on installing Pulsar locally.

Prereqs:

  • For Java8 on OSX, use these instructions stackoverflow

  • Checkout source from github

    git clone [email protected]:apache/incubator-pulsar.git
  • Switch to desired tag, eg v1.22.1-incubating

  • Install Maven

    brew install maven
  • Compile (full instructions)

    mvn install -DskipTests

Launch Pulsar from Pulsar directory:

./bin/pulsar standalone --wipe-data --advertised-address localhost

Local Pulsar + TLS

The Makefile has various targets to support certificate generation, Pulsar TLS configuration, and topic setup:

  • Generate certificates for use by brokers, admin tool, and applications:

     make certificates

    This will create broker, admin, and app private/public pairs in the certs directory.

  • Generate configuration files for running Pulsar standalone and pulsar-admin with TLS enabled using generated certificates:

     make pulsar-tls-conf

    This will generate pulsar-conf/standalone.tls.conf and pulsar-conf/client.tls.conf files that can be used as the configurations for the standalone server and pulsar-admin tools respectively. They'll use the certificates in the certs directory. The files should be placed in the appropriate locations for use with those tools (probably the conf directory within the Pulsar directory). It's recommended to use symbolic-links to easily switch between configurations.

  • Setup sample topic on standalone server with TLS enabled:

     make standalone-tls-ns

    This will create a sample/standalone/ns1 topic. The app certificate will have produce, consume rights on the topic.

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

pulsar-client-go's People

Contributors

awilliams avatar davidwalter0 avatar merlimat avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

pulsar-client-go's Issues

user cluster host err

v0.3.1
use host: pulsar://10.57.98.208:6650,10.57.98.209:6650,10.57.98.210:6650,

happend

WARN[2510] [Failed to connect to broker.] error="dial tcp: address 192.168.33.3:6650,192.168.33.4:6650,192.168.33.5:6650: too many colons in address" remote_addr="pulsar://192.168.33.3:6650,192.168.33.4:6650,192.168.33.5:6650"

support kerberos authentication?

Is your feature request related to a problem? Please describe.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

ManagedConsumer: queue size 1 behaves incorrectly

Expected behavior

When using a managed consumer with QueueSize == 1, eg

c := pulsar.ManagedConsumerConfig{
	QueueSize:           1,
}

Then it's expected that c.ReceiveAsync receive messages.

Actual behavior

The managed consumer incorrectly sets the highwater to 0, which incorrectly performs flow control.

System configuration

v0.0.1

Error after RedeliverUnacknowledged

Expected behavior

No error after RedeliverUnacknowledged

Actual behavior

had error

Steps to reproduce

package saga

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"github.com/leenux/pulsar-client-go"
	"github.com/stretchr/testify/suite"
	"go.uber.org/zap"
	"io/ioutil"
	"testing"
	"time"
)

var params = struct {
	pulsar        string
	tlsCert       string
	tlsKey        string
	tlsCA         string
	tlsSkipVerify bool
	name          string
	topic         string
	producer      bool
	message       string
	messageRate   time.Duration
	shared        bool
}{
	pulsar:        "localhost:6650",
	tlsCert:       "",
	tlsKey:        "",
	tlsCA:         "",
	tlsSkipVerify: false,
	name:          "demo",
	topic:         "persistent://sample/standalone/ns1/demo",
	producer:      false,
	message:       "--",
	messageRate:   time.Second,
	shared:        false,
}

var mcp *pulsar.ManagedClientPool
var mp *pulsar.ManagedProducer
var mc *pulsar.ManagedConsumer

type PulsarTestSuite struct {
	suite.Suite
}

func (suite *PulsarTestSuite) SetupTest() {

}

func (suite *PulsarTestSuite) TestReceive() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	_, err := mp.Send(ctx, []byte(time.Now().String()))
	if err != nil {
		zap.L().Error("Send",
			zap.Any("err", err),
		)
	}

	for i := 0; i < 1; i++ {
		message, err := mc.Receive(ctx)
		if err != nil && err.Error() != "context deadline exceeded" {
			zap.L().Error("Receive",
				zap.Any("err", err),
			)
			//break
		}
		zap.L().Debug("Receive",
			zap.Any("message", message),
			zap.Any("Payload", string(message.Payload)),
		)
	}
	zap.L().Debug("RedeliverUnacknowledged")
	err = mc.RedeliverUnacknowledged(ctx)
	if err != nil {
		zap.L().Error("RedeliverUnacknowledged",
			zap.Any("err", err),
		)
	}
	zap.L().Debug("Receive again")
	for {
		message, err := mc.Receive(ctx)
		if err != nil && err.Error() != "context deadline exceeded" {
			zap.L().Error("Receive2",
				zap.Any("err", err),
			)
			break
		}
		err = mc.Ack(ctx, message)
		if err != nil {
			zap.L().Error("Ack on Receive2",
				zap.Any("err", err),
			)
			break
		}
		zap.L().Debug("Receive2",
			zap.Any("message", message),
			zap.Any("Payload", string(message.Payload)),
		)
	}
}

func TestPulsarTestSuite(t *testing.T) {
	var logger *zap.Logger
	logger, _ = zap.NewDevelopment()
	defer logger.Sync()

	zap.ReplaceGlobals(logger)

	asyncErrs := make(chan error, 8)
	go func() {
		for err := range asyncErrs {
			zap.L().Error("asyncErrs",
				zap.Any("err", err),
			)
		}
	}()

	var tlsCfg *tls.Config
	if params.tlsCert != "" && params.tlsKey != "" {
		tlsCfg = &tls.Config{
			InsecureSkipVerify: params.tlsSkipVerify,
		}
		var err error
		cert, err := tls.LoadX509KeyPair(params.tlsCert, params.tlsKey)
		if err != nil {
			zap.L().Error("error loading certificates:",
				zap.Any("err", err),
			)
		}
		tlsCfg.Certificates = []tls.Certificate{cert}

		if params.tlsCA != "" {
			rootCA, err := ioutil.ReadFile(params.tlsCA)
			if err != nil {
				zap.L().Error("error loading certificate authority:",
					zap.Any("err", err),
				)
			}
			tlsCfg.RootCAs = x509.NewCertPool()
			tlsCfg.RootCAs.AppendCertsFromPEM(rootCA)
		}

		// Inspect certificate and print the CommonName attribute,
		// since this may be used for authorization
		if len(cert.Certificate[0]) > 0 {
			x509Cert, err := x509.ParseCertificate(cert.Certificate[0])
			if err != nil {
				zap.L().Error("error loading public certificate:",
					zap.Any("err", err),
				)
			}
			zap.L().Debug("Using certificate pair with CommonName=",
				zap.Any("x509Cert.Subject.CommonName", x509Cert.Subject.CommonName),
			)
		}
	}

	mcp = pulsar.NewManagedClientPool()
	mpCfg := pulsar.ManagedProducerConfig{
		Name:                  params.name,
		Topic:                 params.topic,
		NewProducerTimeout:    time.Second,
		InitialReconnectDelay: time.Second,
		MaxReconnectDelay:     time.Minute,
		ManagedClientConfig: pulsar.ManagedClientConfig{
			ClientConfig: pulsar.ClientConfig{
				Addr:      params.pulsar,
				TLSConfig: tlsCfg,
				Errs:      asyncErrs,
			},
		},
	}
	mcCfg := pulsar.ManagedConsumerConfig{
		Name:                  params.name,
		Topic:                 params.topic,
		Exclusive:             true,
		NewConsumerTimeout:    time.Second,
		InitialReconnectDelay: time.Second,
		MaxReconnectDelay:     time.Minute,
		ManagedClientConfig: pulsar.ManagedClientConfig{
			ClientConfig: pulsar.ClientConfig{
				Addr:      params.pulsar,
				TLSConfig: tlsCfg,
				Errs:      asyncErrs,
			},
		},
	}
	mp = pulsar.NewManagedProducer(mcp, mpCfg)
	mc = pulsar.NewManagedConsumer(mcp, mcCfg)
	// Allow consumer time to connect
	time.Sleep(time.Millisecond * 500)
	suite.Run(t, new(PulsarTestSuite))
}

output

=== RUN   TestPulsarTestSuite
--- PASS: TestPulsarTestSuite (1.51s)
=== RUN   TestPulsarTestSuite/TestReceive
2018-12-11T16:44:23.373+0800	DEBUG	saga/pulsar_test.go:71	Receive	{"message": {"Topic":"persistent://sample/standalone/ns1/demo","Msg":{"consumer_id":0,"message_id":{"ledgerId":1446,"entryId":17,"partition":-1}},"Meta":{"producer_name":"demo","sequence_id":0,"publish_time":1544517863000,"compression":0},"Payload":"MjAxOC0xMi0xMSAxNjo0NDoyMy4zNjE4OTkxMTkgKzA4MDAgQ1NUIG09KzAuNTEzNDc4ODU0"}, "Payload": "2018-12-11 16:44:23.361899119 +0800 CST m=+0.513478854"}
2018-12-11T16:44:23.373+0800	DEBUG	saga/pulsar_test.go:76	RedeliverUnacknowledged
2018-12-11T16:44:23.373+0800	DEBUG	saga/pulsar_test.go:83	Receive again
2018-12-11T16:44:23.378+0800	DEBUG	saga/pulsar_test.go:99	Receive2	{"message": {"Topic":"persistent://sample/standalone/ns1/demo","Msg":{"consumer_id":0,"message_id":{"ledgerId":1446,"entryId":17,"partition":-1}},"Meta":{"producer_name":"demo","sequence_id":0,"publish_time":1544517863000,"compression":0},"Payload":"MjAxOC0xMi0xMSAxNjo0NDoyMy4zNjE4OTkxMTkgKzA4MDAgQ1NUIG09KzAuNTEzNDc4ODU0"}, "Payload": "2018-12-11 16:44:23.361899119 +0800 CST m=+0.513478854"}
2018-12-11T16:44:24.363+0800	ERROR	saga/pulsar_test.go:94	Ack on Receive2	{"err": "proto: repeated field MessageId has nil element"}
...
...
Process finished with exit code 0

System configuration

Pulsar version: 2.2.0

Add ManagedConsumer timeoutAck and RedeliverUnacknowledged support plz

Is your feature request related to a problem? Please describe.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.