proximo's Introduction

Proximo is a proxy for multiple different publish-subscribe queuing systems.

It is based on a gRPC interface definition, making it easy to create new client libraries. It already supports a number of popular queuing systems, and adding new ones is intended to be simple.

Goals

  • Expose multiple consumer (fan out) semantics where needed

  • Minimise overhead over direct use of a given queuing system

  • Allow configuration of the underlying queue system via runtime configuration of Proximo

  • Allow replacement of a queuing system with no change to the Proximo client applications

  • Enable easy creation of client libraries for new languages (anything that has gRPC support)

Non goals

  • Exposing specific details of the underlying queue system via the client API

Server

This is the Proximo server implementation, written in Go

proximo server

Proximo client libraries

  • go - substrate - we recommend using substrate to access Proximo from Go

API definition (protobuf)

protobuf definitions

Access Control

Access control is supported through an optional config file, set via PROXIMO_ACL_CONFIG.

In this example, all clients can consume from topics that start with products, but only a client called product-writer has permission to publish to these topics.

default:
  roles: ["read-products"]
roles:
- id: "read-products"
  consume: ["products.*"]
- id: "write-products"
  publish: ["products.*"]
clients:
- id: "product-writer"
  secret: "$2y$10$2AzC3Z8L18cP.crFi.ZDsuFdbwrYu16Lnh8y7U1wMO3QPanYuwJIm" # secret is the bcrypt hash of "password"
  roles: ["write-products"]
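To make the role lookup concrete, here is a minimal sketch of how such a config could be evaluated. It is not the Proximo server's implementation: the `ACL` and `Role` types and the `Allowed` method are hypothetical, and it assumes the topic patterns are regular expressions, which the config above does not state.

```go
package main

import (
	"fmt"
	"regexp"
)

// Role mirrors one entry under "roles" in the config above.
type Role struct {
	ID      string
	Consume []string // topic patterns, assumed here to be regular expressions
	Publish []string
}

// ACL is a hypothetical in-memory form of the config file.
type ACL struct {
	DefaultRoles []string
	Roles        map[string]Role
	ClientRoles  map[string][]string // client id -> role ids
}

// matchesAny reports whether topic fully matches any of the patterns.
func matchesAny(patterns []string, topic string) bool {
	for _, p := range patterns {
		re, err := regexp.Compile("^(?:" + p + ")$")
		if err != nil {
			continue // this sketch simply skips invalid patterns
		}
		if re.MatchString(topic) {
			return true
		}
	}
	return false
}

// Allowed reports whether clientID may perform action ("consume" or
// "publish") on topic. Default roles apply to every client.
func (a ACL) Allowed(clientID, action, topic string) bool {
	roleIDs := append([]string{}, a.DefaultRoles...)
	roleIDs = append(roleIDs, a.ClientRoles[clientID]...)
	for _, id := range roleIDs {
		role, ok := a.Roles[id]
		if !ok {
			continue
		}
		var patterns []string
		switch action {
		case "consume":
			patterns = role.Consume
		case "publish":
			patterns = role.Publish
		}
		if matchesAny(patterns, topic) {
			return true
		}
	}
	return false
}

// exampleACL builds the same rules as the YAML example.
func exampleACL() ACL {
	return ACL{
		DefaultRoles: []string{"read-products"},
		Roles: map[string]Role{
			"read-products":  {ID: "read-products", Consume: []string{"products.*"}},
			"write-products": {ID: "write-products", Publish: []string{"products.*"}},
		},
		ClientRoles: map[string][]string{"product-writer": {"write-products"}},
	}
}

func main() {
	acl := exampleACL()
	fmt.Println(acl.Allowed("anyone", "consume", "products.created"))
	fmt.Println(acl.Allowed("anyone", "publish", "products.created"))
	fmt.Println(acl.Allowed("product-writer", "publish", "products.created"))
}
```

Secret verification (the bcrypt comparison) is deliberately left out of this sketch.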

Add the token to the context. For example:

sink, _ := proximo.NewAsyncMessageSink(proximo.AsyncMessageSinkConfig{
  Broker:   "localhost:6868",
  Topic:    "products",
  Insecure: true,
})

token := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", "product-writer", "password")))
md := metadata.Pairs("Authorization", fmt.Sprintf("Bearer %s", token))
reqCtx := metadata.NewOutgoingContext(ctx, md)

sink.PublishMessage(reqCtx, &Message{Data: []byte("hello world")})
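The token in the example is the base64 encoding of `id:secret`, sent with a `Bearer` prefix. As a rough illustration of the round trip, the sketch below builds that header value and parses it back. `encodeToken` and `decodeToken` are hypothetical helpers written for this example; the real server may parse the header differently.

```go
package main

import (
	"encoding/base64"
	"errors"
	"fmt"
	"strings"
)

// encodeToken builds the Authorization value used in the client example.
func encodeToken(id, secret string) string {
	raw := []byte(id + ":" + secret)
	return "Bearer " + base64.StdEncoding.EncodeToString(raw)
}

// decodeToken is a hypothetical server-side counterpart: it strips the
// prefix, decodes the payload and splits it at the first colon.
func decodeToken(header string) (string, string, error) {
	const prefix = "Bearer "
	if !strings.HasPrefix(header, prefix) {
		return "", "", errors.New("missing Bearer prefix")
	}
	raw, err := base64.StdEncoding.DecodeString(strings.TrimPrefix(header, prefix))
	if err != nil {
		return "", "", fmt.Errorf("decode token: %w", err)
	}
	id, secret, ok := strings.Cut(string(raw), ":") // requires Go 1.18+
	if !ok {
		return "", "", errors.New("token is not of the form id:secret")
	}
	return id, secret, nil
}

func main() {
	header := encodeToken("product-writer", "password")
	id, _, err := decodeToken(header)
	if err != nil {
		panic(err)
	}
	fmt.Println("client:", id)
}
```

Because the secret travels in a reversible encoding, a scheme like this only makes sense over a TLS connection.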

proximo's People

Contributors

achilleasa, george-angel, gobonoid, jakekeeys, mjgarton, monkatron95, nesze, nick-jones, povilasv, sbuliarca, simonhdickson, speedycoder, thinktainer

proximo's Issues

Consider callback based server/client implementation

Callbacks are strictly more powerful and don’t require unnecessary goroutines.
They also make it easier to write correct code.

Server implementation (untested):

type ServerV2 struct {
	Producer Producer

	Consumer Consumer
}
type MessageSender func(*Message) error

type Consumer interface {
	Init(topic, consumer string, sendMessage MessageSender) error
	ReceiveConfirmation(*Confirmation) error
	Close() error
}

type ConfirmationSender func(*Confirmation) error

type Producer interface {
	Init(topic string, publisher string, sendConfirmation ConfirmationSender) error
	ReceiveMessage(*Message) error
	Close() error
}

func (s *ServerV2) Consume(stream MessageSource_ConsumeServer) error {

	_, cancel := context.WithCancel(stream.Context())
	defer cancel()

	// started must be initialised; a nil *atomicBool would panic on first use
	c := &consumer{Consumer: s.Consumer, started: &atomicBool{}}
	for {
		msg, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			if strings.HasSuffix(err.Error(), "context canceled") {
				return nil
			}

			return err
		}
		//todo handle returned errors
		go c.handleRequest(msg, stream)
	}
}

type consumer struct {
	started  *atomicBool
	Consumer Consumer
}

func (c *consumer) handleRequest(msg *ConsumerRequest, stream MessageSource_ConsumeServer) error {
	switch {
	case msg.GetStartRequest() != nil:

		if c.started.Get() {
			return ErrStartedTwice
		}
		sr := msg.GetStartRequest()

		sendMessage := func(m *Message) error {
			return stream.Send(m)
		}
		if err := c.Consumer.Init(sr.GetTopic(), sr.GetConsumer(), sendMessage); err != nil {
			return err
		}
		c.started.Set(true)

	case msg.GetConfirmation() != nil:

		if !c.started.Get() {
			return ErrInvalidConfirm
		}

		return c.Consumer.ReceiveConfirmation(msg.GetConfirmation())
	default:
		return ErrInvalidRequest
	}
	return nil // reached after a successful start request
}

type publisher struct {
	started  *atomicBool
	Producer Producer
}

func (p *publisher) handleRequest(msg *PublisherRequest, stream MessageSink_PublishServer) error {
	switch {
	case msg.GetStartRequest() != nil:
		if p.started.Get() {
			return ErrStartedTwice
		}

		sr := msg.GetStartRequest()
		sendConfirmation := func(c *Confirmation) error {
			return stream.Send(c)
		}
		if err := p.Producer.Init(sr.Topic, "", sendConfirmation); err != nil {
			return err
		}
		p.started.Set(true)
	case msg.GetMsg() != nil:
		if !p.started.Get() {
			return ErrNotConnected
		}
		return p.Producer.ReceiveMessage(msg.GetMsg())

	}
	return nil
}

func (s *ServerV2) Publish(stream MessageSink_PublishServer) error {

	_, cancel := context.WithCancel(stream.Context())
	defer cancel()
	// started must be initialised; a nil *atomicBool would panic on first use
	p := &publisher{Producer: s.Producer, started: &atomicBool{}}
	for {
		msg, err := stream.Recv()
		if err != nil {
			// a clean end of stream or a cancelled context is not an error, matching Consume
			if err == io.EOF {
				return nil
			}
			if strings.HasSuffix(err.Error(), "context canceled") {
				return nil
			}
			return err
		}
		//todo handle returned errors
		go p.handleRequest(msg, stream)
	}
}


type atomicBool struct{ flag int32 }

func (b *atomicBool) Set(value bool) {
	var i int32 = 0
	if value {
		i = 1
	}
	atomic.StoreInt32(&(b.flag), int32(i))
}

func (b *atomicBool) Get() bool {
	if atomic.LoadInt32(&(b.flag)) != 0 {
		return true
	}
	return false
}
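Because each request is handled in its own goroutine, the separate `Get` check and later `Set(true)` above leave a window in which two start requests can both pass the check. One way to close that window, sketched here under the assumption of Go 1.19 or newer, is a single compare-and-swap on `sync/atomic.Bool`. The `session` type and `start` method are hypothetical stand-ins for the consumer and publisher structs.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errStartedTwice = errors.New("start request received twice")

// session is a hypothetical stand-in for the consumer/publisher structs.
type session struct {
	started atomic.Bool // zero value is false and ready to use
}

// start runs init at most once at a time; a second call while started
// is set gets errStartedTwice.
func (s *session) start(init func() error) error {
	// Check and set in one atomic step, so two concurrent start
	// requests cannot both get past this point.
	if !s.started.CompareAndSwap(false, true) {
		return errStartedTwice
	}
	if err := init(); err != nil {
		s.started.Store(false) // allow a retry after a failed init
		return err
	}
	return nil
}

func main() {
	s := &session{}
	ok := func() error { return nil }
	fmt.Println(s.start(ok))
	fmt.Println(s.start(ok))
}
```

Note the trade-off: the flag is set before `init` finishes, so a confirmation arriving during initialisation would see the session as started.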

And the backend (untested):

var _ = proximo.Consumer(&NatsHandlerConsumer{})
var _ = proximo.Producer(&NatsHandlerProducer{})

type NatsHandlerConsumer struct {
	Url string
	sub *nats.Subscription
}

func (n *NatsHandlerConsumer) Init(topic, consumer string, sendMessage proximo.MessageSender) error {
	conn, err := nats.Connect(n.Url)
	if err != nil {
		return err
	}
	//defer conn.Close()

	ch := make(chan *nats.Msg, 64) //TODO: make 64 configurable at startup time
	sub, err := conn.ChanSubscribe(topic, ch)
	if err != nil {
		return err
	}
	n.sub = sub

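	// Forward messages in the background so that Init returns and the
	// caller can mark the stream as started.
	go func() {
		for m := range ch {
			//todo handle returned errors
			sendMessage(&proximo.Message{
				Data: m.Data,
				Id:   proximo.GenerateID(),
			})
		}
	}()

	return nil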
}

func (n *NatsHandlerConsumer) ReceiveConfirmation(c *proximo.Confirmation) error { return nil }
func (n *NatsHandlerConsumer) Close() error                                      { return n.sub.Unsubscribe() }

type NatsHandlerProducer struct {
	Url              string
	topic            string
	sendConfirmation proximo.ConfirmationSender
	conn             *nats.Conn
}

func (n *NatsHandlerProducer) Init(topic string, publisher string, sendConfirmation proximo.ConfirmationSender) error {
	var err error
	n.topic = topic
	n.sendConfirmation = sendConfirmation // stored for use in ReceiveMessage
	n.conn, err = nats.Connect(n.Url)
	if err != nil {
		return err
	}

	return err
}

func (n *NatsHandlerProducer) ReceiveMessage(msg *proximo.Message) error {
	// only confirm messages that were actually published
	if err := n.conn.Publish(n.topic, msg.GetData()); err != nil {
		return err
	}
	return n.sendConfirmation(&proximo.Confirmation{msg.GetId()})
}

func (n *NatsHandlerProducer) Close() error {
	n.conn.Close()
	return nil
}
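For comparison, the same callback contract can be exercised without any broker. The sketch below is a hypothetical in-memory producer: `Message`, `Confirmation` and `ConfirmationSender` are local stand-ins for the generated types, not the real proximo package, and the type is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the generated protobuf types.
type Message struct {
	ID   string
	Data []byte
}

type Confirmation struct {
	MsgID string
}

type ConfirmationSender func(*Confirmation) error

// memoryProducer is a hypothetical backend that keeps messages in a slice.
type memoryProducer struct {
	topic            string
	sendConfirmation ConfirmationSender
	stored           [][]byte
}

// Init records the topic and the callback used to confirm messages.
func (p *memoryProducer) Init(topic, publisher string, send ConfirmationSender) error {
	if send == nil {
		return errors.New("nil confirmation sender")
	}
	p.topic = topic
	p.sendConfirmation = send
	return nil
}

// ReceiveMessage stores the payload, then confirms it through the callback.
func (p *memoryProducer) ReceiveMessage(m *Message) error {
	p.stored = append(p.stored, m.Data)
	return p.sendConfirmation(&Confirmation{MsgID: m.ID})
}

func (p *memoryProducer) Close() error { return nil }

func main() {
	var confirmed []string
	p := &memoryProducer{}
	err := p.Init("products", "", func(c *Confirmation) error {
		confirmed = append(confirmed, c.MsgID)
		return nil
	})
	if err != nil {
		panic(err)
	}
	if err := p.ReceiveMessage(&Message{ID: "1", Data: []byte("hello")}); err != nil {
		panic(err)
	}
	fmt.Println(confirmed)
}
```

A backend like this would also be a convenient test double for the server code.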

Add release

Going forward, it would be great to have versioned releases, so that Go tooling can resolve to versions rather than commit hashes.

Proximo hangs when trying to publish a message

We have seen multiple instances where our producer tried to publish a message and got stuck until the program was restarted. This could potentially be worked around by adding a timeout to the call, but it might be an issue with the library.
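A minimal sketch of the timeout workaround, using only the standard library. `publishWithTimeout` is a hypothetical wrapper, and the `publish` argument stands in for whatever call is hanging. It bounds how long the caller waits, but does not fix the underlying cause.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// publishWithTimeout runs publish and stops waiting for it after d.
func publishWithTimeout(parent context.Context, d time.Duration, publish func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- publish(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runStuckExample simulates a publish that never returns and reports
// whether the wrapper gave up with a deadline error.
func runStuckExample() bool {
	block := make(chan struct{}) // never closed
	stuck := func(context.Context) error {
		<-block
		return nil
	}
	err := publishWithTimeout(context.Background(), 50*time.Millisecond, stuck)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(runStuckExample())
}
```

If the wrapped call ignores its context, the goroutine running it stays blocked after the timeout, so this leaks one goroutine per stuck publish.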

Make proximo-server a library

hi nice project!

It would be nice to turn the server into a library and export a "handler" interface so that new backends can be added without hacking.

Proposal: First-class support for message keys

Intro

This issue aims to make a case for including message keys as part of proximo's contract. Many event-streaming platforms support message keys, and it seems as though including them as part of proximo can solve some problems whilst not exposing underlying client APIs.

The problem

I've noticed (at least internally at @utilitywarehouse) at least 2 teams that have had to roll their own proximo server implementations in order to cater for message keys. This is usually done by using substrate's KeyFunc mechanism for publishing to Kafka. The implementations I've seen can be quite complicated or cause problems when deploying proximo.

  • The partner team has created a protoc plugin that uses options to generate methods to return the desired message key, muddying proto definitions & requiring a custom proximo implementation to obtain said keys.
  • The billing team has a type switch checking event types to determine which proto field to use as the desired message key (this one is particularly problematic, as it couples proximo to the actual proto event types, which is undesirable as it requires ensuring the proximo implementation contains all possible proto messages linked in the binary)
  • On customer-billing, we use proximo purely for consumption and instead write to kafka directly so we can have message keys.

I suspect many more teams have done the same when it comes to their message keys.

Proximo states that exposing underlying stream implementations is a non-goal, which is fair. However, message keys are becoming more common across different event stream providers:

  • Kafka (partition keys)
  • Google PubSub (ordering keys)
  • RabbitMQ (routing keys)
  • Azure Service Bus (partition keys)

Proposed solution

  1. Add a key field to the Message proto
message Message {
  bytes data = 1;
  string id = 2;
  bytes key = 3;
}
  2. Update proximo backends to use the key where the backend supports it. For Kafka, we can specify the KeyFunc. NATS is currently the outlier, as it has no message keys; in that case the backend can simply ignore the key. Anyone switching from Kafka to NATS already has to accept that message keys will no longer exist, so this does not seem like a concern for proximo.
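To illustrate why a key matters to backends that partition a topic, here is a sketch of hash-based partition selection: messages with the same key always land on the same partition, which is what preserves their relative order. `partitionFor` is a hypothetical function using FNV-1a from the standard library. Real clients pick their own hash, so this does not reproduce any backend's actual placement.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to one of n partitions.
// In this sketch, keyless messages (and n <= 0) fall back to partition 0.
func partitionFor(key []byte, n int) int {
	if n <= 0 || len(key) == 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	keys := []string{"customer-1", "customer-2", "customer-1"}
	for _, k := range keys {
		fmt.Printf("%s -> partition %d\n", k, partitionFor([]byte(k), 8))
	}
}
```

A backend without keys, such as NATS in the proposal above, would simply never call a function like this.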

Document ordering

Due to the nature of the API and the target use cases, proximo is not suited to work over queues that do not guarantee ordering, and applications that use it expect this.

This should be more clearly documented in the readme.

Authentication & Authorisation

Two use cases identified

  • Who is consuming (authentication)
  • Restrict who can publish (authorisation)

Authentication

As the number of consumers has grown, and nothing restricts who can consume, we've been unable to keep track of them. When an issue arises or we have to coordinate changes (for example, as we've been making changes to the underlying NATS Streaming cluster), it's been helpful to chat with other teams/consumers.

An identifier could also be used in metrics (who's consuming/publishing the most) or logs (which client caused a repeatable server panic).

I played with 2 ideas for identifying a client

  1. Client ID. An identifying token over a gRPC TLS connection.
  2. Mutual TLS. Identify the client through the cert.

ACL

The main driver for authentication has been to implement ACL on the publishing side of things. We should be able to restrict who can publish back into the stream.

An example spike looks like...

func NewStreamServerInterceptor(auth authorisor) grpc.StreamServerInterceptor {
	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		newCtx, err := auth.Authorise(stream.Context(), info.FullMethod)
		if err != nil {
			return err
		}

		wrapped := grpc_middleware.WrapServerStream(stream)
		wrapped.WrappedContext = newCtx
		return handler(srv, wrapped)
	}
}

func (a *BasicAuthorisor) Authorise(ctx context.Context, method string) (context.Context, error) {
	if !a.SecureConsume && !a.SecurePublish {
		return ctx, nil
	}

	token, err := grpc_auth.AuthFromMD(ctx, "basic")
	if err != nil {
		return nil, err
	}

	client, err := a.clientFromToken(token)
	if err != nil {
		return ctx, grpc.Errorf(codes.Unauthenticated, "Unable to identify client with token `%s`", token)
	}

	// Client has been authenticated, that's enough to consume
	// Now check client is authorised to publish
	if a.SecurePublish && strings.Contains(strings.ToLower(method), "publish") && !client.acl.publish {
		return ctx, grpc.Errorf(codes.PermissionDenied, "Client `%s` has no permission to publish", token)
	}

	return ctx, nil
}

Generating dart grpc client

This project is really great. I have been thinking about a similar thing for ages.

I was having a look at the Go client. It looks like there is no reason why we can't codegen clients for other languages from the protobuf definitions, apart from a little housekeeping in the generated code.

Are you open to clients for other languages being added? We don't have to commit the generated code if you don't want.

Tests

There is a general lack of tests. At least the server code should have more test coverage.
