Mongo::Networking

Public Instance Methods

receive_message(operation, message, log_message=nil, socket=nil, command=false, read=:primary, exhaust=false) click to toggle source

Sends a message to the database and waits for the response.

@param [Integer] operation a MongoDB opcode. @param [BSON::ByteBuffer] message a message to send to the database. @param [String] log_message this is currently a no-op and will be removed. @param [Socket] socket a socket to use in lieu of checking out a new one. @param [Boolean] command (false) indicate whether this is a command. If this is a command,

the message will be sent to the primary node.

@param [Boolean] command (false) indicate whether the cursor should be exhausted. Set

this to true only when the OP_QUERY_EXHAUST flag is set.

@return [Array]

An array whose indexes include [0] documents returned, [1] number of document received,
and [3] a cursor_id.
# File lib/mongo/networking.rb, line 112
def receive_message(operation, message, log_message=nil, socket=nil, command=false,
                    read=:primary, exhaust=false)
  request_id = add_message_headers(message, operation)
  packed_message = message.to_s

  result = ''
  sock   = nil
  begin
    if socket
      sock = socket
      should_checkin = false
    else
      if command || read == :primary
        sock = checkout_writer
      elsif read == :secondary
        sock = checkout_reader
      else
        sock = checkout_tagged(read)
      end
      should_checkin = true
    end

    send_message_on_socket(packed_message, sock)
    result = receive(sock, request_id, exhaust)
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  ensure
    if should_checkin
      if command || read == :primary
        checkin_writer(sock)
      elsif read == :secondary
        checkin_reader(sock)
      else
        # TODO: sock = checkout_tagged(read)
      end
    end
  end
  result
end
send_message(operation, message, opts={}) click to toggle source

Send a message to MongoDB, adding the necessary headers.

@param [Integer] operation a MongoDB opcode. @param [BSON::ByteBuffer] message a message to send to the database.

@option opts [Symbol] :connection (:writer) The connection to which

this message should be sent. Valid options are :writer and :reader.

@return [Integer] number of bytes sent

# File lib/mongo/networking.rb, line 19
def send_message(operation, message, opts={})
  if opts.is_a?(String)
    warn "Connection#send_message no longer takes a string log message. " +
      "Logging is now handled within the Collection and Cursor classes."
    opts = {}
  end

  connection = opts.fetch(:connection, :writer)

  add_message_headers(message, operation)
  packed_message = message.to_s

  sock = nil
  begin
    if connection == :writer
      sock = checkout_writer
    else
      sock = checkout_reader
    end

    send_message_on_socket(packed_message, sock)
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  ensure
    if sock
      if connection == :writer
        checkin_writer(sock)
      else
        checkin_reader(sock)
      end
    end
  end
end
send_message_with_safe_check(operation, message, db_name, log_message=nil, last_error_params=false) click to toggle source

Sends a message to the database, waits for a response, and raises an exception if the operation has failed.

@param [Integer] operation a MongoDB opcode. @param [BSON::ByteBuffer] message a message to send to the database. @param [String] db_name the name of the database. used on call to get_last_error. @param [Hash] last_error_params parameters to be sent to getLastError. See DB#error for

available options.

@see DB#get_last_error for valid last error params.

@return [Hash] The document returned by the call to getlasterror.

# File lib/mongo/networking.rb, line 66
def send_message_with_safe_check(operation, message, db_name, log_message=nil, last_error_params=false)
  docs = num_received = cursor_id = ''
  add_message_headers(message, operation)

  last_error_message = BSON::ByteBuffer.new
  build_last_error_message(last_error_message, db_name, last_error_params)
  last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)

  packed_message = message.append!(last_error_message).to_s
  sock = nil
  begin
    sock = checkout_writer
    send_message_on_socket(packed_message, sock)
    docs, num_received, cursor_id = receive(sock, last_error_id)
    checkin_writer(sock)
  rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
    checkin_writer(sock)
    raise ex
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  end

  if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
    close if error == "not master"
    error = "wtimeout" if error == "timeout"
    raise OperationFailure.new(docs[0]['code'].to_s + ': ' + error, docs[0]['code'], docs[0])
  end

  docs[0]
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.