;; $Id: //info.ravenbrook.com/user/ndl/lisp/cl-rabbit/channel.lisp#2 $

(in-package "RABBITMQ")

;;                             CHANNEL.LISP
;;             Nick Levine, Ravenbrook Limited, 2007-09-20
;;
;; 1.  INTRODUCTION
;;
;; The purpose of this document is to implement a lisp interface to
;; RabbitMQ channels.
;;
;; See Appendix C below for copyright and license.


;; 2.  OPEN & CLOSE

(defun new-channel (connection)
  "Create a channel on CONNECTION and return it as an instance of CHANNEL.
The result of Connection.createChannel is passed through MAKE-TYPED-REF
and then CHANGE-CLASSed to CHANNEL.  Runs inside WITH-ALIVE-CONNECTION
(defined elsewhere; presumably signals if CONNECTION is dead)."
  (with-alive-connection (connection)
    (change-class (make-typed-ref (Connection.createChannel connection))
                  'channel)))

(defmethod print-object ((self ChannelN.) stream)
  "Print SELF unreadably, showing its type, identity and ChannelN.toString."
  (print-unreadable-object (self stream :type t :identity t)
    (format stream "~a" (ChannelN.toString self))))

(defmacro with-alive-channel ((channel &key (if-dead :error)) &body body)
  "Run BODY only if CHANNEL-ALIVE is true of CHANNEL.
IF-DEAD is inspected at macroexpansion time.  With :ERROR (the default)
a dead channel leads to a call to the function CHANNEL-NOT-ALIVE, which
signals.  With any other value no else branch is generated, so the form
simply returns NIL when the channel is dead."
  (rebinding (channel)
    `(if (channel-alive ,channel)
         (progn ,@body)
       ,@(case if-dead
           ((:error) `((progn
                         (channel-not-alive ,channel)
                         ;; prevent tail call, aid debugging
                         nil)))))))

(defun channel-not-alive (channel)
  "Signal an error of type CHANNEL-NOT-ALIVE for CHANNEL."
  (error 'channel-not-alive :channel channel))

(define-condition channel-not-alive (error)
  ((channel :reader channel-not-alive-channel
            :initform nil
            :initarg :channel))
  (:documentation
   "Signalled when an operation is attempted on a channel that is not alive.")
  (:report (lambda (condition stream)
             (format stream "Channel~@[ ~a~] is no longer alive"
                     (channel-not-alive-channel condition)))))

(defmacro with-channel ((channel connection) &body body)
  "Bind CHANNEL to a new channel on CONNECTION around BODY.
The channel is passed to DESTROY-CHANNEL on any exit from BODY.  After
that, CHECK-CONNECTION-ALIVE is called on CONNECTION, and the values of
BODY are returned."
  (rebinding (connection)
    `(multiple-value-prog1
         (let ((,channel (new-channel ,connection)))
           (unwind-protect
               (progn ,@body)
             (destroy-channel ,channel)))
       (check-connection-alive ,connection))))

(defun destroy-channel (channel &key (code 0) (message "closed by application"))
  "Close CHANNEL with CODE and MESSAGE if it is still alive; return CHANNEL.
Nothing is done when the channel is already dead, and a CHANNEL-NOT-ALIVE
error raised while closing is deliberately ignored."
  (with-alive-channel (channel :if-dead nil)
    (handler-case
        (Channel.close channel code message)
      (channel-not-alive () ())))
  channel)


;; 3.  CHANNEL SUBCLASS

(defclass channel (ChannelN.)
  ((the-consumer :accessor the-consumer :initform nil)
   (ticket :reader channel-ticket))
  (:documentation
   "A ChannelN. that caches its single consumer and carries an access ticket."))

(defmethod shared-initialize :after ((self channel) slot-names &key)
  "Set the TICKET slot to the result of channel.accessRequest on /data."
  (declare (ignore slot-names))
  (setf (slot-value self 'ticket)
        (channel.accessRequest self "/data")))

(defmethod the-consumer :around ((self channel))
  "Return the cached consumer of SELF, looking it up on first use.
On a cache miss the channel must have exactly one consumer; otherwise an
error is signalled.  The consumer found is stored in the slot and returned."
  (or (call-next-method)
      (setf (the-consumer self)
            (multiple-value-bind (size consumers)
                (channel-consumer-count self)
              (case size
                ((0) (error "~a has no consumers" self))
                ((1))
                (otherwise (error "~a has more than one consumer (~d)" self size)))
              ;; Exactly one entry: fetch its key, then the consumer itself.
              (let ((tag (jref (Set.toArray (Map.keySet consumers)) 0)))
                (make-typed-ref (Map.get consumers tag)))))))

(defmacro def-flush-the-consumer (fun)
  "Advise FUN so that, after it runs, the cached consumer of its first
argument is reset to NIL."
  `(defadvice (,fun flush-the-consumer :after) (channel &rest args)
     (declare (ignore args))
     (setf (the-consumer channel) nil)))

(def-flush-the-consumer channel.basicConsume)
(def-flush-the-consumer channelN.basicConsume)
(def-flush-the-consumer channel.basicCancel)
(def-flush-the-consumer channelN.basicCancel)


;; 4.  MESSAGES

(defun full-next-message (channel consumer nowait)
  "Take the next delivery from CONSUMER, acknowledge it on CHANNEL, return it.
When NOWAIT is true and CONSUMER is empty, return NIL immediately.
Signals CHANNEL-NOT-ALIVE if CHANNEL is dead."
  (with-alive-channel (channel)
    (when nowait
      (when (consumer-empty-p consumer)
        (return-from full-next-message
          nil)))
    ;; TBD ?? -- shouldn't there be some waiting to do when nowait is
    ;; false?  I no longer remember my intent, and this function
    ;; doesn't have a mirror in libamq so I can't peek at that -- NDL
    ;; 2007-09-28
    (let ((delivery (make-typed-ref (QueueingConsumer.nextDelivery consumer))))
      (acknowledge-delivery channel delivery)
      delivery)))

(defmethod consumer-empty-p ((channel channel))
  "True if the consumer of CHANNEL has no arrived messages."
  (consumer-empty-p (the-consumer channel)))

(defmethod consumer-empty-p ((consumer QueueingConsumer.))
  "True if CONSUMER-ARRIVED-COUNT of CONSUMER is zero."
  (zerop (consumer-arrived-count consumer)))

(defvar *arrived-count* 0
  "Incremented on every call to CONSUMER-ARRIVED-COUNT.")

(defun consumer-arrived-count (consumer)
  "Return the size of CONSUMER's queue, bumping *ARRIVED-COUNT* as a side effect."
  (incf *arrived-count*)
  (let ((queue (QueueingConsumer.getQueue consumer)))
    (Collection.size queue)))

(defun next-message (channel)
  "Return the next delivery for CHANNEL's consumer, or NIL if none has arrived."
  (full-next-message channel (the-consumer channel) t))

(defun channel-arrived-count (channel)
  "Return the arrived-message count of CHANNEL's consumer.
Signals CHANNEL-NOT-ALIVE if CHANNEL is dead."
  (with-alive-channel (channel)
    (consumer-arrived-count (the-consumer channel))))

(defun channel-arrived-count-or-nil (channel)
  "Like CHANNEL-ARRIVED-COUNT, but return NIL rather than a non-positive count."
  (let ((count (channel-arrived-count channel)))
    (when (plusp count)
      count)))

(defun channel-wait (channel timeout)
  "Wait until CHANNEL's consumer is non-empty or TIMEOUT expires.
Returns the arrived count, or NIL if nothing has arrived.  TIMEOUT is
divided by INTERNAL-TIME-UNITS-PER-SECOND before use, so it is presumably
given in internal time units (TODO confirm against callers).  A zero
TIMEOUT is an error; use CHANNEL-WAIT-FOREVER instead."
  (when (zerop timeout)
    (error "~s called with zerop timeout. If you really wanted that, call ~s instead"
           'channel-wait 'channel-wait-forever))
  (with-alive-channel (channel)
    (safe-process-wait "Waiting for content"
                       (/ timeout internal-time-units-per-second)
                       ;; COMPLEMENT is specified to take a function
                       ;; object, not a symbol.
                       (complement #'consumer-empty-p)
                       channel))
  (channel-arrived-count-or-nil channel))

(defun channel-wait-forever (channel)
  "Wait, with no timeout, until CHANNEL's consumer is non-empty.
Returns the arrived count, or NIL if the count is not positive."
  (with-alive-channel (channel)
    (safe-process-wait "Waiting for content"
                       nil
                       ;; COMPLEMENT is specified to take a function
                       ;; object, not a symbol.
                       (complement #'consumer-empty-p)
                       channel))
  (channel-arrived-count-or-nil channel))

(defun safe-process-wait (reason timeout function &rest args)
  "Wait until FUNCTION applied to ARGS returns true, polling in a helper process.
A separate process calls FUNCTION about every 0.01 seconds and records
its latest result; the calling process waits on that recorded result
with wait reason REASON.  TIMEOUT is passed to
MP:PROCESS-WAIT-WITH-TIMEOUT when non-NIL; NIL means wait indefinitely.
Returns the most recently recorded result.

The helper process is told to stop on every exit path, including
non-local exits from the wait, so it cannot be left polling forever."
  (let ((value nil)
        (halt nil))
    (unwind-protect
        (progn
          (mp:cached-process-run-function
           (format nil "Safe process wait for ~a"
                   (mp:process-name mp:*current-process*))
           nil
           (lambda ()
             (loop until halt
                   do (setf value (apply function args))
                      (sleep 0.01))))
          (if timeout
              (mp:process-wait-with-timeout reason timeout (lambda () value))
            (mp:process-wait reason (lambda () value))))
      ;; Cleanup form: runs on normal return, timeout, error, throw or
      ;; process kill alike, so the poller always sees HALT.
      (setf halt t))
    value))


;; 5.
;; PROPERTIES

(defun channel-alive (channel)
  "Return the result of ChannelN.isOpen applied to CHANNEL."
  (ChannelN.isOpen channel))

(defun channel-consumer-count (channel)
  "Return two values: the number of entries in CHANNEL's consumer map,
and the map itself."
  (let ((consumer-map (ChannelN._consumers channel)))
    (values (Map.size consumer-map)
            consumer-map)))

(defun channel-consumer-tag (channel)
  "Return the consumer tag of CHANNEL's consumer (see THE-CONSUMER)."
  (QueueingConsumer.getConsumerTag (the-consumer channel)))


;; 6.  EXCHANGE

(defmacro with-exchange-type ((type) &body body)
  "Rebind the variable named TYPE around BODY to the string form of its
keyword value.  Accepts :FANOUT, :DIRECT or :TOPIC; anything else
signals an error via ECASE."
  `(let ((,type (ecase ,type
                  (:fanout "fanout")    ;; ignores routing-key
                  (:direct "direct")    ;; exact match on routing-key
                  (:topic  "topic")     ;; pattern matching on routing-key
                  )))
     ,@body))

(defun full-declare-exchange (channel ticket exchange type
                              passive durable auto-delete arguments)
  "Call channel.exchangeDeclare on CHANNEL with every argument supplied
explicitly, and return the result wrapped by MAKE-TYPED-REF.
Signals CHANNEL-NOT-ALIVE if CHANNEL is dead."
  (with-alive-channel (channel)
    (make-typed-ref
     (channel.exchangeDeclare channel ticket exchange type
                              passive durable auto-delete arguments))))

(defun declare-exchange (channel exchange type)
  "Declare EXCHANGE on CHANNEL using the channel's own ticket.
TYPE is one of the keywords accepted by WITH-EXCHANGE-TYPE; the
remaining options of FULL-DECLARE-EXCHANGE are all passed as NIL."
  (with-exchange-type (type)
    (full-declare-exchange channel
                           (channel-ticket channel)
                           exchange
                           type
                           nil nil nil nil)))

(defun full-test-exchange (connection exchange type
                           durable auto-delete arguments)
  "On a temporary channel of CONNECTION, call channel.exchangeDeclare for
EXCHANGE with T as the passive argument, inside TRAPPING-NOT-FOUND.
The trapped body ends with T; what is returned when the trap fires
depends on TRAPPING-NOT-FOUND, which is defined elsewhere."
  (with-channel (channel connection)
    (trapping-not-found
      (channel.exchangeDeclare channel
                               (channel-ticket channel)
                               exchange
                               type
                               t
                               durable auto-delete arguments)
      t)))

(defun test-exchange (connection exchange type)
  "Run FULL-TEST-EXCHANGE for EXCHANGE with keyword TYPE, passing NIL
for the durable, auto-delete and arguments options."
  (with-exchange-type (type)
    (full-test-exchange connection exchange type nil nil nil)))

(defun full-delete-exchange (channel ticket exchange if-unused)
  "Call channel.exchangeDelete on CHANNEL with every argument supplied
explicitly, and return the result wrapped by MAKE-TYPED-REF.
Signals CHANNEL-NOT-ALIVE if CHANNEL is dead."
  (with-alive-channel (channel)
    (make-typed-ref
     (channel.exchangeDelete channel ticket exchange if-unused))))

(defun delete-exchange (channel exchange)
  "Delete EXCHANGE on CHANNEL using the channel's own ticket, passing
NIL for IF-UNUSED."
  (full-delete-exchange channel
                        (channel-ticket channel)
                        exchange
                        nil))


;; 7.
;; QUEUE

(defun full-declare-queue (channel ticket queue
                           passive durable exclusive auto-delete arguments)
  "Call channel.queueDeclare on CHANNEL with every argument supplied
explicitly, and return the result wrapped by MAKE-TYPED-REF.
Signals CHANNEL-NOT-ALIVE if CHANNEL is dead."
  (with-alive-channel (channel)
    (make-typed-ref
     (channel.queueDeclare channel ticket queue
                           passive durable exclusive auto-delete arguments))))

(defun declare-queue (channel queue)
  "Declare QUEUE on CHANNEL using the channel's own ticket, passing NIL
for every other option of FULL-DECLARE-QUEUE."
  (full-declare-queue channel
                      (channel-ticket channel)
                      queue
                      nil nil nil nil nil))

(defun full-test-queue (connection queue
                        durable exclusive auto-delete arguments)
  "On a temporary channel of CONNECTION, call channel.queueDeclare for
QUEUE with T as the passive argument, inside TRAPPING-NOT-FOUND.
The trapped body ends with T; what is returned when the trap fires
depends on TRAPPING-NOT-FOUND, which is defined elsewhere."
  (with-channel (channel connection)
    (trapping-not-found
      (channel.queueDeclare channel
                            (channel-ticket channel)
                            queue
                            t
                            durable exclusive auto-delete arguments)
      t)))

(defun test-queue (connection queue)
  "Run FULL-TEST-QUEUE for QUEUE, passing NIL for the durable,
exclusive, auto-delete and arguments options."
  (full-test-queue connection queue nil nil nil nil))

(defun full-delete-queue (channel ticket queue if-unused if-empty)
  "Call channel.queueDelete on CHANNEL with every argument supplied
explicitly, and return the result wrapped by MAKE-TYPED-REF.
Signals CHANNEL-NOT-ALIVE if CHANNEL is dead."
  (with-alive-channel (channel)
    (make-typed-ref
     (channel.queueDelete channel ticket queue if-unused if-empty))))

(defun delete-queue (channel queue)
  "Delete QUEUE on CHANNEL using the channel's own ticket, passing NIL
for IF-UNUSED and IF-EMPTY."
  (full-delete-queue channel
                     (channel-ticket channel)
                     queue
                     nil nil))

(defun full-bind-queue (channel ticket queue exchange routing-key arguments)
  "Call channel.queueBind on CHANNEL with every argument supplied
explicitly, and return the result wrapped by MAKE-TYPED-REF.
Signals CHANNEL-NOT-ALIVE if CHANNEL is dead."
  (with-alive-channel (channel)
    (make-typed-ref
     (channel.queueBind channel ticket queue exchange routing-key arguments))))

(defun bind-queue (channel queue exchange routing-key)
  "Bind QUEUE to EXCHANGE with ROUTING-KEY on CHANNEL, using the
channel's own ticket and NIL for the arguments option."
  (full-bind-queue channel
                   (channel-ticket channel)
                   queue
                   exchange
                   routing-key
                   nil))


;; A.  REFERENCES
;;
;;
;; B.  HISTORY
;;
;; 2007-09-20  NDL  Created.
;;
;;
;; C.  COPYRIGHT
;;
;; Copyright (c) 2007 Wiinz Limited.
;;
;; Permission is hereby granted, free of charge, to any person
;; obtaining a copy of this software and associated documentation
;; files (the "Software"), to deal in the Software without
;; restriction, including without limitation the rights to use, copy,
;; modify, merge, publish, distribute, sublicense, and/or sell copies
;; of the Software, and to permit persons to whom the Software is
;; furnished to do so, subject to the following conditions:
;;
;; The above copyright notice and this permission notice shall be
;; included in all copies or substantial portions of the Software.
;;
;; THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
;; EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
;; MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
;; NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
;; HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
;; WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
;; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
;; DEALINGS IN THE SOFTWARE.