;; $Id: //info.ravenbrook.com/user/ndl/lisp/cl-rabbit/message.lisp#2 $

(in-package "RABBITMQ")

;;                            MESSAGE.LISP
;;            Nick Levine, Ravenbrook Limited, 2007-09-21
;;
;; 1.  INTRODUCTION
;;
;; The purpose of this document is to implement a lisp interface to
;; RabbitMQ messages.
;;
;; See Appendix C below for copyright and license.


;; 2.  MESSAGE

(defclass outgoing-message ()
  ((body
    :initform nil
    :accessor message-raw-body
    :documentation "Raw body of the message; NIL until one is set.")
   (properties
    :initform (new-basic-properties)
    :accessor message-properties
    :documentation "Properties object built by NEW-BASIC-PROPERTIES."))
  (:documentation
   "A message assembled on the lisp side: a raw body plus a properties object."))

(defun new-basic-properties ()
  "Return a fresh AMQP$BasicProperties. instance with deliveryMode set to
boxed 1 and priority set to boxed 0."
  (let ((fresh (new AMQP$BasicProperties.)))
    (setf (AMQP$BasicProperties.deliveryMode fresh) (box-int 1))
    (setf (AMQP$BasicProperties.priority fresh) (box-int 0))
    fresh))

(defun new-message (&key timestamp)
  "Make and return an OUTGOING-MESSAGE.
TIMESTAMP of T stamps the message with (GET-CURRENT-TIME), NIL stamps it
with 0, and any other value is used as the timestamp itself.  The
message's content type is set to NIL."
  (let ((message (make-instance 'outgoing-message))
        (stamp (cond ((eq timestamp t) (get-current-time))
                     ((null timestamp) 0)
                     (t timestamp))))
    (setf (message-timestamp message) stamp)
    (setf (message-content-type message) nil)
    message))

(defmethod message-raw-body ((delivery QueueingConsumer$Delivery.))
  "Raw body of a received delivery, via QueueingConsumer$Delivery.getBody."
  (QueueingConsumer$Delivery.getBody delivery))

(defmethod message-envelope ((delivery QueueingConsumer$Delivery.))
  "Envelope of a received delivery, via QueueingConsumer$Delivery.getEnvelope."
  (QueueingConsumer$Delivery.getEnvelope delivery))

(defmethod message-properties ((delivery QueueingConsumer$Delivery.))
  "Properties of a received delivery, via QueueingConsumer$Delivery.getProperties."
  (QueueingConsumer$Delivery.getProperties delivery))


;; 3.
;; PROPERTIES
;;
;; TBD (if anyone ever wants them): clusterId and headers

(defun message-body (message)
  "Return MESSAGE's body in the form selected by its content type:
NIL for a NIL content type, a string for :STRING, an array for :OCTETS,
and the raw body for anything else."
  (let ((content-type (message-content-type message)))
    (cond ((null content-type) nil)
          ((eq content-type :string) (message-body-string message))
          ((eq content-type :octets) (message-body-data message))
          (t (message-raw-body message)))))

(defun (setf message-body) (new-value message)
  "Store NEW-VALUE as MESSAGE's body, dispatching on its lisp type.
NIL clears both the content type and the raw body; a string is stored
with content type :STRING; a simple-vector with content type :OCTETS;
any other object is stored as the raw body with the content type left
alone."
  (cond ((null new-value)
         (setf (message-content-type message) nil)
         (setf (message-raw-body message) nil))
        ((stringp new-value)
         (setf (message-content-type message) :string)
         (setf (message-body-string message) new-value))
        ((simple-vector-p new-value)
         (setf (message-content-type message) :octets)
         (setf (message-body-data message) new-value))
        (t
         (setf (message-raw-body message) new-value))))

(defun message-body-size (message)
  "Return the JLENGTH of MESSAGE's raw body, or 0 when there is no raw body."
  (let ((raw (message-raw-body message)))
    (if raw
        (jlength raw)
      0)))

(defmacro def-message-property (accessor property)
  "Define ACCESSOR and (SETF ACCESSOR) as functions of a message which
read and write a field of the message's properties object.
The field accessor's name is formed by printing AMQP$BasicProperties
followed by PROPERTY and interning the result in *PACKAGE* at
macroexpansion time."
  (let ((field-accessor (intern (format nil "~a~a" 'AMQP$BasicProperties property)
                                *package*)))
    `(dspec:def (def-message-property ,accessor)
       (defun ,accessor (message)
         (,field-accessor (message-properties message)))
       (defun (setf ,accessor) (new-value message)
         (setf (,field-accessor (message-properties message)) new-value))
       ',accessor)))

;; "The Basic class provides methods that support an industry-standard messaging model."
(def-message-property message-id               .messageId)
(def-message-property message-application-id   .appId)
(def-message-property message-content-encoding .contentEncoding)
(def-message-property raw-message-content-type .contentType)
(def-message-property message-correlation-id   .correlationId)
(def-message-property message-expiration       .expiration)
(def-message-property message-reply-to         .replyTo)
(def-message-property message-type             .type)
(def-message-property message-user-id          .userId)

(defun message-origin (message)
  "Return a string of the form reply-to/message-id for MESSAGE."
  (format nil "~a/~a" (message-reply-to message) (message-id message)))

(defun message-content-type (message)
  "Return MESSAGE's content type as a lisp value.
The raw content-type string is interned as a keyword; the strings null,
text/plain and application/octet-stream map to NIL, :STRING and :OCTETS
respectively, and any other string yields the interned keyword itself."
  ;; NOTE(review): INTERN requires a string.  If the contentType property
  ;; has never been set, the raw accessor presumably returns NIL (as the
  ;; timestamp accessor can) -- confirm how such messages should be treated.
  (let ((keyword (intern (raw-message-content-type message) :keyword)))
    (case keyword
      ((:|null|) nil)
      ((:|text/plain|) :string)
      ((:|application/octet-stream|) :octets)
      (otherwise keyword))))

(defun (setf message-content-type) (new-value message)
  "Set MESSAGE's raw content type from NEW-VALUE and return NEW-VALUE.
NIL, :STRING and :OCTETS are written as null, text/plain and
application/octet-stream; anything else is written as its downcased
string form."
  (setf (raw-message-content-type message)
        (case new-value
          ((nil) "null")
          ((:string) "text/plain")
          ((:octets) "application/octet-stream")
          (otherwise (string-downcase new-value))))
  new-value)

(defun message-delivery-persistent (message)
  "Return true when MESSAGE's deliveryMode property unboxes to 2."
  (= (unbox-int (AMQP$BasicProperties.deliveryMode (message-properties message)))
     2))

(defun (setf message-delivery-persistent) (new-value message)
  "Set MESSAGE's deliveryMode to boxed 2 if NEW-VALUE is true, else boxed 1.
Returns NEW-VALUE."
  (setf (AMQP$BasicProperties.deliveryMode (message-properties message))
        (box-int (if new-value 2 1)))
  new-value)

(defun message-priority (message)
  "Return MESSAGE's priority property as an unboxed integer."
  (unbox-int (AMQP$BasicProperties.priority (message-properties message))))

(defun (setf message-priority) (new-value message)
  "Box NEW-VALUE and store it as MESSAGE's priority.  Returns NEW-VALUE."
  (setf (AMQP$BasicProperties.priority (message-properties message))
        (box-int new-value))
  new-value)

;; The time zone argument 0 (GMT) is passed explicitly: without it
;; ENCODE-UNIVERSAL-TIME interprets the date in the host's local time
;; zone, and the offset would differ from the 2208988800 used by the
;; linux version of GET-CURRENT-TIME on any host that is not on GMT.
(defparameter *timestamp-offset*
  (encode-universal-time 0 0 0 1 1 1970 0)
  "Universal time of 1970-01-01 00:00:00 GMT, i.e. the difference in
seconds between lisp universal time and the epoch used by Date.getTime.")

(defun message-timestamp (message)
  "Return MESSAGE's timestamp as a double-float universal time in seconds,
or NIL if the timestamp property is not set."
  (let ((date (or (AMQP$BasicProperties.timestamp (message-properties message))
                  (return-from message-timestamp nil))))
    ;; Date.getTime is divided by 1000 (milliseconds to seconds) and then
    ;; shifted onto the universal-time scale.
    (+ (/ (float (Date.getTime date) 1d0) 1000)
       *timestamp-offset*)))

(defun (setf message-timestamp) (new-value message)
  "Set MESSAGE's timestamp from NEW-VALUE, a universal time in seconds
(possibly fractional).  Returns NEW-VALUE."
  (let ((date (new Date.)))
    ;; Hack alert: JFLI constructors don't have any way to coerce arguments
    ;; to long, but luckily there's this workaround
    (Date.setTime date (round (* (- new-value *timestamp-offset*) 1000)))
    (setf (AMQP$BasicProperties.timestamp (message-properties message)) date))
  new-value)

#-linux
(defun get-current-time ()
  "Return the current universal time in whole seconds."
  (get-universal-time))

#+linux
(defun get-current-time ()
  "Return the current universal time in seconds as a double-float,
rounded to the nearest millisecond."
  (let* ((system-time (system::get-time-from-70))
         (seconds (+ (aref system-time 0) 2208988800)) ; 1970-01-01
         ;; element 1 is divided by 1000 and rounded to give milliseconds
         (millis (round (/ (aref system-time 1) 1000))))
    (+ seconds (* millis 1d-3))))

(defun message-exchange (message)
  "Return the exchange recorded in MESSAGE's envelope."
  (Envelope.getExchange (message-envelope message)))

(defun message-routing-key (message)
  "Return the routing key recorded in MESSAGE's envelope."
  (Envelope.getRoutingKey (message-envelope message)))

(defun message-delivery-tag (message)
  "Return the delivery tag recorded in MESSAGE's envelope."
  (Envelope.getDeliveryTag (message-envelope message)))


;; 4.  METHODS

(defun message-body-string (message)
  "Return MESSAGE's raw body converted to a string, or NIL if there is
no raw body."
  (Object.toString (new String. (or (message-raw-body message)
                                    (return-from message-body-string nil)))))

(defmethod (setf message-body-string) (new-value (self outgoing-message))
  "Store the bytes of the string NEW-VALUE (via String.getBytes) as the
raw body of SELF.  Returns NEW-VALUE."
  (setf (message-raw-body self) (String.getBytes new-value))
  new-value)

(defun message-body-data (message &key (element-type t))
  "Return MESSAGE's raw body copied into a fresh lisp array whose element
type is ELEMENT-TYPE, or NIL if there is no raw body.
When the resulting array is a simple-string, each byte is converted to a
character, with negative byte values first mapped into 128..255.  For
every other array type the values are stored exactly as JREF-BYTE
returns them."
  (let* ((body (or (message-raw-body message)
                   (return-from message-body-data nil)))
         (length (jlength body))
         (data (make-array length :element-type element-type)))
    ;; Dispatch on the array actually made, so that the specialised
    ;; accessors SVREF and SCHAR can be used where they apply.
    (typecase data
      (simple-vector
       (dotimes (i length)
         (setf (svref data i) (jref-byte body i))))
      (simple-string
       (dotimes (i length)
         (setf (schar data i) (code-char (let ((signed (jref-byte body i)))
                                           (if (< signed 0)
                                               (+ signed #x100)
                                             signed))))))
      (t
       (dotimes (i length)
         (setf (aref data i) (jref-byte body i)))))
    data))

(defmethod (setf message-body-data) (new-value (self outgoing-message))
  "Copy the elements of the lisp array NEW-VALUE into a new byte array
and install that as the raw body of SELF.  Elements of a simple-string
are stored as their character codes.  Returns NEW-VALUE."
  (let* ((length (length new-value))
         (body (make-new-array :byte length)))
    (setf (message-raw-body self) body)
    (typecase new-value
      (simple-vector
       (dotimes (i length)
         (setf (jref-byte body i) (svref new-value i))))
      (simple-string
       (dotimes (i length)
         (setf (jref-byte body i) (char-code (schar new-value i)))))
      (t
       (dotimes (i length)
         (setf (jref-byte body i) (aref new-value i))))))
  new-value)

(defun message-first-byte (message)
  "Return the first byte of MESSAGE's raw body, unboxed, or NIL if the
raw body is absent or has length zero."
  (when-let (raw-body (message-raw-body message))
    (when (plusp (jlength raw-body))
      (unbox-byte (jref raw-body 0)))))

(defun full-publish (message channel ticket exchange routing-key mandatory immediate)
  "Publish MESSAGE's properties and raw body on CHANNEL through
Channel.basicPublish, passing TICKET, EXCHANGE, ROUTING-KEY, MANDATORY
and IMMEDIATE straight through."
  (with-alive-channel (channel)
    (Channel.basicPublish channel ticket exchange routing-key mandatory immediate
                          (message-properties message)
                          (message-raw-body message))))

(defun publish (message channel exchange routing-key)
  "Publish MESSAGE on CHANNEL to EXCHANGE with ROUTING-KEY, using the
channel's own ticket and with both mandatory and immediate NIL."
  (full-publish message channel (channel-ticket channel) exchange routing-key nil nil))

(defun destroy-message (message)
  "Do nothing with MESSAGE and return NIL."
  (declare (ignore message))
  nil)

(defun full-consume-queue (channel ticket queue consumer-tag no-local no-ack exclusive)
  "Create a new QueueingConsumer. on CHANNEL and register it for QUEUE
through Channel.basicConsume with every option supplied explicitly.
Returns whatever Channel.basicConsume returns."
  (with-alive-channel (channel)
    (let ((consumer (new QueueingConsumer. channel)))
      ;; Note the argument order of the Java call: no-ack precedes the
      ;; consumer tag, unlike this function's own lambda list.
      (Channel.basicConsume channel ticket queue no-ack consumer-tag no-local exclusive consumer))))

;; no-ack defaults to false, i.e. explicit acknowledgements are required
(defun consume-queue (channel queue)
  "Create a new QueueingConsumer. on CHANNEL and register it for QUEUE,
using the channel's own ticket.  Returns whatever Channel.basicConsume
returns."
  (with-alive-channel (channel)
    (let ((consumer (new QueueingConsumer. channel)))
      ;; allow server to generate the consumerTag
      (Channel.basicConsume channel (channel-ticket channel) queue consumer))))

(defun full-cancel-queue (channel consumer-tag)
  "Cancel the consumer identified by CONSUMER-TAG on CHANNEL."
  (with-alive-channel (channel)
    (Channel.basicCancel channel consumer-tag)))

(defun cancel-queue (channel)
  "Cancel the consumer whose tag is (CHANNEL-CONSUMER-TAG CHANNEL)."
  (full-cancel-queue channel (channel-consumer-tag channel)))

(defun acknowledge-delivery (channel message)
  "Acknowledge MESSAGE on CHANNEL by its delivery tag."
  (let ((delivery-tag (message-delivery-tag message)))
    ;; The final NIL is presumably basicAck's `multiple' flag, i.e. only
    ;; this one delivery is acknowledged -- TODO confirm against the
    ;; client library in use.
    (channel.basicAck channel delivery-tag nil)))


;; A.  REFERENCES
;;
;;
;; B.  HISTORY
;;
;; 2007-09-21  NDL  Created.
;;
;;
;; C.  COPYRIGHT
;;
;; Copyright (c) 2007 Wiinz Limited.
;; ;; Permission is hereby granted, free of charge, to any person ;; obtaining a copy of this software and associated documentation ;; files (the "Software"), to deal in the Software without ;; restriction, including without limitation the rights to use, copy, ;; modify, merge, publish, distribute, sublicense, and/or sell copies ;; of the Software, and to permit persons to whom the Software is ;; furnished to do so, subject to the following conditions: ;; ;; The above copyright notice and this permission notice shall be ;; included in all copies or substantial portions of the Software. ;; ;; THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, ;; EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF ;; MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND ;; NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT ;; HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, ;; WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, ;; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER ;; DEALINGS IN THE SOFTWARE.