rxjs-websockets

An rxjs websocket library with a simple and flexible implementation. Supports the browser and node.js.

Comparisons to other rxjs websocket libraries:

  • observable-socket
    • observable-socket provides an input subject for the user. rxjs-websockets instead takes the input stream as a parameter, so the user can select an observable with semantics appropriate for their own use case (queueing-subject can be used to achieve the same semantics as observable-socket).
    • With observable-socket the WebSocket object must be used and managed by the user. rxjs-websockets manages the WebSocket(s) for the user lazily, according to subscriptions to the messages observable.
    • With observable-socket the connection status must be detected by observing plain old events on the WebSocket object. rxjs-websockets provides the connection status as an observable.
  • rxjs built-in websocket subject
    • Implemented as a Subject, so it lacks the flexibility that rxjs-websockets and observable-socket provide.
    • Does not provide any ability to monitor the web socket connection state.

Installation

Install the dependency:

npm install -S rxjs-websockets
# the following dependency is recommended for most users
npm install -S queueing-subject

Simple usage

import { QueueingSubject } from 'queueing-subject'
import websocketConnect from 'rxjs-websockets'

// this subject queues as necessary to ensure every message is delivered
const input = new QueueingSubject<string>()

// this method returns an object which contains two observables
const { messages, connectionStatus } = websocketConnect('ws://localhost/websocket-path', input)

// send data to the server
input.next('some data')

// the connectionStatus stream provides the current number of websocket
// connections immediately to each new observer and updates as it changes
const connectionStatusSubscription = connectionStatus.subscribe(numberConnected => {
  console.log('number of connected websockets:', numberConnected)
})

// the websocket connection is created lazily when the messages observable is
// subscribed to
const messagesSubscription = messages.subscribe((message: string) => {
  console.log('received message:', message)
})

// this will close the websocket
messagesSubscription.unsubscribe()

// closing the websocket does not close the connection status observable, it
// can be used to monitor future connection status changes
connectionStatusSubscription.unsubscribe()

messages is a cold observable: the websocket connection is attempted lazily, when a subscription is made to the messages observable. Advanced users of this library will find it important to understand the distinction between hot and cold observables. For most users it will be sufficient to use the share operator, as shown in the Angular example below.
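
The difference between a cold source and one shared with share can be illustrated without rxjs. The following is a minimal, dependency-free sketch: `Source`, `coldMessages`, `share`, `socketsOpened` and `socketsOpen` are hypothetical names written for this illustration and are not part of rxjs or rxjs-websockets.

```typescript
// Hypothetical model: a "cold" source is a function that performs its setup
// work every time it is subscribed to and returns a teardown function.
type Unsubscribe = () => void
type Source<T> = (observer: (value: T) => void) => Unsubscribe

let socketsOpened = 0 // total number of simulated sockets ever opened
let socketsOpen = 0   // simulated sockets currently open

// Stands in for the messages observable: each subscription "opens a socket".
const coldMessages: Source<string> = observer => {
  socketsOpened++
  socketsOpen++
  observer('hello')
  return () => { socketsOpen-- }
}

// Reference-counted sharing, similar in spirit to the rxjs share operator:
// the source is subscribed once for the first observer and torn down when
// the last observer unsubscribes.
function share<T>(source: Source<T>): Source<T> {
  let observers: Array<(value: T) => void> = []
  let teardown: Unsubscribe | undefined
  return observer => {
    observers = observers.concat([observer])
    if (observers.length === 1) {
      teardown = source(value => observers.forEach(o => o(value)))
    }
    return () => {
      observers = observers.filter(o => o !== observer)
      if (observers.length === 0 && teardown) {
        teardown()
        teardown = undefined
      }
    }
  }
}
```

Subscribing to `coldMessages` twice opens two simulated sockets, while subscribing twice to `share(coldMessages)` opens only one, which stays open until both observers have unsubscribed.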

Reconnecting on failure

This can be done with built-in rxjs operators:

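import { QueueingSubject } from 'queueing-subject'
import websocketConnect from 'rxjs-websockets'
import 'rxjs/add/operator/retryWhen'
import 'rxjs/add/operator/delay'

const input = new QueueingSubject<string>()
const { messages, connectionStatus } = websocketConnect('ws://server', input)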

// try to reconnect every second
messages.retryWhen(errors => errors.delay(1000)).subscribe(message => {
  console.log(message)
})

Alternate WebSocket implementations

A custom websocket factory function can be supplied that takes a URL and returns an object that is compatible with WebSocket:

const input = new QueueingSubject<string>()
const { messages } = websocketConnect(
  'ws://127.0.0.1:4201/ws',
  input,
  undefined,
  (url, protocols) => new WebSocket(url, protocols)
)
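
Because the factory only needs to return an object with the shape of a WebSocket, an in-memory stand-in could be substituted, for example in unit tests. The sketch below is an illustration under stated assumptions: `SocketLike`, `EchoSocket`, `simulateOpen` and `echoSocketFactory` are hypothetical names, and `SocketLike` is a local type that mirrors the IWebSocket typing with loosened parameter types rather than importing it from the library.

```typescript
// Local copy of the shape a factory must return (mirrors IWebSocket, with
// loosened parameter types so that the sketch has no DOM dependency).
interface SocketLike {
  close(): void
  send(data: unknown): void
  onopen?: (event: unknown) => void
  onclose?: (event: unknown) => void
  onmessage?: (event: { data: unknown }) => void
  onerror?: (event: unknown) => void
}

// Hypothetical in-memory socket that echoes every sent message back.
class EchoSocket implements SocketLike {
  onopen?: (event: unknown) => void
  onclose?: (event: unknown) => void
  onmessage?: (event: { data: unknown }) => void
  onerror?: (event: unknown) => void
  closed = false

  constructor(public url: string, public protocols?: string | string[]) {}

  // Simulates the connection being established. A real WebSocket does this
  // by itself, asynchronously; here the test code decides when it happens.
  simulateOpen(): void {
    if (this.onopen) this.onopen({})
  }

  // Echo the data straight back through the onmessage handler.
  send(data: unknown): void {
    if (this.closed) throw new Error('socket is closed')
    if (this.onmessage) this.onmessage({ data })
  }

  close(): void {
    this.closed = true
    if (this.onclose) this.onclose({})
  }
}

// Same parameter list as the factory in the example above.
const echoSocketFactory = (url: string, protocols?: string | string[]) =>
  new EchoSocket(url, protocols)
```

A factory like this could be passed as the fourth argument in place of the one that calls `new WebSocket`. The test would need to keep a reference to the created socket in order to call `simulateOpen`.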

Protocols

The API typings below show how to use all features, including protocols:

export interface Connection {
  connectionStatus: Observable<number>
  messages: Observable<string>
}

export interface IWebSocket {
  close(): any
  send(data: string | ArrayBuffer | Blob): any
  onopen?: (OpenEvent) => any
  onclose?: (CloseEvent) => any
  onmessage?: (MessageEvent) => any
  onerror?: (ErrorEvent) => any
}

export declare type WebSocketFactory = (url: string, protocols?: string | string[]) => IWebSocket

export default function connect(
  url: string,
  input: Observable<string>,
  protocols?: string | string[],
  websocketFactory?: WebSocketFactory
): Connection

JSON messages and responses

This example shows how to use the map operator to handle JSON encoding of outgoing messages and parsing of responses:

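import { Observable } from 'rxjs/Observable'
import websocketConnect from 'rxjs-websockets'
import 'rxjs/add/operator/map'

function jsonWebsocketConnect(url: string, input: Observable<object>, protocols?: string | string[]) {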
  const jsonInput = input.map(message => JSON.stringify(message))
  const { connectionStatus, messages } = websocketConnect(url, jsonInput, protocols)
  const jsonMessages = messages.map(message => JSON.parse(message))
  return { connectionStatus, messages: jsonMessages }
}
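
JSON.parse throws on malformed input, which in the example above would surface as an error on the messages stream. Where that is not desired, one option is a tolerant parser used in place of JSON.parse. The following is a minimal sketch: `ParseResult` and `tryParseJson` are hypothetical helpers written for this illustration, not part of rxjs-websockets.

```typescript
// Tagged result: either the parsed value, or the raw text plus the reason
// that parsing failed.
type ParseResult =
  | { ok: true; value: unknown }
  | { ok: false; raw: string; error: string }

// Parses a message without throwing, so that a single malformed message
// does not have to terminate the stream.
function tryParseJson(raw: string): ParseResult {
  try {
    return { ok: true, value: JSON.parse(raw) }
  } catch (e) {
    return { ok: false, raw, error: e instanceof Error ? e.message : String(e) }
  }
}
```

It could then be applied as `messages.map(tryParseJson)`, leaving each subscriber to decide how to handle failed results.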

Angular 4 example

The following is a very simple example Angular 4 service that uses rxjs-websockets to expose the messages from the server as an observable and take input messages using a procedural API. In most cases it would be preferable to wire the input stream up directly from one or more source observables.

// file: server-socket.service.ts
import { Injectable } from '@angular/core'
import { QueueingSubject } from 'queueing-subject'
import { Observable } from 'rxjs/Observable'
import websocketConnect from 'rxjs-websockets'
import 'rxjs/add/operator/share'

@Injectable()
export class ServerSocket {
  private inputStream: QueueingSubject<string>
  public messages: Observable<string>

  public connect() {
    if (this.messages)
      return

    // Using share() causes a single websocket to be created when the first
    // observer subscribes. This socket is shared with subsequent observers
    // and closed when the observer count falls to zero.
    this.messages = websocketConnect(
      'ws://127.0.0.1:4201/ws',
      this.inputStream = new QueueingSubject<string>()
    ).messages.share()
  }

  public send(message: string): void {
    // If the websocket is not connected then the QueueingSubject will ensure
    // that messages are queued and delivered when the websocket reconnects.
    // A regular Subject can be used to discard messages sent when the websocket
    // is disconnected.
    this.inputStream.next(message)
  }
}

This service could be used like this:

import { Component, OnDestroy, OnInit } from '@angular/core'
import { Subscription } from 'rxjs/Subscription'
import { ServerSocket } from './server-socket.service'

@Component({
  selector: 'socket-user',
  templateUrl: './socket-user.component.html',
  styleUrls: ['./socket-user.component.scss']
})
export class SocketUserComponent implements OnInit, OnDestroy {
  private socketSubscription: Subscription

  constructor(private socket: ServerSocket) {}

  ngOnInit() {
    this.socket.connect()

    this.socketSubscription = this.socket.messages.subscribe((message: string) => {
      console.log('received message from server: ', message)
    })

    // send message to server, if the socket is not connected it will be sent
    // as soon as the connection becomes available thanks to QueueingSubject
    this.socket.send('hello')
  }

  ngOnDestroy() {
    this.socketSubscription.unsubscribe()
  }
}
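
Sending 'hello' before the socket has connected works because of the queueing behaviour of the input subject. That behaviour can be illustrated with a dependency-free sketch. `SimpleQueue` is a hypothetical stand-in written for this illustration: it is not the implementation of queueing-subject, and it supports only a single listener at a time.

```typescript
// Hypothetical single-listener queue: values sent while nobody is listening
// are held and delivered, in order, to the next listener.
class SimpleQueue<T> {
  private pending: T[] = []
  private listener?: (value: T) => void

  // Deliver immediately if someone is listening, otherwise hold the value.
  next(value: T): void {
    if (this.listener) this.listener(value)
    else this.pending.push(value)
  }

  // Flush everything queued so far to the new listener, then deliver live.
  // Returns a function that stops delivery to this listener.
  subscribe(listener: (value: T) => void): () => void {
    this.listener = listener
    const queued = this.pending
    this.pending = []
    queued.forEach(value => listener(value))
    return () => {
      if (this.listener === listener) this.listener = undefined
    }
  }
}
```

With a plain Subject in the same position, values sent while nothing is subscribed would be discarded instead of held.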
