Git Product home page Git Product logo

graphql-lambda-subscriptions's Introduction

Graphql Lambda Subscriptions

Release

Amazon Lambda Powered GraphQL Subscriptions. This is an Amazon Lambda Serverless equivalent to graphql-ws. It follows the graphql-ws prototcol. It is tested with the Architect Sandbox against graphql-ws directly and run in production today. For many applications graphql-lambda-subscriptions should do what graphql-ws does for you today without having to run a server. This started as fork of subscriptionless another library with similar goals.

As subscriptionless's tagline goes;

Have all the functionality of GraphQL subscriptions on a stateful server without the cost.

Why a fork?

I had different requirements and needed more features. This project wouldn't exist without subscriptionless and you should totally check it out.

Features

  • Only needs DynamoDB, API Gateway and Lambda (no app sync or other managed graphql platform required, can use step functions for ping/pong support)
  • Provides a Pub/Sub system to broadcast events to subscriptions
  • Provides hooks for the full lifecycle of a subscription
  • Type compatible with GraphQL and nexus.js
  • Optional Logging

Quick Start

Since there are many ways to deploy to amazon lambda I'm going to have to get opinionated in the quick start and pick Architect. graphql-lambda-subscriptions should work on Lambda regardless of your deployment and packaging framework. Take a look at the arc-basic-events mock used for integration testing for an example of using it with Architect.

API Docs

Can be found in our docs folder. You'll want to start with makeServer() and subscribe().

Setup

Create a graphql-lambda-subscriptions server

import { makeServer } from 'graphql-lambda-subscriptions'

// define a schema and create a configured DynamoDB instance from aws-sdk
// and make a schema with resolvers (maybe look at) '@graphql-tools/schema

const subscriptionServer = makeServer({
  dynamodb,
  schema,
})

Export the handler

export const handler = subscriptionServer.webSocketHandler

Configure API Gateway

Set up API Gateway to route WebSocket events to the exported handler.

๐Ÿ“– Architect Example
@app
basic-events

@ws
๐Ÿ“– Serverless Framework Example
functions:
  websocket:
    name: my-subscription-lambda
    handler: ./handler.handler
    events:
      - websocket:
          route: $connect
      - websocket:
          route: $disconnect
      - websocket:
          route: $default

Create DynamoDB tables for state

In-flight connections and subscriptions need to be persisted.

Changing DynamoDB table names

Use the tableNames argument to override the default table names.

const instance = makeServer({
  /* ... */
  tableNames: {
    connections: 'my_connections',
    subscriptions: 'my_subscriptions',
  },
})

// or use an async function to retrieve the names

const fetchTableNames = async () => {
  // do some work to get your table names
  return {
    connections,
    subscriptions,
  }
}
const instance = makeServer({
  /* ... */
  tableNames: fetchTableNames(),
})
๐Ÿ’พ Architect Example
@tables
Connection
  id *String
  ttl TTL
Subscription
  id *String
  ttl TTL

@indexes

Subscription
  connectionId *String
  name ConnectionIndex

Subscription
  topic *String
  name TopicIndex
import { tables as arcTables } from '@architect/functions'

const fetchTableNames = async () => {
  const tables = await arcTables()

  const ensureName = (table) => {
    const actualTableName = tables.name(table)
    if (!actualTableName) {
      throw new Error(`No table found for ${table}`)
    }
    return actualTableName
  }

  return {
    connections: ensureName('Connection'),
    subscriptions: ensureName('Subscription'),
  }
}

const subscriptionServer = makeServer({
  dynamodb: tables._db,
  schema,
  tableNames: fetchTableNames(),
})
๐Ÿ’พ Serverless Framework Example
resources:
  Resources:
    # Table for tracking connections
    connectionsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:provider.environment.CONNECTIONS_TABLE}
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
    # Table for tracking subscriptions
    subscriptionsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:provider.environment.SUBSCRIPTIONS_TABLE}
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
          - AttributeName: topic
            AttributeType: S
          - AttributeName: connectionId
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        GlobalSecondaryIndexes:
          - IndexName: ConnectionIndex
            KeySchema:
              - AttributeName: connectionId
                KeyType: HASH
            Projection:
              ProjectionType: ALL
            ProvisionedThroughput:
              ReadCapacityUnits: 1
              WriteCapacityUnits: 1
          - IndexName: TopicIndex
            KeySchema:
              - AttributeName: topic
                KeyType: HASH
            Projection:
              ProjectionType: ALL
            ProvisionedThroughput:
              ReadCapacityUnits: 1
              WriteCapacityUnits: 1
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
๐Ÿ’พ terraform example
resource "aws_dynamodb_table" "connections-table" {
  name           = "graphql_connections"
  billing_mode   = "PROVISIONED"
  read_capacity  = 1
  write_capacity = 1
  hash_key = "id"

  attribute {
    name = "id"
    type = "S"
  }
}

resource "aws_dynamodb_table" "subscriptions-table" {
  name           = "graphql_subscriptions"
  billing_mode   = "PROVISIONED"
  read_capacity  = 1
  write_capacity = 1
  hash_key = "id"

  attribute {
    name = "id"
    type = "S"
  }

  attribute {
    name = "topic"
    type = "S"
  }

  attribute {
    name = "connectionId"
    type = "S"
  }

  global_secondary_index {
    name               = "ConnectionIndex"
    hash_key           = "connectionId"
    write_capacity     = 1
    read_capacity      = 1
    projection_type    = "ALL"
  }

  global_secondary_index {
    name               = "TopicIndex"
    hash_key           = "topic"
    write_capacity     = 1
    read_capacity      = 1
    projection_type    = "ALL"
  }
}

PubSub

graphql-lambda-subscriptions uses it's own PubSub implementation.

Subscribing to Topics

Use the subscribe function to associate incoming subscriptions with a topic.

import { subscribe } from 'graphql-lambda-subscriptions'

export const resolver = {
  Subscribe: {
    mySubscription: {
      subscribe: subscribe('MY_TOPIC'),
      resolve: (event, args, context) => {/* ... */}
    }
  }
}
๐Ÿ“– Filtering events

Use the subscribe with SubscribeOptions to allow for filtering.

Note: If a function is provided, it will be called on subscription start and must return a serializable object.

import { subscribe } from 'graphql-lambda-subscriptions'

// Subscription agnostic filter
subscribe('MY_TOPIC', {
  filter: {
    attr1: '`attr1` must have this value',
    attr2: {
      attr3: 'Nested attributes work fine',
    },
  }
})

// Subscription specific filter
subscribe('MY_TOPIC',{
  filter: (root, args, context, info) => ({
    userId: args.userId,
  }),
})

Publishing events

Use the publish() function on your graphql-lambda-subscriptions server to publish events to active subscriptions. Payloads must be of type Record<string, any> so they can be filtered and stored.

subscriptionServer.publish({
  topic: 'MY_TOPIC',
  payload: {
    message: 'Hey!',
  },
})

Events can come from many sources

// SNS Event
export const snsHandler = (event) =>
  Promise.all(
    event.Records.map((r) =>
      subscriptionServer.publish({
        topic: r.Sns.TopicArn.substring(r.Sns.TopicArn.lastIndexOf(':') + 1), // Get topic name (e.g. "MY_TOPIC")
        payload: JSON.parse(r.Sns.Message),
      })
    )
  )

// Manual Invocation
export const invocationHandler = (payload) => subscriptionServer.publish({ topic: 'MY_TOPIC', payload })

Completing Subscriptions

Use the complete on your graphql-lambda-subscriptions server to complete active subscriptions. Payloads are optional and match against filters like events do.

subscriptionServer.complete({
  topic: 'MY_TOPIC',
  // optional payload
  payload: {
    message: 'Hey!',
  },
})

Context

Context is provided on the ServerArgs object when creating a server. The values are accessible in all callback and resolver functions (eg. resolve, filter, onAfterSubscribe, onSubscribe and onComplete).

Assuming no context argument is provided when creating the server, the default value is an object with connectionInitPayload, connectionId properties and the publish() and complete() functions. These properties are merged into a provided object or passed into a provided function.

Setting static context value

An object can be provided via the context attribute when calling makeServer.

const instance = makeServer({
  /* ... */
  context: {
    myAttr: 'hello',
  },
})

The default values (above) will be appended to this object prior to execution.

Setting dynamic context value

A function (optionally async) can be provided via the context attribute when calling makeServer.

The default context value is passed as an argument.

const instance = makeServer({
  /* ... */
  context: ({ connectionInitPayload }) => ({
    myAttr: 'hello',
    user: connectionInitPayload.user,
  }),
})

Using the context

export const resolver = {
  Subscribe: {
    mySubscription: {
      subscribe: subscribe('GREETINGS', {
        filter(_, _, context) {
          console.log(context.connectionId) // the connectionId
        },
        async onAfterSubscribe(_, _, { connectionId, publish }) {
          await publish('GREETINGS', { message: `HI from ${connectionId}!` })
        }
      })
      resolve: (event, args, context) => {
        console.log(context.connectionInitPayload) // payload from connection_init
        return event.payload.message
      },
    },
  },
}

Side effects

Side effect handlers can be declared on subscription fields to handle onSubscribe (start) and onComplete (stop) events.

๐Ÿ“– Adding side-effect handlers
export const resolver = {
  Subscribe: {
    mySubscription: {
      resolve: (event, args, context) => {
        /* ... */
      },
      subscribe: subscribe('MY_TOPIC', {
        // filter?: object | ((...args: SubscribeArgs) => object)
        // onSubscribe?: (...args: SubscribeArgs) => void | Promise<void>
        // onComplete?: (...args: SubscribeArgs) => void | Promise<void>
        // onAfterSubscribe?: (...args: SubscribeArgs) => PubSubEvent | Promise<PubSubEvent> | undefined | Promise<undefined>
      }),
    },
  },
}

Events

Global events can be provided when calling makeServer to track the execution cycle of the lambda.

๐Ÿ“– Connect (onConnect)

Called when a WebSocket connection is first established.

const instance = makeServer({
  /* ... */
  onConnect: ({ event }) => {
    /* */
  },
})
๐Ÿ“– Disconnect (onDisconnect)

Called when a WebSocket connection is disconnected.

const instance = makeServer({
  /* ... */
  onDisconnect: ({ event }) => {
    /* */
  },
})
๐Ÿ“– Authorization (connection_init)

onConnectionInit can be used to verify the connection_init payload prior to persistence.

Note: Any sensitive data in the incoming message should be removed at this stage.

const instance = makeServer({
  /* ... */
  onConnectionInit: ({ message }) => {
    const token = message.payload.token

    if (!myValidation(token)) {
      throw Error('Token validation failed')
    }

    // Prevent sensitive data from being written to DB
    return {
      ...message.payload,
      token: undefined,
    }
  },
})

By default, the (optionally parsed) payload will be accessible via context.

๐Ÿ“– Subscribe (onSubscribe)

Subscribe (onSubscribe)

Called when any subscription message is received.

const instance = makeServer({
  /* ... */
  onSubscribe: ({ event, message }) => {
    /* */
  },
})
๐Ÿ“– Complete (onComplete)

Called when any complete message is received.

const instance = makeServer({
  /* ... */
  onComplete: ({ event, message }) => {
    /* */
  },
})
๐Ÿ“– Error (onError)

Called when any error is encountered

const instance = makeServer({
  /* ... */
  onError: (error, context) => {
    /* */
  },
})

Caveats

Ping/Pong

For whatever reason, AWS API Gateway does not support WebSocket protocol level ping/pong. So you can use Step Functions to do this. See pingPong.

Socket idleness

API Gateway considers an idle connection to be one where no messages have been sent on the socket for a fixed duration (currently 10 minutes). The WebSocket spec has support for detecting idle connections (ping/pong) but API Gateway doesn't use it. This means, in the case where both parties are connected, and no message is sent on the socket for the defined duration (direction agnostic), API Gateway will close the socket. A fix for this is to set up immediate reconnection on the client side.

Socket Close Reasons

API Gateway doesn't support custom reasons or codes for WebSockets being closed. So the codes and reason strings wont match graphql-ws.

graphql-lambda-subscriptions's People

Contributors

dependabot[bot] avatar enisdenjo avatar izidormaklary avatar reconbot avatar renovate[bot] avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

graphql-lambda-subscriptions's Issues

The automated release is failing ๐Ÿšจ

๐Ÿšจ The automated release from the master branch failed. ๐Ÿšจ

I recommend you give this issue a high priority, so other packages depending on you can benefit from your bug fixes and new features again.

You can find below the list of errors reported by semantic-release. Each one of them has to be resolved in order to automatically publish your package. Iโ€™m sure you can fix this ๐Ÿ’ช.

Errors are usually caused by a misconfiguration or an authentication problem. With each error reported below you will find explanation and guidance to help you to resolve it.

Once all the errors are resolved, semantic-release will release your package the next time you push a commit to the master branch. You can also manually restart the failed CI job that runs semantic-release.

If you are not sure how to resolve this, here are some links that can help you:

If those donโ€™t help, or if this issue is reporting something you think isnโ€™t right, you can always ask the humans behind semantic-release.


No npm token specified.

An npm token must be created and set in the NPM_TOKEN environment variable on your CI environment.

Please make sure to create an npm token and to set it in the NPM_TOKEN environment variable on your CI environment. The token must allow to publish to the registry https://registry.npmjs.org/.


Good luck with your project โœจ

Your semantic-release bot ๐Ÿ“ฆ๐Ÿš€

It doesn't seem the Apollo client send `subscribe` via websocket event type

I am using this solution to setup graphql subscription in a lambda. But this code confused me:

https://github.com/reconbot/graphql-lambda-subscriptions/blob/main/lib/handleWebSocketEvent.ts#L44

When I test it by using ApolloClient in node, the client doesn't send a message type subscribe, instead it sends start. Is there anything wrong in the websocket event handler or should I change anything in client side to follow this message type?

Getting ValidationException: Invalid FilterExpression when publishing a big objects

I am getting "ValidationException: Invalid FilterExpression: Expression size has exceeded the maximum allowed size" when I try to publish a large object.

It looks like the issue is happing because of the filter logic, in getFilteredSubs.ts. Because I try to publish big a big object with nested objects inside the function creates a massive filter expression which results in the "Invalid FilterExpression: Expression size has exceeded" error.

In my use case, I don't need the filter nested logic, seems like a possible solution is to allow configuring to filter nested objects.
For example, I only need to filter by one argument (like userId).

Cannot read property 'definitions' of undefined

Hello, I'm trying to set up my WebSocket server however I'm getting this error on my CloudWatch logs for my lambda function:

Reason: TypeError: Cannot read property 'definitions' of undefined

{
    "errorType": "GoneException",
    "errorMessage": "410",
    "code": "GoneException",
    "message": "410",
    "statusCode": 410,
    "time": "2023-01-24T08:29:17.212Z",
    "requestId": "75e1aa0a-28c3-460a-a879-21271cd308a4",
    "retryable": false,
    "retryDelay": 69.89051258006047,
    "stack": [
        "GoneException: 410",
        "    at Object.extractError (/var/task/node_modules/aws-sdk/lib/protocol/json.js:52:27)",
        "    at Request.extractError (/var/task/node_modules/aws-sdk/lib/protocol/rest_json.js:61:8)",
        "    at Request.callListeners (/var/task/node_modules/aws-sdk/lib/sequential_executor.js:106:20)",
        "    at Request.emit (/var/task/node_modules/aws-sdk/lib/sequential_executor.js:78:10)",
        "    at Request.emit (/var/task/node_modules/aws-sdk/lib/request.js:686:14)",
        "    at Request.transition (/var/task/node_modules/aws-sdk/lib/request.js:22:10)",
        "    at AcceptorStateMachine.runTo (/var/task/node_modules/aws-sdk/lib/state_machine.js:14:12)",
        "    at /var/task/node_modules/aws-sdk/lib/state_machine.js:26:10",
        "    at Request.<anonymous> (/var/task/node_modules/aws-sdk/lib/request.js:38:9)",
        "    at Request.<anonymous> (/var/task/node_modules/aws-sdk/lib/request.js:688:12)",
        "    at Request.callListeners (/var/task/node_modules/aws-sdk/lib/sequential_executor.js:116:18)",
        "    at Request.emit (/var/task/node_modules/aws-sdk/lib/sequential_executor.js:78:10)",
        "    at Request.emit (/var/task/node_modules/aws-sdk/lib/request.js:686:14)",
        "    at Request.transition (/var/task/node_modules/aws-sdk/lib/request.js:22:10)",
        "    at AcceptorStateMachine.runTo (/var/task/node_modules/aws-sdk/lib/state_machine.js:14:12)",
        "    at /var/task/node_modules/aws-sdk/lib/state_machine.js:26:10",
        "    at Request.<anonymous> (/var/task/node_modules/aws-sdk/lib/request.js:38:9)",
        "    at Request.<anonymous> (/var/task/node_modules/aws-sdk/lib/request.js:688:12)",
        "    at Request.callListeners (/var/task/node_modules/aws-sdk/lib/sequential_executor.js:116:18)",
        "    at callNextListener (/var/task/node_modules/aws-sdk/lib/sequential_executor.js:96:12)",
        "    at IncomingMessage.onEnd (/var/task/node_modules/aws-sdk/lib/event_listeners.js:417:13)",
        "    at IncomingMessage.emit (events.js:412:35)"
    ]
}

Does anyone know how to fix this? I'm not sure where the error is coming from. This is the code for my handler

handler.js

const AWS = require('aws-sdk')
AWS.config.update({region: 'XXX'})
const dynamodb = new AWS.DynamoDB()
const { makeServer, subscribe } = require('graphql-lambda-subscriptions')
const schema = require("./../../lib/schema")
const createQueryContext = require("./../../lib/utils/createQueryContext")
const schemaPermissions = require("./../../lib/schema/security/schemaPermissions");
const { applyMiddleware } = require("graphql-middleware")

const subscriptionServer = makeServer({
    schema: applyMiddleware(schema, schemaPermissions),
    dynamodb,
    tableNames: {
        connections: 'XXX',
        subscriptions: 'XXX'
    },
    context: (event) => {
        const { connectionInitPayload, connectionId, publish, complete } = event
        console.log(event)
        if (connectionInitPayload.token) return createQueryContext({ Authorization: connectionInitPayload.token })
        else return null
    },
    onConnect: (event) => {
        console.log(event)
    },
    onDisconnect: (event) => {
        console.log(event)
    },
    onConnectionInit: async ({ event }) => {
        const { requestContext, body, isBase64Encoded } = event
        const { type, payload } = JSON.parse(body)
        let tPayload = JSON.parse(body)
        //! Validation needed here
        console.log(JSON.parse(body))
        if (tPayload?.payload?.headers?.Authorization) {
            return {
                token: tPayload?.payload?.headers?.Authorization
            }
        } else return { ...tPayload }
    },
    onError: (error, context) => {
        console.error('ERROR! - Websocket handler has encountered an error. Reason: ' + error)
        console.error(JSON.stringify(error))
    },
    onPing: (event, message) => {
        console.log(event)
    },
    onPong: (event, message) => {
        console.log(event)
    },
})

module.exports.handler = subscriptionServer.webSocketHandler

Do we only save one record per subscription?

Some of our code assumes a single subscription record, but subscribe will save one per topic subscribed. We currently don't support multiple topics for a subscription and I don't think I want to. This requires some thought and planning.

Publish doesn't execute resolvers

I've implemented the provided example (arc-basic-events), and am noticing an issue where the subscription payload doesn't resolve correctly and is sent to client(s) as null.

The cause seems to be in the publish function, which calls graphql.execute on the provided schema. The rootValue passed to execute() is actually the full publish event including topic, rather than just the payload.

Here's a minimal example:

type Subscription {
     example: String
}

await subscriptionServer.publish({ 
     topic: 'example',
     payload: { example: "Hello world!" },
});

In the above example, publish calls execute with a root value of the entire object provided to publish. As a result, graphql.execute never actually calls the resolver -- this makes sense since there's no data at 'example' to resolve.

Modifying publish to pass event.payload rather than the entire event fixes this issue, and resolves the value as expected.

I'm new to graphql so I may be misunderstanding. Is there a reason the entire event object is passed as rootValue?

Is there a way to skip Dynamodb stream?

I am looking at how to setup this application. When there is a need to publish data to subscribers, it saves the event on dynamodb which trigger a streaming event to another lambda. That lambda will publish the event to subscribers via graphql.

My question is why I have to save event on dynamodb. How can I skip this part? If I have received the event, it can just publish to subscribers directly?

Add subscription session storage

It would be really nice to maintain state between onSubscribe and onComplete, right now we persist args, connection info, and little else.

Nested attributes do not work when filtering events

Assume a filter:

filter: (_, _, context) => ({
    update: {
       userId: "foo"
    }
})

Which will be converted to the dynamodb query:

{
      TableName: "graphql_subscriptions",
      IndexName: "TopicIndex",
      ExpressionAttributeNames: {
        "#hashKey": "topic",
        "#filter": "filter",
        "#0": "update.userId",
      },
      ExpressionAttributeValues: {
        ":hashKey": "onUserBadgeCountChange",
        ":0": "foo",
      },
      KeyConditionExpression: "#hashKey = :hashKey",
      FilterExpression:  "(#filter.#0 = :0  OR attribute_not_exists(#filter.#0))",
}

Which is incorrect. As a result the filter will not work and all subscribers will receive the message.

Nested filters have to be written like this:

{
      TableName: "graphql_subscriptions",
      IndexName: "TopicIndex",
      ExpressionAttributeNames: {
        "#hashKey": "topic",
        "#filter": "filter",
        "#0": "update",
        "#1": "userId" // split the key by "."
      },
      ExpressionAttributeValues: {
        ":hashKey": "onUserBadgeCountChange",
        ":0": "foo",
      },
      KeyConditionExpression: "#hashKey = :hashKey",
      FilterExpression:  "(#filter.#0.#1 = :0  OR attribute_not_exists(#filter.#0.#1))", // Join the key back together with #0.#1
}

A very early draft of a solution inside getFilteredSubs.ts looks like this:

for (const [key, value] of Object.entries(flattenPayload)) {
  const aliasNumber = attributeCounter++;
  let subAliasNumbers = [];
  let subAliasNumber = aliasNumber;
  for (const part of key.split('.')) {
    expressionAttributeNames[`#${subAliasNumber}`] = part;
    subAliasNumbers.push(subAliasNumber);
    subAliasNumber++;
  }
  expressionAttributeValues[`:${aliasNumber}`] = value;
  filterExpressions.push(`(#filter.${subAliasNumbers.map(n => `#${n}`).join('.')} = :${aliasNumber} OR attribute_not_exists(#filter.${subAliasNumbers.map(n => `#${n}`).join('.')})`);
}

Unfortunetaly the docs are misleading as well - nested values are clearly described as "working fine" when not working at all.

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Warning

These dependencies are deprecated:

Datasource Name Replacement PR?
npm rollup-plugin-node-resolve Available

Open

These updates have all been created already. Click a checkbox below to force a retry/rebase of any.

Detected dependencies

github-actions
.github/workflows/release.yml
  • actions/checkout v3
  • actions/setup-node v3
.github/workflows/test.yml
  • actions/checkout v3
  • actions/setup-node v3
  • actions/checkout v3
  • actions/setup-node v3
nodenv
.node-version
  • node 16.16.0
npm
package.json
  • debug ^4.3.2
  • streaming-iterables ^7.0.0
  • @architect/sandbox 5.9.4
  • @graphql-tools/schema 10.0.5
  • @microsoft/api-extractor 7.47.6
  • @types/architect__sandbox 3.3.6
  • @types/aws-lambda 8.10.143
  • @types/chai 4.3.17
  • @types/chai-subset 1.3.5
  • @types/debug 4.1.12
  • @types/mocha 10.0.7
  • @types/node 18.19.44
  • @types/ws 8.5.12
  • @typescript-eslint/eslint-plugin 7.18.0
  • @typescript-eslint/parser 7.18.0
  • aggregate-error 5.0.0
  • aws-sdk 2.1674.0
  • chai 5.1.1
  • chai-subset 1.6.0
  • esbuild 0.23.0
  • esbuild-register 3.6.0
  • eslint 8.57.0
  • eslint-plugin-mocha-no-only 1.2.0
  • graphql 16.9.0
  • graphql-ws 5.16.0
  • inside-out-async 2.0.2
  • mocha 10.7.3
  • rollup 4.20.0
  • rollup-plugin-node-resolve 5.2.0
  • semantic-release 24.0.0
  • typedoc 0.25.13
  • typedoc-plugin-markdown 4.0.3
  • typescript 4.7.4
  • ws 8.18.0
  • aws-sdk >= 2.0.0
  • graphql >= 16.0.0
  • node ^14.13 || >=16

  • Check this box to trigger a request for Renovate to run again on this repository

Allow for an easy way to send an initial event

the onSubscribe callback is a great place to send out an event when someone subscribes to a topic. However, if you have a large topic it's desirable to only send an event to the connection that just subscribed. I'd like to add connectionId as something you can always filter on.

We currently have the indexes that could allow for this but the performance tradeoffs need to be properly evaluated.

How to Filter?

When need send message, how to filter the target to get data?
I only find follow code
async filter(root, args, context) { return { error: false, } },




for example, is there have like pubsub.asyncIterator(triggers).next()to find the target?

import { withFilter } from 'graphql-subscriptions'
subscribe: withFilter(
          () => pubsub.asyncIterator('NEW_MESSAGE'),
          (payload, variables) => {
        console.log('---------------messageFeed start-------------------')
        console.log('::Subscription subscribe payload ๅ‘ๆถˆๆฏๆ–น:%s \nvariables ่ฎข้˜…ๆ”ถๆถˆๆฏๆ–น:%s', payload, variables)

Then, When i get payload and variables data, i can compare if neet subscription message.

Please help me, how to filter to send message? Thanks!

When need send message, how to filter the target to get data?
I only find follow code:

export const resolver = {
  Subscribe: {
    mySubscription: {
      subscribe: subscribe('GREETINGS', {
        filter(_, _, context) {
          console.log(context.connectionId) // the connectionId
        },
        async onAfterSubscribe(_, _, { connectionId, publish }) {
          await publish('GREETINGS', { message: `HI from ${connectionId}!` })
        }
      })
      resolve: (event, args, context) => {
        console.log(context.connectionInitPayload) // payload from connection_init
        return event.payload.message
      },
    },
  },
}

When publishing data to subscribers, we need to make sure that each subscriber gets only the data it needs.
To do so, is there have some like withFilter helper from this package, which wraps AsyncIterator with a filter function, and lets you control each publication for each user.

for example, is there have like pubsub.asyncIterator(triggers).next()to find the target?

import { withFilter } from 'graphql-subscriptions';

const SOMETHING_CHANGED_TOPIC = 'something_changed';

export const resolvers = {
  Subscription: {
    somethingChanged: {
      subscribe: withFilter(() => pubsub.asyncIterator(SOMETHING_CHANGED_TOPIC), (payload, variables) => {
        return payload.somethingChanged.id === variables.relevantId;
      }),
    },
  },
}

Then, When i get payload and variables data, i can compare if neet subscription message.

Pub/Sub Improvments

We can use a topicKey that's indexed as most subscriptions will have at least one argument. (Eg. channel id, user id, etc) and then we can skip "filtering" for most publishing.

We can also add a topic as a sort key to the connectionId hashKey on subscriptions that allow us to target specific connections for initial events.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.