Stomp::Client

Typical Stomp client class. A dedicated listener thread receives frames from the server; any thread can send.

All receives happen in that one listener thread, so keep the processing done inside it light if message volume is high.
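
One way to keep the listener thread light is to let the subscribe callback do nothing but enqueue, and process on worker threads. The following is a minimal sketch under that assumption; `on_message` stands in for the block passed to subscribe, the message bodies are plain strings, and none of these names come from the library.

```ruby
require 'thread'

# Stand-in for the block passed to client.subscribe (hypothetical name):
# it only enqueues, so the listener thread can return to reading frames.
work_queue = Queue.new
on_message = lambda { |msg| work_queue << msg }

# Worker threads do the slow processing outside the listener thread.
processed = Queue.new
workers = 2.times.map do
  Thread.new do
    while (msg = work_queue.pop) != :stop
      processed << msg.upcase # placeholder for real processing
    end
  end
end

# Simulate the listener thread delivering three message bodies.
%w[a b c].each { |body| on_message.call(body) }

# Send one stop marker per worker, then wait for them to finish.
workers.size.times { work_queue << :stop }
workers.each(&:join)
```

The number of workers and the stop marker are illustrative choices; ordering across workers is not preserved.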

Attributes

host[R]
login[R]
parameters[R]
passcode[R]
port[R]
reliable[R]

Public Class Methods

new(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)

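A new Client object can be initialized in several forms. The two most common are described here; the source below also accepts a parameters Hash and a failover: URL.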

Standard positional parameters:

login     (String,  default : '')
passcode  (String,  default : '')
host      (String,  default : 'localhost')
port      (Integer, default : 61613)
reliable  (Boolean, default : false)

e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL:

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port
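
The URL forms above can be illustrated with a small standalone parser. This is a sketch only: `URL_REGEX` is an assumed pattern (the library's own url_regex is not shown on this page), and `parse_stomp_url` is a hypothetical helper, not part of Stomp::Client. The capture group numbers are chosen to line up with the $2..$5 references in the source.

```ruby
# Assumed pattern; the real url_regex is defined elsewhere in the library.
# Groups: 1 = "login:passcode@", 2 = login, 3 = passcode, 4 = host, 5 = port
URL_REGEX = '(([\w\.\-]*):(\w*)@)?([\w\.\-]+):(\d+)'

# Hypothetical helper: returns a Hash of connection settings, or nil
# when the string is not a stomp:// URL.
def parse_stomp_url(url)
  m = /^stomp:\/\/#{URL_REGEX}/.match(url)
  return nil unless m
  {
    :login    => m[2] || '',  # missing credentials default to ''
    :passcode => m[3] || '',
    :host     => m[4],
    :port     => m[5].to_i
  }
end

with_auth    = parse_stomp_url('stomp://login:passcode@host.domain.tld:61613')
without_auth = parse_stomp_url('stomp://host:61613')
```
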
# File lib/stomp/client.rb, line 35
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)

  # Parse stomp:// URL's or set params
  if login.is_a?(Hash)
    @parameters = login
    
    first_host = @parameters[:hosts][0]
    
    @login = first_host[:login]
    @passcode = first_host[:passcode]
    @host = first_host[:host]
    @port = first_host[:port] || Connection::default_port(first_host[:ssl])
    
    @reliable = true
    
  elsif login =~ /^stomp:\/\/#{url_regex}/ # e.g. stomp://login:passcode@host:port or stomp://host:port
    @login = $2 || ""
    @passcode = $3 || ""
    @host = $4
    @port = $5.to_i
    @reliable = false
  elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/ # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param

    first_host = {}
    first_host[:ssl] = !$2.nil?
    @login = first_host[:login] = $4 || ""
    @passcode = first_host[:passcode] = $5 || ""
    @host = first_host[:host] = $6
    @port = first_host[:port] = $7.to_i || Connection::default_port(first_host[:ssl])
    
    options = $16 || ""
    parts = options.split(/&|=/)
    options = Hash[*parts]
    
    hosts = [first_host] + parse_hosts(login)
    
    @parameters = {}
    @parameters[:hosts] = hosts
    
    @parameters.merge! filter_options(options)
            
    @reliable = true
  else
    @login = login
    @passcode = passcode
    @host = host
    @port = port.to_i
    @reliable = reliable
  end

  check_arguments!

  @id_mutex = Mutex.new
  @ids = 1

  if @parameters
    @connection = Connection.new(@parameters)
  else
    @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
  end
  
  start_listeners

end
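
The failover branch above turns the query string into a Hash by splitting on both `&` and `=` and pairing up the pieces. A minimal standalone sketch of just that step follows; `parse_failover_options` is a hypothetical name, and the option names in the usage lines are illustrative. The values stay Strings here; in the source they are passed on to filter_options, which is not shown.

```ruby
# Hypothetical helper mirroring the split-and-pair step in the source.
def parse_failover_options(query)
  parts = (query || '').split(/&|=/) # "a=1&b=2" -> ["a", "1", "b", "2"]
  Hash[*parts]                       # pairs them up: {"a"=>"1", "b"=>"2"}
end

opts  = parse_failover_options('randomize=false&max_reconnect_attempts=3')
empty = parse_failover_options(nil)
```

Because the pieces are paired positionally, an option without a value (for example `a=1&b`) leaves an odd number of pieces and `Hash[]` raises ArgumentError.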
open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)

Syntactic sugar for 'Client.new'. See 'initialize' for usage.

# File lib/stomp/client.rb, line 101
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end

Public Instance Methods

abort(name, headers = {})

Abort a transaction by name. Any messages acknowledged within the transaction are replayed to their listeners.

# File lib/stomp/client.rb, line 117
def abort(name, headers = {})
  @connection.abort(name, headers)

  # lets replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      if listener = @listeners[message.headers['destination']]
        listener.call(message)
      end
    end
  end
end
acknowledge(message, headers = {})

Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' ).

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File lib/stomp/client.rb, line 158
def acknowledge(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.ack message.headers['message-id'], headers
end
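
The interplay between acknowledge and abort can be sketched without a broker. The class below is a stand-in, not the library's code: `ReplayBook` is a hypothetical name, messages are plain Hashes rather than Stomp messages, and no frames are sent. It only shows the bookkeeping: messages acknowledged under a transaction id are remembered, and aborting that transaction hands them back to the listener for their destination.

```ruby
# Hypothetical stand-in for the replay bookkeeping in Stomp::Client.
class ReplayBook
  def initialize
    @listeners = {}
    @replay_messages_by_txn = {}
  end

  def subscribe(destination, &block)
    @listeners[destination] = block
  end

  # Remember the message only when a :transaction header is present.
  def acknowledge(message, headers = {})
    txn_id = headers[:transaction]
    (@replay_messages_by_txn[txn_id] ||= []) << message if txn_id
  end

  # Replay every message acknowledged under this transaction name.
  def abort(name)
    (@replay_messages_by_txn[name] || []).each do |message|
      listener = @listeners[message[:destination]]
      listener.call(message) if listener
    end
  end
end

seen = []
book = ReplayBook.new
book.subscribe('/queue/a') { |m| seen << m[:body] }
book.acknowledge({ :destination => '/queue/a', :body => 'one' }, :transaction => 'tx1')
book.acknowledge({ :destination => '/queue/a', :body => 'two' }) # no transaction
book.abort('tx1')
```

Only the message acknowledged under 'tx1' reaches the listener again; the one acknowledged outside a transaction is not tracked.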
begin(name, headers = {})

Begin a transaction by name

# File lib/stomp/client.rb, line 112
def begin(name, headers = {})
  @connection.begin(name, headers)
end
close(headers={})

Close out resources in use by this client

# File lib/stomp/client.rb, line 222
def close headers={}
  @listener_thread.exit
  @connection.disconnect headers
end
closed?()

Is this client closed?

# File lib/stomp/client.rb, line 217
def closed?
  @connection.closed?
end
commit(name, headers = {})

Commit a transaction by name

# File lib/stomp/client.rb, line 132
def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end
connection_frame()
# File lib/stomp/client.rb, line 203
def connection_frame
  @connection.connection_frame
end
disconnect_receipt()
# File lib/stomp/client.rb, line 207
def disconnect_receipt
  @connection.disconnect_receipt
end
join(limit = nil)

Join the listener thread for this client, generally used to wait for a quit signal

# File lib/stomp/client.rb, line 107
def join(limit = nil)
  @listener_thread.join(limit)
end
obj_send(*args)
# File lib/stomp/client.rb, line 194
def obj_send(*args)
  __send__(*args)
end
open?()

Is this client open?

# File lib/stomp/client.rb, line 212
def open?
  @connection.open?
end
publish(destination, message, headers = {})

Publishes message to destination

If a block is given, a receipt will be requested and passed to the block when it arrives.

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File lib/stomp/client.rb, line 187
def publish(destination, message, headers = {})
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.publish(destination, message, headers)
end
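
register_receipt_listener is not shown on this page, so the following is only a guess at the general pattern, not the library's implementation: hand out a unique receipt id under a mutex (the source does initialize `@id_mutex` and `@ids`), remember the callback under that id, and call it once when a matching receipt arrives. `ReceiptBook`, `register` and `dispatch` are hypothetical names.

```ruby
# Hypothetical sketch of a receipt-listener registry.
class ReceiptBook
  def initialize
    @id_mutex = Mutex.new
    @ids = 1
    @receipt_listeners = {}
  end

  # Store the callback and return the id to place in the 'receipt' header.
  def register(listener)
    id = @id_mutex.synchronize do
      current = @ids
      @ids += 1
      current.to_s
    end
    @receipt_listeners[id] = listener
    id
  end

  # What a listener thread might do when a receipt with this id arrives:
  # call the callback once and forget it.
  def dispatch(receipt_id)
    listener = @receipt_listeners.delete(receipt_id)
    listener.call(receipt_id) if listener
  end
end

confirmed = []
receipts = ReceiptBook.new
first_id  = receipts.register(lambda { |r| confirmed << r })
second_id = receipts.register(lambda { |r| confirmed << r })
receipts.dispatch(second_id)
receipts.dispatch(second_id) # already consumed, so nothing happens
```
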
running()

Check if the thread was created and isn't dead

# File lib/stomp/client.rb, line 228
def running
  @listener_thread && !!@listener_thread.status
end
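
The expression above relies on Thread#status, which returns a String such as "run" or "sleep" for a live thread, false for a thread that terminated normally or was killed, and nil for one that died with an exception. A small standalone sketch of the same check (`running?` is a hypothetical free-standing helper, not the client method):

```ruby
# Same expression as the source, applied to any thread (or nil).
def running?(thread)
  thread && !!thread.status
end

sleeper = Thread.new { sleep }
alive = running?(sleeper)         # live thread -> true

sleeper.kill
sleeper.join
killed = running?(sleeper)        # status is false after a kill -> false

never_started = running?(nil)     # no thread at all -> nil, not false
```

Note that the result is nil rather than false when the thread was never created, so callers should treat it as a truthy/falsy value.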
send(*args)
# File lib/stomp/client.rb, line 198
def send(*args)
  warn("This method is deprecated and will be removed on the next release. Use 'publish' instead")
  publish(*args)
end
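
Because the class defines its own send, Ruby's usual dynamic dispatch through Object#send is no longer available on a client; obj_send restores it by delegating to __send__. The sketch below shows the effect with a stand-in class (`Courier` is a hypothetical name, and its publish only records its arguments).

```ruby
# Hypothetical stand-in showing why obj_send exists.
class Courier
  attr_reader :sent

  def initialize
    @sent = []
  end

  def publish(destination, message)
    @sent << [destination, message]
  end

  # Overrides Object#send, as the deprecated Client#send does.
  def send(*args)
    publish(*args)
  end

  # Dynamic dispatch by method name still works through __send__.
  def obj_send(*args)
    __send__(*args)
  end
end

courier = Courier.new
courier.send('/queue/a', 'hello')             # treated as a publish
courier.obj_send(:publish, '/queue/b', 'hi')  # dispatches by method name
```
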
subscribe(destination, headers = {})

Subscribe to a destination. A block must be given; it is used as the callback listener.

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File lib/stomp/client.rb, line 142
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?
  @listeners[destination] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end
unreceive(message, options = {})

Unreceive a message, sending it back to its queue or to the DLQ

# File lib/stomp/client.rb, line 177
def unreceive(message, options = {})
  @connection.unreceive(message, options)
end
unsubscribe(name, headers = {})

Unsubscribe from a channel

# File lib/stomp/client.rb, line 149
def unsubscribe(name, headers = {})
  @connection.unsubscribe(name, headers)
  @listeners[name] = nil
end
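
Listeners are stored in a Hash keyed by destination, which has two visible consequences: subscribing twice to the same destination replaces the earlier callback, and unsubscribing leaves the key in place with a nil value. A minimal sketch of that bookkeeping, with no connection involved (`ListenerRegistry` and `deliver` are hypothetical names):

```ruby
# Hypothetical stand-in for the @listeners bookkeeping.
class ListenerRegistry
  def initialize
    @listeners = {}
  end

  def subscribe(destination)
    raise "No listener given" unless block_given?
    @listeners[destination] = lambda { |msg| yield msg }
  end

  def unsubscribe(name)
    @listeners[name] = nil
  end

  # Stand-in for the listener thread handing a message to its callback.
  def deliver(destination, msg)
    listener = @listeners[destination]
    listener ? listener.call(msg) : nil
  end
end

registry = ListenerRegistry.new
registry.subscribe('/queue/a') { |m| "first: #{m}" }
registry.subscribe('/queue/a') { |m| "second: #{m}" } # replaces the first
while_subscribed = registry.deliver('/queue/a', 'hello')

registry.unsubscribe('/queue/a')
after_unsubscribe = registry.deliver('/queue/a', 'hello')
```
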
