
spanner-event-exporter's Introduction

Spez

Spez == Spanner Poller Event-sourced Z

Spez is a library that registers a listener which publishes every record you write to a particular Spanner table as an Avro record on a Pub/Sub topic. This makes it a good foundation for building event-sourced systems. Spez also provides a Cloud Function called "Archiver" that is triggered by every write to a specific Pub/Sub topic and automatically writes that record to a Google Cloud Storage bucket.
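As the name suggests, Spez works by polling. Judging from the pollRate, recordLimit and startingTimestamp settings, each poll reads rows whose commit timestamp is newer than the last row already published. The following is a minimal in-memory sketch of such a watermark-based poll, not Spez's actual implementation: Row, WatermarkPoller and poll are hypothetical names, and ordering by (timestamp, id) is a design choice of this sketch.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical row shape: primary key plus the commit-timestamp column. */
record Row(long id, Instant timestamp) {}

/**
 * Sketch of a watermark poller over an in-memory "table". Rows are ordered by
 * (timestamp, id) so rows sharing one commit timestamp are not skipped when a
 * batch is cut off by the record limit.
 */
final class WatermarkPoller {
  private static final Comparator<Row> ORDER =
      Comparator.comparing(Row::timestamp).thenComparingLong(Row::id);

  private final Instant startingTimestamp;
  private final int recordLimit;
  private Row lastSeen; // watermark; null until the first row is emitted

  WatermarkPoller(Instant startingTimestamp, int recordLimit) {
    this.startingTimestamp = startingTimestamp;
    this.recordLimit = recordLimit;
  }

  /** Scans the whole table, returns the next batch past the watermark, and advances it. */
  List<Row> poll(List<Row> table) {
    List<Row> batch =
        table.stream()
            .filter(r -> !r.timestamp().isBefore(startingTimestamp))
            .filter(r -> lastSeen == null || ORDER.compare(r, lastSeen) > 0)
            .sorted(ORDER)
            .limit(recordLimit)
            .collect(Collectors.toList());
    if (!batch.isEmpty()) {
      lastSeen = batch.get(batch.size() - 1);
    }
    return batch;
  }
}
```

With a recordLimit of 1, two rows committed in the same transaction come out in consecutive polls instead of the second one being lost, which is why the sketch breaks timestamp ties on the primary key.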

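The following example shows one way to work with Spez records once they are on Pub/Sub: pull a batch from a subscription, decode each message with the table's Avro schema, and acknowledge what was processed.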

Example:

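import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;

// PROJECT_ID, subscriptionId, avroSchema (the table's Avro schema) and log are assumed to be defined elsewhere.
try {
  final SubscriberStubSettings subscriberStubSettings =
      SubscriberStubSettings.newBuilder()
          .setTransportChannelProvider(
              SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                  .setMaxInboundMessageSize(20 << 20) // 20MB
                  .build())
          .build();

  try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
    // Pull from a subscription attached to the Spez topic, not from the topic itself.
    final String subscriptionName = ProjectSubscriptionName.format(PROJECT_ID, subscriptionId);
    final PullRequest pullRequest =
        PullRequest.newBuilder()
            .setMaxMessages(20)
            .setReturnImmediately(true)
            .setSubscription(subscriptionName)
            .build();

    final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
    final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
    final List<String> ackIds = new ArrayList<>();

    for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
      // Decode the JSON-encoded Avro payload with the table's schema.
      final JsonDecoder decoder =
          DecoderFactory.get()
              .jsonDecoder(avroSchema, message.getMessage().getData().newInput());

      final GenericRecord record = datumReader.read(null, decoder);

      // ... Process Record

      ackIds.add(message.getAckId());
    }

    // Acknowledge processed messages; otherwise Pub/Sub redelivers them after the ack deadline.
    if (!ackIds.isEmpty()) {
      subscriber.acknowledgeCallable().call(
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build());
    }
  }
} catch (IOException e) {
  log.error("Could not get record", e);
}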

Configuration

Application Config

Spez uses a config library that lets you set configuration variables in a YAML-like config file or override them with command-line options. The following variables are the defaults defined in the spez.conf file. For this application, the proper way to override them is through the Kubernetes ConfigMap in the kubernetes/ directory.

spez {
  avroNamespace="spez"
  instanceName="spez-poller-demo"
  dbName="spez_poller_db"
  tableName="spez_poller_table"
  pollRate=1000
  recordLimit="200"
  startingTimestamp="2019-03-06T01:29:25.500000Z"
  publishToPubSub=true
}

K8s configmap

These values are located in kubernetes/configmap.yaml. Modify them to match your system configuration before using spez.

apiVersion: v1
kind: ConfigMap
metadata:
  name: spez-config
  namespace: default
data:
  GOOGLE_APPLICATION_CREDENTIALS: "/var/run/secret/cloud.google.com/service-account.json"
  AVRO_NAMESPACE: "spez"
  INSTANCE_NAME: "spez-poller-demo"
  DB_NAME: "spez_poller_db"
  TABLE_NAME: "spez_poller_table"
  POLL_RATE: "1000"
  RECORD_LIMIT: "200"
  STARTING_TIMESTAMP: "2019-03-06T01:29:25.500000Z"
  PUBLISH_TO_PUBSUB: "true"
  FUNCTION: "archiver"
  BUCKET_REGION: "us-west-1"
  BUCKET_NAME: "gs://spez_archive/spez_poller_table"
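The ConfigMap keys AVRO_NAMESPACE through PUBLISH_TO_PUBSUB mirror the spez.conf settings listed earlier. The sketch below makes that mapping explicit as a typed Java record that reads each key from an environment map and falls back to the spez.conf default. SpezConfig and fromEnv are hypothetical names, and the sketch assumes the ConfigMap values reach the container as environment variables; Spez's own config loading may work differently.

```java
import java.time.Instant;
import java.util.Map;

/** Hypothetical typed view of the spez.conf settings that the ConfigMap overrides. */
record SpezConfig(
    String avroNamespace,
    String instanceName,
    String dbName,
    String tableName,
    long pollRate, // units are not stated in spez.conf; 1000 suggests milliseconds (assumption)
    int recordLimit,
    Instant startingTimestamp,
    boolean publishToPubSub) {

  /** Reads each ConfigMap-style key from env, falling back to the spez.conf default. */
  static SpezConfig fromEnv(Map<String, String> env) {
    return new SpezConfig(
        env.getOrDefault("AVRO_NAMESPACE", "spez"),
        env.getOrDefault("INSTANCE_NAME", "spez-poller-demo"),
        env.getOrDefault("DB_NAME", "spez_poller_db"),
        env.getOrDefault("TABLE_NAME", "spez_poller_table"),
        Long.parseLong(env.getOrDefault("POLL_RATE", "1000")),
        Integer.parseInt(env.getOrDefault("RECORD_LIMIT", "200")),
        Instant.parse(env.getOrDefault("STARTING_TIMESTAMP", "2019-03-06T01:29:25.500000Z")),
        Boolean.parseBoolean(env.getOrDefault("PUBLISH_TO_PUBSUB", "true")));
  }
}
```

In a container this would be called as SpezConfig.fromEnv(System.getenv()). Because every ConfigMap value arrives as a string, the numeric, timestamp and boolean settings have to be parsed explicitly.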

Create a Spanner Table

To use this poller, your table must have a non-null column named Timestamp that contains the Spanner commit timestamp.

The poller performs a full table scan on every poll interval, which consumes resources on your database instance. The usual remedy would be a secondary index keyed on the timestamp, but do not create one here: an index whose key starts with the commit timestamp causes hotspots. Instead, if the scans consume too many resources, increase the polling interval.

Do not use a commit timestamp column as the first part of the primary key of a table or of a secondary index. Doing so creates hotspots and reduces data performance, and these performance issues can occur even at low write rates. Enabling commit timestamps on non-key columns that are not indexed has no performance overhead.

Review this document for more information on sharding Spanner CommitTimestamps
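The sharding idea referred to here prefixes timestamp-ordered keys with a shard number derived from a stable column, so that consecutive commits land in several key ranges instead of all at the end of one. The sketch below computes such a prefix client-side from the ID column of the example table. TimestampSharding, shardFor and distribution are hypothetical names, and the shard count is an illustrative parameter, not a recommendation.

```java
import java.util.List;

/**
 * Hypothetical helper for sharding timestamp-ordered keys: the shard number
 * is derived from the primary key, so an index keyed on (ShardId, Timestamp)
 * spreads consecutive commits over shardCount key ranges.
 */
final class TimestampSharding {
  private TimestampSharding() {}

  /** Shard for a row; depends only on the primary key, never on the timestamp. */
  static int shardFor(long id, int shardCount) {
    return Math.floorMod(Long.hashCode(id), shardCount);
  }

  /** Counts how many of the given IDs land in each shard. */
  static int[] distribution(List<Long> ids, int shardCount) {
    int[] counts = new int[shardCount];
    for (long id : ids) {
      counts[shardFor(id, shardCount)]++;
    }
    return counts;
  }
}
```

The trade-off is on the read side: finding the most recent rows means querying each of the shardCount shards and merging the results.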

Example:

CREATE TABLE spez_poller_table (
    ID INT64 NOT NULL,
    Color STRING(MAX),
    Name STRING(MAX),
    Timestamp TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
) PRIMARY KEY (ID)

Create a Service Account

To allow spez to interact with the necessary Google Cloud resources, create a service account for it with the following permissions:

  • The ability to read data from your Spanner instance.
  • The ability to publish messages to a Pub/Sub topic.
  • The ability to write trace data to Stackdriver Trace.

Create a service account for the spez application:

export PROJECT_ID=$(gcloud config get-value core/project)
export SERVICE_ACCOUNT_NAME="spez-service-account"

## Create the service account
gcloud iam service-accounts create ${SERVICE_ACCOUNT_NAME} \
  --display-name "spez service account"

### Add the `spanner.databaseReader`, `pubsub.editor` and `cloudtrace.agent` IAM permissions to the spez service account:
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role='roles/spanner.databaseReader'

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role='roles/pubsub.editor'

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member="serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
  --role='roles/cloudtrace.agent'

### Generate and download a key for the `spez` service account:
gcloud iam service-accounts keys create \
  --iam-account "${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \
  service-account.json

Deploying Spez

Spez is intended to be run on kubernetes. The configmap, deployment and service yaml are provided in the kubernetes/ directory.

# Deploy spez with the appropriate kubectl context
kubectl apply -f kubernetes/

Monitoring

Spez comes preconfigured to send polling traces and metrics to Stackdriver (make sure the corresponding APIs are enabled). For application logs, make sure your Kubernetes cluster is set up to receive and publish logs written to stdout and stderr.

Queues

In addition to the event poller, spez comes with a library for processing other kinds of asynchronous work, called Queues. Queues is an integrated transactional messaging system that makes it easy to perform asynchronous work at scale, even across multiple regions (when and where Spanner is configured to do so). A Queue keeps delivering a message until that message has been acked, so delivery is at-least-once when there are multiple readers of the Queue.

Queues have the following properties:

  • Transactional: Applications are able to send and receive messages atomically within a Spanner transaction with other writes: queue operations are executed if and only if the entire transaction succeeds.
  • Durable: Messages successfully sent are not lost or dropped, and are guaranteed to be delivered.
  • Asynchronous: Messages are sent and received ("acked") in different transactions.
  • Triggering: Messages usually cause some receiver to immediately wake up for processing.

The unit of transmission and communication within a queue is called a Message. Each message represents a unit of work that the application needs to complete, and Spanner ensures that the work eventually gets successfully done. Messages are identified by a sender-chosen key (which must be unique) and contain a payload. Messages are sent within a transaction, and after commit, are delivered asynchronously to a Receiver. The Receiver then handles message processing and acks messages in a separate transaction.
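The at-least-once contract described above (a message keeps being delivered until it is acked) can be modeled without Spanner. The following in-memory sketch uses hypothetical names (InMemoryQueue, Message, send, receive, ack) that loosely echo the Queue example below but are not its API, and it deliberately leaves out transactions and durable storage.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * In-memory model of the delivery contract: a message is returned by every
 * receive until it is acked. Transactions and storage are not modeled.
 */
final class InMemoryQueue {
  /** Hypothetical message shape: sender-chosen unique key plus payload. */
  record Message(String key, byte[] payload) {}

  private final Map<String, Message> pending = new LinkedHashMap<>();

  /** Enqueues a message; keys must be unique among pending messages. */
  synchronized void send(String key, byte[] payload) {
    if (pending.putIfAbsent(key, new Message(key, payload)) != null) {
      throw new IllegalArgumentException("duplicate message key: " + key);
    }
  }

  /** Returns up to maxMessages unacked messages, in send order, without removing them. */
  synchronized List<Message> receive(int maxMessages) {
    List<Message> batch = new ArrayList<>();
    for (Message m : pending.values()) {
      if (batch.size() == maxMessages) {
        break;
      }
      batch.add(m);
    }
    return batch;
  }

  /** Acknowledges a message so it is not delivered again. */
  synchronized void ack(Message m) {
    pending.remove(m.key());
  }
}
```

Because an unacked message comes back on the next receive, a receiver that crashes after doing its work but before acking will process that message twice, so handlers should be idempotent.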

When to use a Queue

Queues are appropriate whenever you have asynchronous work that needs to get done. For example you might need to...

  • Integrate with an external system. (e.g., ecommerce checkout, inventory management)
  • Defer expensive work such as large recomputation or generating a report.
  • Fan out updates across your database. (e.g., deliver calendar invites to many invitees)

Queues are not a particularly good fit if:

  • You have multiple Receivers / Subscribers that you need notified. (Use pubsub instead.)
  • You want to build long-lived, in-order change logs. (Use regular Spanner tables instead.)
  • You depend on strict realtime notifications. (Use a custom gRPC solution for this.)

Example:

Create a Queue

try {
    Queue.createQueue(dbAdminClient, dbId, queueName);
} catch (SpannerException e) {
    log.error("Could not create Queue table in spanner", e);
}

Send a Message

MyProto myProto = MyProto.newBuilder().setMessage("My-Message").build();

try {
  Queue.send(dbClient, queueName, "myKey", ByteArray.copyFrom(myProto.toByteArray()));
} catch (SpannerException e) {
  log.error("Could not write message to Queue", e);
}

Receive a Message

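// Create the async processing callback (assumes log and java.io.IOException are in scope)
QueueMessageCallback cb = messages -> {
  messages.forEach(m -> {
    try {
      MyProto myProto = MyProto.parseFrom(m.value().asInputStream());
      // ... Work with message
      Queue.ack(dbClient, m);
    } catch (IOException e) {
      // Not acked, so the Queue will deliver this message again.
      log.error("Could not parse message", e);
    }
  });
};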

// Register the receiver
Queue.recieve(dbClient, queueName, 20, 500, cb);

Publishing Options

By default, spez publishes polled data to Pub/Sub. This lets you take advantage of Cloud Function triggers in your event-sourced ledger, at the expense of additional cost. If you do not need that functionality, set the "publishToPubSub" config option to false, and spez will instead publish all of your records to a Queue named tableName + "_queue". This is simpler and more performant, but it can add resource consumption on your Spanner instance.
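The naming rule for the non-Pub/Sub path can be restated as a one-line function. PublishTarget.queueNameFor is a hypothetical helper that only mirrors the rule described above; it is not part of the Spez API.

```java
import java.util.Optional;

/** Hypothetical helper restating the publishing rule. */
final class PublishTarget {
  private PublishTarget() {}

  /**
   * Returns the Queue name used when Pub/Sub publishing is disabled, or
   * empty when records go to Pub/Sub instead.
   */
  static Optional<String> queueNameFor(String tableName, boolean publishToPubSub) {
    return publishToPubSub ? Optional.empty() : Optional.of(tableName + "_queue");
  }
}
```

With the default table name and publishToPubSub set to false, records would go to a Queue named spez_poller_table_queue.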

JMX Monitoring

For monitoring and debugging of the Spez poller, forward the JMX port (9010) to your local machine with kubectl, then open jconsole or VisualVM:

kubectl port-forward <your-app-pod> 9010

## Open jconsole connection to your local port 9010:
jconsole 127.0.0.1:9010
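If you prefer scripting over a GUI, the same forwarded port can be queried with the JDK's built-in JMX client API. The sketch below, using the hypothetical class name JmxProbe, builds the standard RMI-based JMX service URL for a host and port (the form jconsole uses for host:port) and reads the remote JVM's used heap. It assumes the Spez JVM exposes JMX on 9010 without authentication.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Hypothetical probe that reads heap usage over a forwarded JMX port. */
final class JmxProbe {
  private JmxProbe() {}

  /** Standard RMI-based JMX service URL for host:port. */
  static JMXServiceURL serviceUrl(String host, int port) throws IOException {
    return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
  }

  /** Connects without credentials and returns the remote JVM's used heap in bytes. */
  static long usedHeapBytes(String host, int port) throws IOException {
    try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, port))) {
      MBeanServerConnection conn = connector.getMBeanServerConnection();
      MemoryMXBean memory =
          ManagementFactory.newPlatformMXBeanProxy(
              conn, ManagementFactory.MEMORY_MXBEAN_NAME, MemoryMXBean.class);
      return memory.getHeapMemoryUsage().getUsed();
    }
  }
}
```

With the port-forward running, JmxProbe.usedHeapBytes("127.0.0.1", 9010) returns the same heap figure jconsole shows on its Memory tab.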

spanner-event-exporter's People

Contributors

harwayne, xjdr
