Object
Typical Stomp client class. It uses a listener thread to receive frames from the server; any thread can send.
All receives happen in that one listener thread, so keep per-message processing there light if you expect high message volume.
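One way to keep the listener thread light is to have the subscription callback only enqueue work for another thread. The sketch below uses just the Ruby standard library and does not touch the Stomp classes; `process_off_listener`, `on_message`, and the `:stop` sentinel are hypothetical names chosen for illustration, and plain strings stand in for message objects.

```ruby
require 'thread'

# Simulates a listener callback that only enqueues each message body, while a
# separate worker thread performs the (potentially slow) processing.
def process_off_listener(bodies)
  work_queue = Queue.new
  processed = []

  worker = Thread.new do
    # Runs off the listener thread; :stop is a hypothetical shutdown sentinel.
    while (body = work_queue.pop) != :stop
      processed << body.upcase # stand-in for expensive work
    end
  end

  # What the block passed to subscribe would do: enqueue and return at once.
  on_message = lambda { |body| work_queue << body }

  bodies.each { |body| on_message.call(body) }
  work_queue << :stop
  worker.join
  processed
end
```

With a real client, the body of `on_message` would sit inside the block given to `subscribe`, so the listener thread goes straight back to reading frames.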
A new Client object can be initialized using two forms:
Standard positional parameters:
login (String, default : '')
passcode (String, default : '')
host (String, default : 'localhost')
port (Integer, default : 61613)
reliable (Boolean, default : false)

e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL:
A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port
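To make the four URL forms concrete, here is a minimal sketch of splitting such a URL into its parts. The `STOMP_URL` pattern and `parse_stomp_url` helper are hypothetical and written only for this example; the library uses its own `url_regex`, which is not reproduced here.

```ruby
# Hypothetical pattern for illustration only; not the library's url_regex.
STOMP_URL = %r{\Astomp://(?:([^:@/]+):([^@/]*)@)?([\w.-]+):(\d+)\z}

# Returns a hash of connection parts, or nil when the string is not a
# stomp:// URL in one of the documented forms.
def parse_stomp_url(url)
  m = STOMP_URL.match(url)
  return nil unless m
  { login: m[1] || '', passcode: m[2] || '', host: m[3], port: m[4].to_i }
end
```

As in the constructor, a missing login or passcode falls back to the empty string.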
# File lib/stomp/client.rb, line 35
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)

  # Parse stomp:// URL's or set params
  if login.is_a?(Hash)
    @parameters = login

    first_host = @parameters[:hosts][0]

    @login = first_host[:login]
    @passcode = first_host[:passcode]
    @host = first_host[:host]
    @port = first_host[:port] || Connection::default_port(first_host[:ssl])

    @reliable = true

  elsif login =~ /^stomp:\/\/#{url_regex}/
    # e.g. stomp://login:passcode@host:port or stomp://host:port
    @login = $2 || ""
    @passcode = $3 || ""
    @host = $4
    @port = $5.to_i
    @reliable = false

  elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/
    # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param
    first_host = {}
    first_host[:ssl] = !$2.nil?
    @login = first_host[:login] = $4 || ""
    @passcode = first_host[:passcode] = $5 || ""
    @host = first_host[:host] = $6
    @port = first_host[:port] = $7.to_i || Connection::default_port(first_host[:ssl])

    options = $16 || ""
    parts = options.split(/&|=/)
    options = Hash[*parts]

    hosts = [first_host] + parse_hosts(login)

    @parameters = {}
    @parameters[:hosts] = hosts

    @parameters.merge! filter_options(options)

    @reliable = true

  else
    @login = login
    @passcode = passcode
    @host = host
    @port = port.to_i
    @reliable = reliable
  end

  check_arguments!

  @id_mutex = Mutex.new
  @ids = 1

  if @parameters
    @connection = Connection.new(@parameters)
  else
    @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
  end

  start_listeners
end
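The failover branch turns the URL's query string into a hash with `options.split(/&|=/)` followed by `Hash[*parts]`. The standalone sketch below isolates those two calls so their behaviour can be tried directly; `parse_failover_options` is a hypothetical wrapper, and the option names used with it are illustrative only.

```ruby
# Hypothetical wrapper around the two calls used in the failover branch.
def parse_failover_options(query)
  parts = query.split(/&|=/) # "a=1&b=2" becomes ["a", "1", "b", "2"]
  Hash[*parts]               # pairs consecutive elements as key => value
end
```

Because `Hash[*parts]` pairs elements positionally, a query that yields an odd number of parts (for example a key with no `=value`) raises an ArgumentError rather than producing a partial hash.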
Syntactic sugar for 'Client.new'. See 'initialize' for usage.
# File lib/stomp/client.rb, line 101
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end
Abort a transaction by name
# File lib/stomp/client.rb, line 117
def abort(name, headers = {})
  @connection.abort(name, headers)

  # lets replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      if listener = @listeners[message.headers['destination']]
        listener.call(message)
      end
    end
  end
end
Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' ).
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/client.rb, line 158
def acknowledge(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.ack message.headers['message-id'], headers
end
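The interplay between 'acknowledge' and 'abort' is easier to see without the network calls: messages acknowledged inside a transaction are remembered per transaction id, and aborting that transaction hands each one back to the listener for its destination. The toy model below reproduces only that bookkeeping. `ReplayBook` is a hypothetical class, and messages are plain hashes rather than the library's message objects.

```ruby
# Toy model of the replay bookkeeping only; no network I/O, and not the
# library's Client class.
class ReplayBook
  def initialize
    @replay_messages_by_txn = {}
    @listeners = {}
  end

  # Register the callback for a destination (one listener per destination).
  def subscribe(destination, &block)
    @listeners[destination] = block
  end

  # Remember a message acknowledged inside the given transaction.
  def acknowledge(message, txn_id)
    (@replay_messages_by_txn[txn_id] ||= []) << message
  end

  # Re-deliver every remembered message to its destination's listener.
  def abort(txn_id)
    (@replay_messages_by_txn[txn_id] || []).each do |message|
      listener = @listeners[message[:destination]]
      listener.call(message) if listener
    end
  end
end
```

Aborting a transaction in which nothing was acknowledged is a no-op in this model, which matches the `if replay_list` guard in the listing for 'abort'.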
Begin a transaction by name
# File lib/stomp/client.rb, line 112
def begin(name, headers = {})
  @connection.begin(name, headers)
end
Close out resources in use by this client
# File lib/stomp/client.rb, line 222
def close headers={}
  @listener_thread.exit
  @connection.disconnect headers
end
Is this client closed?
# File lib/stomp/client.rb, line 217
def closed?
  @connection.closed?
end
Commit a transaction by name
# File lib/stomp/client.rb, line 132
def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end
# File lib/stomp/client.rb, line 203
def connection_frame
  @connection.connection_frame
end

# File lib/stomp/client.rb, line 207
def disconnect_receipt
  @connection.disconnect_receipt
end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp/client.rb, line 107
def join(limit = nil)
  @listener_thread.join(limit)
end
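Since 'join' passes its limit straight to Ruby's Thread#join, its return value follows the standard rule: nil if the limit expires while the thread is still alive, otherwise the thread itself. The sketch below shows that rule on ordinary threads; `wait_for` is a hypothetical helper, not part of the client.

```ruby
# Hypothetical helper illustrating Thread#join(limit) return values.
def wait_for(thread, limit)
  # join returns nil on timeout and the thread once it has finished.
  thread.join(limit).nil? ? :still_running : :finished
end
```

A caller could use the nil result to poll for a shutdown flag between bounded waits instead of blocking indefinitely.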
# File lib/stomp/client.rb, line 194
def obj_send(*args)
  __send__(*args)
end
Is this client open?
# File lib/stomp/client.rb, line 212
def open?
  @connection.open?
end
Publishes message to destination.
If a block is given, a receipt will be requested and passed to the block when it arrives.
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/client.rb, line 187
def publish(destination, message, headers = {})
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.publish(destination, message, headers)
end
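The listing calls `register_receipt_listener`, whose body is not shown on this page. As an assumption about its general shape, and not the library's actual code, the sketch below allocates receipt ids under a mutex (mirroring the `@id_mutex` and `@ids = 1` set up in the constructor) and invokes the stored listener when a matching receipt arrives. `ReceiptRegistry`, `register`, and `deliver` are hypothetical names.

```ruby
# Hypothetical registry; the real register_receipt_listener is not shown in
# this listing, so this is only one plausible shape.
class ReceiptRegistry
  def initialize
    @id_mutex = Mutex.new
    @ids = 1
    @receipt_listeners = {}
  end

  # Store the listener under a fresh id and return that id, which a caller
  # would send as the 'receipt' header.
  def register(listener)
    @id_mutex.synchronize do
      id = @ids.to_s
      @ids += 1
      @receipt_listeners[id] = listener
      id
    end
  end

  # Invoke, at most once, the listener waiting on the given receipt id.
  def deliver(receipt_id, frame)
    listener = @id_mutex.synchronize { @receipt_listeners.delete(receipt_id) }
    listener.call(frame) if listener
  end
end
```

The listener is called outside the mutex so that a slow callback does not block other threads from registering receipts.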
Check if the thread was created and isn't dead
# File lib/stomp/client.rb, line 228
def running
  @listener_thread && !!@listener_thread.status
end
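The `!!@listener_thread.status` idiom relies on Thread#status returning a string such as "run" or "sleep" for a live thread, false for one that finished or was killed, and nil for one that died from an exception. The sketch below applies the same expression to ordinary threads; `thread_running?` is a hypothetical helper name.

```ruby
# Same expression as in 'running', applied to an arbitrary thread.
def thread_running?(thread)
  # status is a String while alive, false after normal exit, nil after an
  # unhandled exception; !! maps those to true, false, false.
  thread && !!thread.status
end
```

Note that when no thread was ever created the expression yields nil rather than false, so callers should treat the result as truthy or falsy instead of comparing it to false.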
# File lib/stomp/client.rb, line 198
def send(*args)
  warn("This method is deprecated and will be removed on the next release. Use 'publish' instead")
  publish(*args)
end
Subscribe to a destination. A block must be passed; it is used as the callback listener for messages from that destination.
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/client.rb, line 142
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?
  @listeners[destination] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end