
core's Introduction

Overview

Gazette makes it easy to build platforms that flexibly mix SQL, batch, and millisecond-latency streaming processing paradigms. It enables teams, applications, and analysts to work from a common catalog of data in the way that's most convenient to them. Gazette's core abstraction is a "journal" -- a streaming append log that's represented using regular files in a BLOB store (e.g., S3).

The magic of this representation is that journals are simultaneously a low-latency data stream and a collection of immutable, organized files in cloud storage (aka, a data lake) -- a collection which can be directly integrated into familiar processing tools and SQL engines.

Atop the journal broker service, Gazette offers a powerful consumers framework for building streaming applications in Go. Gazette has served production use cases for nearly five years, with deployments scaled to millions of streamed records per second.

Where to Start

core's People

Contributors

adaynu, apesternikov, asonawalla, brennanrieger, czeltser, davidgmonical, ddowker, dependabot[bot], dyaffe, elynnyap, evanryanlin, fgoncalveslr, hawka, jgraettinger, joshk0, jshearer, julianvmodesto, kdelga, khe, mdibaiee, mharris717, michaelschiff, pintohutch, psfried, rupertchen, saterus, snowzach, tylercunnion, williamhbaker

core's Issues

Reported production panic in `Broker.Append`

Was likely during a shutdown of the broker.

{"err":"wrong route token\n","level":"warning","msg":"transaction failed (phase one)","time":"2018-01-25T04:36:19Z"}
2018/01/25 04:36:20 http: panic serving 100.105.131.159:42720: send on closed channel
goroutine 2083912 [running]:
net/http.(*conn).serve.func1(0xc42af46780)
    /usr/local/go/src/net/http/server.go:1697 +0xd0
panic(0xcc9a40, 0xeb1ff0)
    /usr/local/go/src/runtime/panic.go:491 +0x283
vendor/github.com/LiveRamp/gazette/journal.(*Broker).Append(...)
    /opt/build/src/vendor/github.com/LiveRamp/gazette/journal/broker.go:71
vendor/github.com/LiveRamp/gazette/journal.(*Replica).Append(0xc42b4db1c0, 0xc42030f185, 0x28, 0x7fdcc2aa0310, 0xc42261fc00, 0x14c2980, 0xc42a50ec90, 0xc421441ce0)
    /opt/build/src/vendor/github.com/LiveRamp/gazette/journal/replica.go:58 +0xa6
vendor/github.com/LiveRamp/gazette/gazette.(*Router).Append(0xc420262f20, 0xc42030f185, 0x28, 0x7fdcc2aa0310, 0xc42261fc00, 0x14c2980, 0xc42a50ec90, 0xc421441ce0)
    /opt/build/src/vendor/github.com/LiveRamp/gazette/gazette/router.go:157 +0x4ec
vendor/github.com/LiveRamp/gazette/gazette.(*WriteAPI).Write(0xc42027e310, 0x14c12c0, 0xc4224470a0, 0xc42a512500)
    /opt/build/src/vendor/github.com/LiveRamp/gazette/gazette/write_api.go:36 +0x203
vendor/github.com/LiveRamp/gazette/gazette.(*WriteAPI).Write-fm(0x14c12c0, 0xc4224470a0, 0xc42a512500)
    /opt/build/src/vendor/github.com/LiveRamp/gazette/gazette/write_api.go:21 +0x48
net/http.HandlerFunc.ServeHTTP(0xc42027e340, 0x14c12c0, 0xc4224470a0, 0xc42a512500)
    /usr/local/go/src/net/http/server.go:1918 +0x44
vendor/github.com/gorilla/mux.(*Router).ServeHTTP(0xc420127ea0, 0x14c12c0, 0xc4224470a0, 0xc42a512500)
    /opt/build/src/vendor/github.com/gorilla/mux/mux.go:114 +0xdc
net/http.serverHandler.ServeHTTP(0xc42013ea90, 0x14c12c0, 0xc4224470a0, 0xc427e98700)
    /usr/local/go/src/net/http/server.go:2619 +0xb4
net/http.(*conn).serve(0xc42af46780, 0x14c28c0, 0xc42373b740)
    /usr/local/go/src/net/http/server.go:1801 +0x71d
created by net/http.(*Server).Serve
    /usr/local/go/src/net/http/server.go:2720 +0x288
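
The trace above ends in a `send on closed channel` panic. The broker's internals are not shown in this report, so the following is only a sketch of one common Go pattern for avoiding that class of panic, not the project's actual fix. The `opQueue` type and `errStopped` error are hypothetical names: shutdown closes a separate `done` channel rather than the channel that senders write to, so a late `Append` gets an error instead of panicking.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errStopped is a hypothetical error returned once the queue has shut down.
var errStopped = errors.New("queue stopped")

// opQueue is a hypothetical stand-in for a broker's operation queue.
// The ops channel is never closed; shutdown is signaled by closing done.
type opQueue struct {
	ops  chan string
	done chan struct{}
	once sync.Once
}

func newOpQueue() *opQueue {
	return &opQueue{ops: make(chan string, 1), done: make(chan struct{})}
}

// Append enqueues op, or returns errStopped if the queue has been stopped.
func (q *opQueue) Append(op string) error {
	// Check for shutdown first so a stopped queue always rejects new work.
	select {
	case <-q.done:
		return errStopped
	default:
	}
	select {
	case q.ops <- op:
		return nil
	case <-q.done:
		return errStopped
	}
}

// Stop signals shutdown. It is safe to call more than once.
func (q *opQueue) Stop() { q.once.Do(func() { close(q.done) }) }

func main() {
	q := newOpQueue()
	fmt.Println(q.Append("a")) // <nil>
	q.Stop()
	fmt.Println(q.Append("b")) // queue stopped
}
```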

data-flow instrumentation

As an operator, I want to visualize the directed dataflows of my infrastructure.

As an operator, I want to understand how a consumer’s recovery of a shard log is progressing.

As an operator, I want to visualize how far “behind” the write head my consumer is.

As an operator, I want to integrate a notion of “lag” with my monitoring & alerting infrastructure.
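
One possible definition of "lag", assuming it is measured in bytes between the journal write head and the consumer's current read offset, is sketched below. The function and parameter names are hypothetical; the issue does not specify a unit or an API.

```go
package main

import "fmt"

// lag returns how many bytes a consumer is behind the journal write head.
// Both arguments are assumed to be byte offsets into the same journal.
func lag(writeHead, consumerOffset int64) int64 {
	if consumerOffset >= writeHead {
		return 0 // The consumer is caught up.
	}
	return writeHead - consumerOffset
}

func main() {
	fmt.Println(lag(1500, 1200)) // 300
	fmt.Println(lag(1500, 1500)) // 0
}
```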

consensus: defensive watch reloads

We've observed a production bug where, during Etcd instability / network partition, the consensus allocator's RetryWatcher appears to miss / not receive expiration watch updates from Etcd. The symptoms are that the watcher continues to receive and patch ongoing updates into its copy of the tree after it re-establishes a connection to Etcd, but a set of keys which have in actuality long-since expired never have corresponding expiration events received & patched in.

What's odd about this is that the RetryWatcher implementation will perform a full tree reload on basically any kind of error, so this appears to be a fully silent omission (as a surfaced network error or Etcd index error would have triggered a reload).

We need a means to detect that this has happened, and trigger a full reload from Etcd in response to it.
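
One way detection could work, sketched under assumptions: periodically fetch a fresh full listing from Etcd and compare its keys against the watcher's local copy. Any key held locally but absent from the fresh listing suggests a missed expiration and could trigger a full reload. The `staleKeys` helper and the map-of-revisions representation are hypothetical, and a real check would need to compare both views at the same Etcd revision to avoid false positives.

```go
package main

import (
	"fmt"
	"sort"
)

// staleKeys returns keys present in the watcher's local copy of the tree
// but absent from a fresh full listing. Map values are key revisions.
func staleKeys(local, fresh map[string]int64) []string {
	var out []string
	for k := range local {
		if _, ok := fresh[k]; !ok {
			out = append(out, k)
		}
	}
	sort.Strings(out) // Stable output for logging and comparison.
	return out
}

func main() {
	local := map[string]int64{"/members/a": 10, "/members/b": 11}
	fresh := map[string]int64{"/members/a": 10}
	fmt.Println(staleKeys(local, fresh)) // [/members/b]
}
```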

spurious "failed to cancel shard lock"

Refine this ticket / fill in more details

We see these in production and they're annoying. The totally unproven suspicion is they're due to a race on shutdown, where:

  • A pod is recovering a shard (most likely) or is serving as primary.
  • The pod is instructed to terminate, and deletes its item locks.
  • During shard teardown, an error is encountered (eg, because context is cancelled).
  • The defer trap invokes Cancel which again tries to remove the item lock, and complains because it's not there.

I did some spelunking on this but was unable to confirm. AFAICT we have noisy error logging on every path where this could happen, that I wasn't able to find in production, but it's a bit tricky because the affected pods have already been torn down / K8s has junked their logs.
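
If the suspected race is real (it is unconfirmed above), one mitigation would be to make lock release idempotent, so that a second Cancel is a no-op rather than an error. The sketch below uses `sync.Once`; the `shardLock` type and its `release` callback are hypothetical and do not reflect the actual consumer code.

```go
package main

import (
	"fmt"
	"sync"
)

// shardLock is a hypothetical wrapper around an item lock.
type shardLock struct {
	once    sync.Once
	release func() error // Removes the lock from the backing store.
	err     error
}

// Cancel releases the lock at most once. Later calls return the
// result of the first release instead of attempting it again.
func (l *shardLock) Cancel() error {
	l.once.Do(func() { l.err = l.release() })
	return l.err
}

func main() {
	calls := 0
	l := &shardLock{release: func() error { calls++; return nil }}
	l.Cancel() // Explicit release on termination.
	l.Cancel() // Deferred release during teardown.
	fmt.Println(calls) // 1
}
```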

Split from #43

fragment flush interval

We want to run a process on gazette fragments which we'd like to have include all fragments from a previous 24hr window (e.g. 12:00:00 AM PT-11:59:59 PM PT).

Today it's difficult to determine when to run the aforementioned process because there is no guarantee as to when a spool created prior to the 24h window will close and upload the remaining events from that window to cloud storage. Especially if there is high variability in the number of writes to the consumed topic, the timing could be highly variable (I believe). Currently the only way to make a guarantee that we have included all fragments from a given window is to read all fragments until we hit one from a timestamp after the window (and perhaps fail our process if no such fragment is read).

The goal would be to have a process (perhaps a cron) which submits a request to the broker to close its current spool (even if said spool is still relatively small) and then upload that spool to cloud storage. This should provide a much easier ability to guarantee we are processing all fragments from a previous time frame.

recoverylog: write handoff interaction with unread hints

We observed that after losing some data in a recovery log (believed to be due to an unrelated Etcd ops issue), an unexpected interaction between injected no-ops and remaining FSM hints occurs:

  • Playback is asked to take over the recovery log (by injecting a no-op at current write head)
  • Current write-head is reached, and playback injects a no-op.
  • However, some set of hinted operations have yet to be read (because journal history branched / they were lost).
  • FSM refuses to sequence the no-op because it's not hinted.
  • Playback tries again, injecting a no-op ad infinitum.

This ends up looking really confusing to the operator. Far better to bail out with a clear error.

Acceptance criteria:

  • If Playback reaches a point where it wishes to inject a hand-off (asked to become primary, and at current write head), and there are remaining FSM hints to read, then playback should exit with a "FSM has remaining unread hints" error.
    • This is in addition to the related check in makeLive.
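
The acceptance criterion amounts to a guard evaluated before injecting a hand-off. A minimal sketch follows; the function name and its parameters are hypothetical, and only the error text is taken from the criterion.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnreadHints uses the error text named in the acceptance criteria.
var errUnreadHints = errors.New("FSM has remaining unread hints")

// checkHandoff is a hypothetical guard run before playback injects a
// hand-off no-op. It fails fast rather than letting playback retry forever.
func checkHandoff(askedToBecomePrimary, atWriteHead bool, unreadHints int) error {
	if askedToBecomePrimary && atWriteHead && unreadHints > 0 {
		return errUnreadHints
	}
	return nil
}

func main() {
	fmt.Println(checkHandoff(true, true, 0)) // <nil>
	fmt.Println(checkHandoff(true, true, 3)) // FSM has remaining unread hints
}
```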

comprehensive & unified config file

As an operator, I want a unified way of representing my cluster, environment, and preferences across the gazette & run-consumer binaries and gazctl tools that:

  • Allows me to fully describe my hierarchical journal namespace to capture finer-grained policies of journal handling (Split from #29).

  • Is well documented with commented examples, so that I can discover the various knobs that exist and how to use them.

  • Allows me to represent my environment, such as Etcd or Broker endpoint.

  • Is handled consistently across the entire gazette ecosystem (eg, I don't want two gazette tools to have two different flavors of configuration).

Dynamically resolve fragment stores at persist time

Fragments currently have a single BackingStore which is proposed and agreed upon without validation when the Fragment is initialized. When a Fragment is persisted the BackingStore is pulled from the Fragment and retried infinitely until the store accepts the fragment.

If a JournalSpec_Fragment had an invalid store as the first element of its Stores array any Fragments initialized from the spec will get stuck in a retry loop which will never succeed. If the spec is updated with a valid BackingStore any future Fragments created from that spec will be persisted correctly but existing Fragments which are stuck in the retry loop will never see the updated spec and never properly persist their data.

The Persister should resolve the BackingStore from the current zero index of the JournalSpec_Fragment.Stores. This way, if at some point an invalid store is added to a spec the Fragment will eventually make it to a valid backing store once an operator has fixed the spec.
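
A sketch of the proposed behavior, under assumptions: the persister looks up the current spec before every attempt and uses its first store, so a corrected spec is picked up by fragments that are already retrying. The `spec` type, the callbacks, and the bounded retry count are all hypothetical (the real persister retries indefinitely).

```go
package main

import (
	"errors"
	"fmt"
)

// spec is a hypothetical reduction of JournalSpec_Fragment to its Stores.
type spec struct{ Stores []string }

var errNoStores = errors.New("spec has no stores")

// persist resolves the store from the current spec on each attempt,
// rather than from a store fixed when the fragment was created.
func persist(currentSpec func() spec, upload func(store string) error, maxAttempts int) (string, error) {
	for i := 0; i < maxAttempts; i++ {
		s := currentSpec()
		if len(s.Stores) == 0 {
			return "", errNoStores // The caller may choose to drop the fragment.
		}
		if err := upload(s.Stores[0]); err == nil {
			return s.Stores[0], nil
		}
	}
	return "", errors.New("gave up after max attempts")
}

func main() {
	attempts := 0
	currentSpec := func() spec {
		attempts++
		if attempts < 3 {
			return spec{Stores: []string{"s3://invalid/"}}
		}
		return spec{Stores: []string{"s3://valid/"}} // Operator fixed the spec.
	}
	upload := func(store string) error {
		if store != "s3://valid/" {
			return errors.New("store rejected fragment")
		}
		return nil
	}
	fmt.Println(persist(currentSpec, upload, 5)) // s3://valid/ <nil>
}
```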

Notes:

  • The comment on the BackingStore of a Fragment suggests that the field is empty until it is persisted. This comment is out of date and should be updated to reflect the current implementation, or we should update the implementation to make the comment correct. If the plan is to use the spec rather than using the BackingStore to determine where the Fragment is persisted it might be better to leave the comment and change the implementation.

  • This opens the door to some tricky scenarios where a Fragment is initialized with some spec which contains an invalid BackingStore and later that spec is updated to remove all backing stores from the spec. In this case the expected behavior is unclear but I propose that at that point the Fragment is dropped, as this is what would happen if the Fragment was created using the most up to date spec.

stream commits to backing cloud filesystem

As an operator, I want gazette to compress and stream local spools to their target cloud filesystem as they're written, so that slow cloud FS fragment writes are back-pressured to the client (and I don't overflow my disk).

This is currently worked around with a dirty, dirty hack of not using compression if recovery-log is in the journal name.

Criteria:

  • Upon commit, committed data is optimistically compressed and streamed to the backing cloud FS.
  • Completing a spool requires only closing / renaming the destination cloud FS fragment.

broker resolution should work after member key is removed

During broker shutdown, a race is possible where an RPC is received after the broker's Allocate has completed / lease is removed, and before grpc.GracefulStop() completes.

Currently, resolution fails because it requires that the member key be present in the KeySpace.

Ideally, resolution would succeed, and the request would be proxied to a responsible broker.

recoverylog: exact offset tracking to enable deletions

As a storage owner, I want recoverylog hints to represent exact first/last byte offsets so that I can identify portions of a recoverylog which are no longer needed.

Criteria:

  • Segments of hints produced by recoverylog.Playback include exact begin / end offsets.

gazctl: shards prune

As an operator, I want to provide a shard label selector and have gazctl delete those journal fragments of matching ShardSpec recovery logs which are not required to recover from any of a shard’s current hints.

(Make gRPC api for fetching current hints?)

recoverylog: refactor to V2 client

Recoverylog Player and Recorder should use the V2 client. Also:

  • Make detection of FSMHints vs recoverylog Journal issues more proactive. Play should fail as soon as it detects that hints cannot be satisfied, rather than waiting until signaled to complete playback.

  • Integration tests should use brokertest.Broker such that they always run. Rename to E2E tests.

faster item allocation hand-off

As an operator, I want brokers and consumers to converge to a stable item allocation more quickly during releases and production issues, to minimize the period of disruption.

Criteria:

  • Allocation is globally optimal (can we do this with etcd3 leases?) - item assignment changes as few times as possible.
  • New benchmark on journal handoff: able to re-balance N thousand journals across M replicas on a local stack.

Split from #43

faster commits (remove data-sync)

As a user, I want append responses and reads to be unblocked before data is synced to disk on each replica, so that latency is reduced.

(This relies on replication alone to prevent data loss, as a single machine can now lose committed data. For K8s setups, this is fine: the host OS page cache holds committed data & a crash of the process doesn't affect it; a hard crash of the host will also kick an ephemeral pod anyway).

Criteria:

  • Client append responses sent after data is successfully replicated but before it's synced to each disk.
  • Client reads unblock after replication, but before it's synced.
  • Behavior is configurable (globally, or per-journal).

improve our welcome mat

As a prospective user or curious lurker, I want the Gazette repo root page in GitHub to provide:

  • An overview of the Gazette broker.
  • The types of problems it's good at solving.
  • Introductory examples of how to use it (curl?)
  • An overview of the consumers framework.
  • The types of problems it's good at solving.
  • How it differs from other tech I may have heard of.
  • Links to a guide on writing an interesting example consumer app (remix public transit APIs?)
  • Links to architectural overview.

remove cloudstore

As a maintainer, I want to ditch cloudstore in favor of a community-maintained solution, as there's some apparent duplication of capabilities with other packages and I'd rather not maintain it if I can.

Criteria:

  • Remove cloudstore package.

Notes:
spf13/afero is one likely package, paired with SFTP / S3 / GCS drivers. Potentially we can upstream our work here.

v3.allocator: extended testing scenarios

We want cases which cover insufficient zone slots specifically, and generally better testing coverage over cases where the allocator is unable to meet desired replication of all items.

consistent usage of `context.Context`

As a maintainer, I want context to be the one-and-only mechanism for propagating deadlines and cancellation through gazette brokers and consumers, so I don't have to maintain various ad-hoc mechanisms used today.

Criteria:

  • Journal read, append, replication operations are plumbed with contexts presented by the http request.
  • Consumer playback, recording, and shard processing are all plumbed with contexts presented by the main() caller.

push/relabel: sparse & lazy solver

push/relabel has to instantiate two Arcs for each (Journal, Member) tuple. However, for large flow networks, most of these arcs are never even examined as most flow follows the "garden path" of previously used & prioritized arcs.

I suspect we can refactor push/relabel to drive an interface representation of a flow network, and that v3.allocator can then supply a sparse and lazy instance which capitalizes on domain knowledge to power cheap & efficient enumeration of graph structure (arcs and nodes).

Where that structure is in a zero-valued state, it shouldn't need to have a concrete tracked representation (like Arc, and possibly even Node). It would only be "realized" as an instantiated tracking structure when flow is pushed onto an arc. For some particular arc, if that never happens, it should have zero cost.
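
The "realize only when flow is pushed" idea can be illustrated with a map keyed on the (Journal, Member) tuple, where a missing entry means zero flow. This is a sketch of the storage idea only, not of push/relabel itself, and `arcKey` and `sparseNetwork` are hypothetical names.

```go
package main

import "fmt"

// arcKey identifies the arc between a journal and a member.
type arcKey struct{ journal, member string }

// sparseNetwork tracks flow only for arcs which currently carry flow.
// Arcs in their zero-valued state have no tracked representation.
type sparseNetwork struct {
	flow map[arcKey]int
}

// Flow returns the flow on an arc; unrealized arcs report zero.
func (n *sparseNetwork) Flow(k arcKey) int { return n.flow[k] }

// Push adds delta to an arc's flow, realizing the arc on first use
// and dropping it again if its flow returns to zero.
func (n *sparseNetwork) Push(k arcKey, delta int) {
	if n.flow == nil {
		n.flow = make(map[arcKey]int)
	}
	if v := n.flow[k] + delta; v == 0 {
		delete(n.flow, k)
	} else {
		n.flow[k] = v
	}
}

// Realized returns the number of arcs with a tracked representation.
func (n *sparseNetwork) Realized() int { return len(n.flow) }

func main() {
	n := &sparseNetwork{}
	k := arcKey{"journal/a", "broker-1"}
	fmt.Println(n.Flow(k), n.Realized()) // 0 0
	n.Push(k, 2)
	fmt.Println(n.Flow(k), n.Realized()) // 2 1
	n.Push(k, -2)
	fmt.Println(n.Flow(k), n.Realized()) // 0 0
}
```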

consensus: next-gen datamodel & watcher

Define V3 allocator datamodel, lifting heavily from allocParams but representing:

  • Metadata (Etcd root path, revision).
  • Members (custom deserializer, with interface for obtaining leader / follower limits, rack location, & lease ID).
  • Items (custom deserializer, with interface for fetching desired replication).
  • Assignments (item leader and followers).

Also define:

  • a mechanism to build and update a populated datamodel instance using a clientv3 watcher
  • a mechanism for providing a callback which is driven on changes.

recoverylog: less noisy FSM playback warnings

FSM warnings happen frequently as part of normal operation, even when playing back from hints produced by the current recorder.

They're confusing to operators, who want to see warnings only if something abnormal is going on (such as frequent hand-off of a shard).

recoverylog/playback: small seeks causing performance issues

Current behavior in playback is to seek the underlying RetryReader and discard the current buffer on all seeks.

A problem with this is that many small seeks can be satisfied trivially, by discarding some data from the buffer. Worse, while the AdjustedMark of RetryReader and buffer may be before the desired offset, the underlying RetryReader offset may be later - forcing it to discard and re-establish its underlying reader.

For databases using write-ahead logging, this has been observed to be particularly problematic.

The fix is to update playback seeking to take into account the current buffer so as not to needlessly restart reads.

support range-requests of uncompressed fragments

As a Gazette consumer operator, I want the gazette client to detect when a cloud-backed fragment is uncompressed, and to issue an HTTP range request for the exact byte offset desired, so that recovery log playback doesn't have to read & discard lots of irrelevant preceding bytes to get to the desired fragment offset.

This should accelerate recoverylog playback significantly (3x). Edit: this may help, but is mitigated by the variable fragment size feature in v2. Investigate to ensure this is worth doing before tackling.

Acceptance criteria:

  • Brokers communicate compression / Content-Encoding of cloud-backed fragments to clients.
  • Clients use HTTP range requests to direct the storage server to seek the exact offset, rather than client reading the whole fragment and discarding bytes prior to the desired offset.
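
On the client side, the second criterion comes down to setting a `Range` header relative to the start of the fragment. A minimal sketch using `net/http` follows; `newRangeRequest` and the example URL are hypothetical, and it assumes the fragment is stored uncompressed so that journal offsets map directly onto object bytes.

```go
package main

import (
	"fmt"
	"net/http"
)

// newRangeRequest builds a GET for an uncompressed fragment which asks the
// storage server to begin at the journal offset `desired`. fragmentBegin is
// the journal offset of the fragment's first byte.
func newRangeRequest(url string, fragmentBegin, desired int64) (*http.Request, error) {
	if desired < fragmentBegin {
		return nil, fmt.Errorf("offset %d precedes fragment begin %d", desired, fragmentBegin)
	}
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	// An open-ended range: from the desired byte through the end of the object.
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-", desired-fragmentBegin))
	return req, nil
}

func main() {
	req, _ := newRangeRequest("https://example.com/fragment", 1000, 1500)
	fmt.Println(req.Header.Get("Range")) // bytes=500-
}
```

A server which honors the header responds with 206 Partial Content; a client should still handle a plain 200 response by discarding the leading bytes as it does today.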

gazctl: journals read cmd

As an operator, I want to invoke gazctl with a label selector, like gazctl journals read -L topic=my-topic, and have it stream and emit journal writes as they arrive.

  • It should default to offset = -1 (the current write-head), but should be configurable.
  • It should write complete commits from each journal at-a-time, so that multiple journals may be read without individual messages being interleaved in the output.

As an operator, I would like to pass a -topic flag to gazctl in order to read at the topic granularity rather than just at the journal granularity.

recoverylog: identify obsolete log fragments

As a storage owner, I want to identify fragments of a recoverylog which are no longer needed, so I may delete them.

Criteria:

  • Library routine which, given a Fragment & exact-offset FSMHintsSegmentSet, makes a yea/nay determination of whether the Fragment is obsolete.
  • Factored-out routine of journal.IndexWatcher which generates Fragments given a cloud-fs and journal path.
  • Routine which maps over a Fragment source to emit those which are obsolete.
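
The yea/nay determination in the first criterion might look like the sketch below. It rests on two assumptions that the issue does not state: a fragment is obsolete only if it overlaps no hinted segment, and fragments extending past the last hinted offset are kept because they may hold writes not yet reflected in hints. The `span` type is a hypothetical stand-in for both Fragment and hint segment offsets.

```go
package main

import "fmt"

// span is a half-open byte range [Begin, End) within a recovery log.
type span struct{ Begin, End int64 }

// isObsolete reports whether frag can be deleted given the hinted segments.
// It is conservative: fragments past the last hinted offset are retained.
func isObsolete(frag span, hinted []span) bool {
	var maxEnd int64
	for _, h := range hinted {
		if frag.Begin < h.End && h.Begin < frag.End {
			return false // Overlaps a segment required for recovery.
		}
		if h.End > maxEnd {
			maxEnd = h.End
		}
	}
	return frag.End <= maxEnd
}

func main() {
	hinted := []span{{100, 200}, {300, 400}}
	fmt.Println(isObsolete(span{0, 100}, hinted))   // true
	fmt.Println(isObsolete(span{150, 250}, hinted)) // false
	fmt.Println(isObsolete(span{400, 500}, hinted)) // false
}
```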

journal namespace configuration

As an operator, I want to provide a description of my hierarchical journal namespace to capture finer-grained policies of journal handling.

Criteria:

  • Ability to mount different storage backends (eg, S3 bucket and path prefix) to different portions of the journal namespace.
  • Ability to define a compression policy other than gzip (eg, none, zstd, snappy, etc).
  • Ability to define retention duration.
  • Ability to define required replication of the journal.
  • Minimal description (not required to type out every journal name; make use of hierarchy).
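
One way to get a minimal, hierarchy-aware description is longest-prefix matching: policies attach to namespace prefixes, and each journal takes the policy of the longest prefix that matches its name. The sketch below assumes that approach; the `policy` fields, store URLs, and journal names are hypothetical examples.

```go
package main

import (
	"fmt"
	"strings"
)

// policy is a hypothetical bundle of per-namespace journal settings.
type policy struct {
	Store       string
	Compression string
	Replication int
}

// resolve returns the policy of the longest prefix matching journal.
func resolve(policies map[string]policy, journal string) (policy, bool) {
	best, found := "", false
	for prefix := range policies {
		if strings.HasPrefix(journal, prefix) && (!found || len(prefix) > len(best)) {
			best, found = prefix, true
		}
	}
	return policies[best], found
}

func main() {
	policies := map[string]policy{
		// The empty prefix acts as the default for the whole namespace.
		"":               {Store: "s3://bucket/default/", Compression: "gzip", Replication: 3},
		"recovery-logs/": {Store: "s3://bucket/logs/", Compression: "none", Replication: 3},
	}
	p, _ := resolve(policies, "recovery-logs/shard-001")
	fmt.Println(p.Compression) // none
	p, _ = resolve(policies, "events/clicks/part-000")
	fmt.Println(p.Compression) // gzip
}
```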

Vendor image

Maintain a "vendor image" that includes the base environment setup and the source for vendored Go dependencies. CI should use this image as a cache when building.

gazctl: journals prune

As an operator, I want to provide a label selector and have matching journal fragments older than the current configured retention be deleted across all configured fragment stores.

wire up RocksDB compaction TTL

We're now using a version of RocksDB which supports compaction TTLs, but have no way to turn this on from the Go side. It may require adding appropriate cgo extensions.

gorocks: update / investigate embedding rocksdb

Our gorocks is apparently broken on 18.04 for non-dockerized builds, which is breaking some developer workflows. We'll need to re-pin gorocks and RocksDB itself to unbreak.

While at it, investigate whether we should switch to gorocks's embedded build.

consumer/client: preserve connections to _all_ endpoints

Currently, consumer/client.go maintains connections only to those endpoints explicitly listed under /consumer.Consumer/CurrentConsumerState, but not to the total set of endpoints currently serving as PRIMARY for some shard.

This is a bug, because it means a consumer pod which is currently shutting down (and awaiting another pod to recover its shard), but still serving as primary for a shard, cannot be reached from a client. The client will instead return ErrNoReadyPartitionClient.

improvements to cluster consensus

As a Gazette operator, I want broker & consumer cluster consensus:

  • To not rely on an un-maintained & unsupported version of the Etcd api (v2).
  • To operate principally from a shared Etcd prefix, so that I can easily identify & enumerate all running consumers.
  • To scale to many thousands of members and allocated items, so that we can create large #'s of partitions without concern to cluster health.
  • To shift item allocations in response to member or item changes with minimal churning (as few total re-assignments as possible; a globally optimum solution).
  • To allow for per-item variance in the required replication (eg, some items have replication of 3, others of 2 or 1).
  • To be "rack" aware, so that items are replicated properly across physical regions.
  • To achieve faster hand-off by writing item status immediately, rather than polling.
  • To mitigate "failed to cancel shard lock" warnings observed in production.
  • To enable dynamic detection of introduced topic & item partitions, so that running services can immediately begin writing to / reading from those partitions.

I also want to front-load any complex migration steps that will be required now, before I create yet more production consumer dependencies which will be difficult to migrate later.

dynamic topic partitioning

As a user, I want to create new topic partitions & have publishers/consumers dynamically react, so that I can expand topic partitioning without needing to compile & restart affected services.

Criteria:

  • Easy means to expand the partitions of a topic.
  • When expanded, running publishers and consumers quickly react to & start using the created topic partitions.

Split from #43

gazctl: journals / shards edit

As an operator, I should be able to easily edit journals or shards which match a selector, and have the changes applied when I save, or have my editor re-opened on failure.

Only items which actually changed should be updated in Etcd.

build tooling to run integration tests

As a dev, I want an automated means to run all integration tests against a local K8s stack.

Criteria:

  • One-command build, stage & run of existing integration tests.
  • One-command build, stage & run of stream-sum example.

journals: named offsets within journals

As an operator / application developer, I want gazette to track & manage "checkpoints" of journals which I can refer to by a name of my choosing (rather than offset).
