@sap/xb-msg

XB Messaging

Downloads in past

Stats

StarsIssuesVersionUpdatedCreatedSize
@sap/xb-msg
0.9.122 years ago3 years agoMinified + gzip package size for @sap/xb-msg in KB

Readme

Message Streams
Provides a client for stream-based messaging.

Table of contents

Prerequisites

Make sure to have a message broker available, e.g. RabbitMQ installed locally, having the plugins for AMQP 1.0, MQTT 3.1.1 and WebSocket binding enabled.

Install

See also: https://www.npmjs.com/package/@sap/xb-msg
To add it to your project run:
npm i @sap/xb-msg

To generate complete API documentation run inside the library package folder
npm run doc

Overview

A messaging application shall focus on its business logic. Bindings and protocol-specific settings shall move to configuration, without negative impact on performance. The package provides a solution for that, wrapping protocol-specific client implementations from separate packages.
| Library | Protocol | |:----------------------|:----------------------------------------------------------------------------------| | @sap/xb-msg-amqp-v091 | AMQP v0.9.1 | | @sap/xb-msg-amqp-v100 | AMQP v1.0 | | @sap/xb-msg-mqtt-v311 | MQTT v3.1.1 |
Direct usage of the listed libraries provides full access to protocol-specific features. With @sap/xb-msg-amqp-091 the application may control channels, queues and exchanges. With @sap/xb-msg-amqp-100 the application may for example define sessions, attach links and control link credit dynamically. With @sap/xb-msg-mqtt-311 last-will-messages could be used.
However, many messaging applications will have less specific requirements. It may come down to:
  • Produce messages with a writable stream
  • Consume messages from a readable stream
  • Use messages with a binary payload
  • Receive message at least once and acknowledge messages after successful processing
  • Stay flexible with regards to the used messaging protocol
  • But still accept the need to maintain settings on the broker side, e.g. to create queues or assign topic bindings to it

With these requirements @sap/xb-msg in combination with package @sap/xb-msg-env can be a good choice. For example to consume messages:
const msg = require('@sap/xb-msg');
const env = require('@sap/xb-msg-env');

const options = env.msgClientOptions('my-service-instance', ['my-inp'], []);
const client = new msg.Client(options);

client.istream('my-inp')
    .on('error', () => {
        console.log(error);
    })
    .on('data', (message) => {
        console.log(`message: ${message.payload.toString()}`);
        message.done();
    })
;

client.connect();

@sap/xb-msg-env reads cf/xs environment variables to build up the client options (configuration). Switching the protocol, changing the binding to queues or topics or changing the quality of service becomes possible without changing the program code.
A message producer is implemented in a similar way:
const msg = require('@sap/xb-msg');
const env = require('@sap/xb-msg-env');

const options = env.msgClientOptions('my-service-instance', [], ['my-out']);
const client = new msg.Client(options);

client.ostream('my-out')
    .on('ready', () => {
        send();
    })
    .on('drain', () => {
        send();
    })
    .on('error', (error) => {
        console.log(error);
    })
;

client.connect();

Finally, using an own Transform stream (here class Converter) the application can also rely on fully automated flow control:
const msg = require('@sap/xb-msg');
const env = require('@sap/xb-msg-env');

const options = env.msgClientOptions('my-service-instance', ['my-inp'], ['my-out']);
const client = new msg.Client(options);

client.istream('my-inp')
    .pipe(new Converter())
    .pipe(client.ostream('my-out'))
;

client.connect();

Client Options

The client employs protocol-specific libraries that require protocol-specific settings, at least for stream definitions. For example, MQTT will accept a property qos whereas AMQP 1.0 accepts sndSettleMode and rcvSettleMode. However, these differences are restricted to the configuration. At runtime the application will always see a unified client behavior.

Connections

Based on client options one or more connections can be created, each for one of the supported protocols and each providing multiple incoming or outgoing streams. As soon as a client instance is connected the application can produce and/or consume messages.
Connections via 'net' and 'tls' are supported for all protocols. Websocket is specified and in consequence implemented for MQTT 3.1.1 and AMQP 1.0 only. Use options attribute 'wss' or 'ws'.
WebSocket connections can also be established using OAuth 2.0, for example when connecting a local application to SAP cloud. Relevant grant flows are: ClientCredentialsFlow and ResourceOwnerPasswordCredentialsFlow.
If multiple settings are provided the preference is as follows: preferred 'tls' then 'net' then 'wss' then finally 'ws'.

Message Streams

Writable and Readable streams are provided to handle outgoing and incoming messages. These streams always run in object mode.
Here, a single message is represented as a plain object with the following attributes:
  • source: defined by the incoming stream,
  • target: defined optionally by the application, similar to the source, accepted by the outgoing stream,
  • payload: message payload,
  • done(): a callback function to confirm final message processing,
    failed(error): a callback function to indicate processing failure.

A receiving application is expected to call either done or failed for each single message, exactly one time (maybe asynchronously) and independent from the used quality of service. A sending application can define the callbacks to get notified about the message state.
const message = {
    payload : Buffer.from('test'),
    done : () => this._onSendDone(message),
    failed : (error) => this._onSendFailed(error, message)
};
const noPause = stream.write(message);

In any case the application is expected to implement the flow control of writable streams correctly.

Message Payload

The application may provide the message payload for an outgoing message as follows:
  • a Buffer object,
  • an Array of Buffer objects,
  • a Payload object or a plain object with same properties as Payload.

Common properties of a Payload object (specific protocol clients add more data):
  • chunks: an Array of Buffer objects,
  • type: an optional string providing the content type (not supported with MQTT)

After the payload was given to a sender it must not be modified by the application anymore.
Incoming messages will always provide a Payload object, just for application convenience.

Examples

In folder examples there are test programs, ready to run if a broker can be reached locally at the protocol-specific default port. Folder examples/cfg provides sample configurations.