315 lines
8.7 KiB
JavaScript
315 lines
8.7 KiB
JavaScript
//
|
|
//
|
|
//
|
|
|
|
'use strict';
|
|
|
|
/*
|
|
The channel (promise) and callback APIs have similar signatures, and
|
|
in particular, both need AMQP fields prepared from the same arguments
|
|
and options. The arguments marshalling is done here. Each of the
|
|
procedures below takes arguments and options (the latter in an object)
|
|
particular to the operation it represents, and returns an object with
|
|
fields for handing to the encoder.
|
|
*/
|
|
|
|
// A number of AMQP methods have a table-typed field called
|
|
// `arguments`, that is intended to carry extension-specific
|
|
// values. RabbitMQ uses this in a number of places; e.g., to specify
|
|
// an 'alternate exchange'.
|
|
//
|
|
// Many of the methods in this API have an `options` argument, from
|
|
// which I take both values that have a default in AMQP (e.g.,
|
|
// autoDelete in QueueDeclare) *and* values that are specific to
|
|
// RabbitMQ (e.g., 'alternate-exchange'), which would normally be
|
|
// supplied in `arguments`. So that extensions I don't support yet can
|
|
// be used, I include `arguments` itself among the options.
|
|
//
|
|
// The upshot of this is that I often need to prepare an `arguments`
|
|
// value that has any values passed in `options.arguments` as well as
|
|
// any I've promoted to being options themselves. Since I don't want
|
|
// to mutate anything passed in, the general pattern is to create a
|
|
// fresh object with the `arguments` value given as its prototype; all
|
|
// fields in the supplied value will be serialised, as well as any I
|
|
// set on the fresh object. What I don't want to do, however, is set a
|
|
// field to undefined by copying possibly missing field values,
|
|
// because that will mask a value in the prototype.
|
|
//
|
|
// NB the `arguments` field already has a default value of `{}`, so
|
|
// there's no need to explicitly default it unless I'm setting values.
|
|
function setIfDefined(obj, prop, value) {
|
|
if (value != undefined) obj[prop] = value;
|
|
}
|
|
|
|
var EMPTY_OPTIONS = Object.freeze({});
|
|
|
|
var Args = {};
|
|
|
|
Args.assertQueue = function(queue, options) {
|
|
queue = queue || '';
|
|
options = options || EMPTY_OPTIONS;
|
|
|
|
var argt = Object.create(options.arguments || null);
|
|
setIfDefined(argt, 'x-expires', options.expires);
|
|
setIfDefined(argt, 'x-message-ttl', options.messageTtl);
|
|
setIfDefined(argt, 'x-dead-letter-exchange',
|
|
options.deadLetterExchange);
|
|
setIfDefined(argt, 'x-dead-letter-routing-key',
|
|
options.deadLetterRoutingKey);
|
|
setIfDefined(argt, 'x-max-length', options.maxLength);
|
|
setIfDefined(argt, 'x-max-priority', options.maxPriority);
|
|
setIfDefined(argt, 'x-overflow', options.overflow);
|
|
setIfDefined(argt, 'x-queue-mode', options.queueMode);
|
|
|
|
return {
|
|
queue: queue,
|
|
exclusive: !!options.exclusive,
|
|
durable: (options.durable === undefined) ? true : options.durable,
|
|
autoDelete: !!options.autoDelete,
|
|
arguments: argt,
|
|
passive: false,
|
|
// deprecated but we have to include it
|
|
ticket: 0,
|
|
nowait: false
|
|
};
|
|
};
|
|
|
|
Args.checkQueue = function(queue) {
|
|
return {
|
|
queue: queue,
|
|
passive: true, // switch to "completely different" mode
|
|
nowait: false,
|
|
durable: true, autoDelete: false, exclusive: false, // ignored
|
|
ticket: 0,
|
|
};
|
|
};
|
|
|
|
Args.deleteQueue = function(queue, options) {
|
|
options = options || EMPTY_OPTIONS;
|
|
return {
|
|
queue: queue,
|
|
ifUnused: !!options.ifUnused,
|
|
ifEmpty: !!options.ifEmpty,
|
|
ticket: 0, nowait: false
|
|
};
|
|
};
|
|
|
|
Args.purgeQueue = function(queue) {
|
|
return {
|
|
queue: queue,
|
|
ticket: 0, nowait: false
|
|
};
|
|
};
|
|
|
|
Args.bindQueue = function(queue, source, pattern, argt) {
|
|
return {
|
|
queue: queue,
|
|
exchange: source,
|
|
routingKey: pattern,
|
|
arguments: argt,
|
|
ticket: 0, nowait: false
|
|
};
|
|
};
|
|
|
|
Args.unbindQueue = function(queue, source, pattern, argt) {
|
|
return {
|
|
queue: queue,
|
|
exchange: source,
|
|
routingKey: pattern,
|
|
arguments: argt,
|
|
ticket: 0, nowait: false
|
|
};
|
|
};
|
|
|
|
Args.assertExchange = function(exchange, type, options) {
|
|
options = options || EMPTY_OPTIONS;
|
|
var argt = Object.create(options.arguments || null);
|
|
setIfDefined(argt, 'alternate-exchange', options.alternateExchange);
|
|
return {
|
|
exchange: exchange,
|
|
ticket: 0,
|
|
type: type,
|
|
passive: false,
|
|
durable: (options.durable === undefined) ? true : options.durable,
|
|
autoDelete: !!options.autoDelete,
|
|
internal: !!options.internal,
|
|
nowait: false,
|
|
arguments: argt
|
|
};
|
|
};
|
|
|
|
Args.checkExchange = function(exchange) {
|
|
return {
|
|
exchange: exchange,
|
|
passive: true, // switch to 'may as well be another method' mode
|
|
nowait: false,
|
|
// ff are ignored
|
|
durable: true, internal: false, type: '', autoDelete: false,
|
|
ticket: 0
|
|
};
|
|
};
|
|
|
|
Args.deleteExchange = function(exchange, options) {
|
|
options = options || EMPTY_OPTIONS;
|
|
return {
|
|
exchange: exchange,
|
|
ifUnused: !!options.ifUnused,
|
|
ticket: 0, nowait: false
|
|
};
|
|
};
|
|
|
|
Args.bindExchange = function(dest, source, pattern, argt) {
|
|
return {
|
|
source: source,
|
|
destination: dest,
|
|
routingKey: pattern,
|
|
arguments: argt,
|
|
ticket: 0, nowait: false
|
|
};
|
|
};
|
|
|
|
Args.unbindExchange = function(dest, source, pattern, argt) {
|
|
return {
|
|
source: source,
|
|
destination: dest,
|
|
routingKey: pattern,
|
|
arguments: argt,
|
|
ticket: 0, nowait: false
|
|
};
|
|
};
|
|
|
|
// It's convenient to construct the properties and the method fields
|
|
// at the same time, since in the APIs, values for both can appear in
|
|
// `options`. Since the property or mthod field names don't overlap, I
|
|
// just return one big object that can be used for both purposes, and
|
|
// the encoder will pick out what it wants.
|
|
Args.publish = function(exchange, routingKey, options) {
|
|
options = options || EMPTY_OPTIONS;
|
|
|
|
// The CC and BCC fields expect an array of "longstr", which would
|
|
// normally be buffer values in JavaScript; however, since a field
|
|
// array (or table) cannot have shortstr values, the codec will
|
|
// encode all strings as longstrs anyway.
|
|
function convertCC(cc) {
|
|
if (cc === undefined) {
|
|
return undefined;
|
|
}
|
|
else if (Array.isArray(cc)) {
|
|
return cc.map(String);
|
|
}
|
|
else return [String(cc)];
|
|
}
|
|
|
|
var headers = Object.create(options.headers || null);
|
|
setIfDefined(headers, 'CC', convertCC(options.CC));
|
|
setIfDefined(headers, 'BCC', convertCC(options.BCC));
|
|
|
|
var deliveryMode; // undefined will default to 1 (non-persistent)
|
|
|
|
// Previously I overloaded deliveryMode be a boolean meaning
|
|
// 'persistent or not'; better is to name this option for what it
|
|
// is, but I need to have backwards compatibility for applications
|
|
// that either supply a numeric or boolean value.
|
|
if (options.persistent !== undefined)
|
|
deliveryMode = (options.persistent) ? 2 : 1;
|
|
else if (typeof options.deliveryMode === 'number')
|
|
deliveryMode = options.deliveryMode;
|
|
else if (options.deliveryMode) // is supplied and truthy
|
|
deliveryMode = 2;
|
|
|
|
var expiration = options.expiration;
|
|
if (expiration !== undefined) expiration = expiration.toString();
|
|
|
|
return {
|
|
// method fields
|
|
exchange: exchange,
|
|
routingKey: routingKey,
|
|
mandatory: !!options.mandatory,
|
|
immediate: false, // RabbitMQ doesn't implement this any more
|
|
ticket: undefined,
|
|
// properties
|
|
contentType: options.contentType,
|
|
contentEncoding: options.contentEncoding,
|
|
headers: headers,
|
|
deliveryMode: deliveryMode,
|
|
priority: options.priority,
|
|
correlationId: options.correlationId,
|
|
replyTo: options.replyTo,
|
|
expiration: expiration,
|
|
messageId: options.messageId,
|
|
timestamp: options.timestamp,
|
|
type: options.type,
|
|
userId: options.userId,
|
|
appId: options.appId,
|
|
clusterId: undefined
|
|
};
|
|
};
|
|
|
|
Args.consume = function(queue, options) {
|
|
options = options || EMPTY_OPTIONS;
|
|
var argt = Object.create(options.arguments || null);
|
|
setIfDefined(argt, 'x-priority', options.priority);
|
|
return {
|
|
ticket: 0,
|
|
queue: queue,
|
|
consumerTag: options.consumerTag || '',
|
|
noLocal: !!options.noLocal,
|
|
noAck: !!options.noAck,
|
|
exclusive: !!options.exclusive,
|
|
nowait: false,
|
|
arguments: argt
|
|
};
|
|
};
|
|
|
|
Args.cancel = function(consumerTag) {
|
|
return {
|
|
consumerTag: consumerTag,
|
|
nowait: false
|
|
};
|
|
};
|
|
|
|
Args.get = function(queue, options) {
|
|
options = options || EMPTY_OPTIONS;
|
|
return {
|
|
ticket: 0,
|
|
queue: queue,
|
|
noAck: !!options.noAck
|
|
};
|
|
};
|
|
|
|
Args.ack = function(tag, allUpTo) {
|
|
return {
|
|
deliveryTag: tag,
|
|
multiple: !!allUpTo
|
|
};
|
|
};
|
|
|
|
Args.nack = function(tag, allUpTo, requeue) {
|
|
return {
|
|
deliveryTag: tag,
|
|
multiple: !!allUpTo,
|
|
requeue: (requeue === undefined) ? true : requeue
|
|
};
|
|
};
|
|
|
|
Args.reject = function(tag, requeue) {
|
|
return {
|
|
deliveryTag: tag,
|
|
requeue: (requeue === undefined) ? true : requeue
|
|
};
|
|
};
|
|
|
|
Args.prefetch = function(count, global) {
|
|
return {
|
|
prefetchCount: count || 0,
|
|
prefetchSize: 0,
|
|
global: !!global
|
|
};
|
|
};
|
|
|
|
Args.recover = function() {
|
|
return {requeue: true};
|
|
};
|
|
|
|
module.exports = Object.freeze(Args);
|