Message truncated to 16k sending from TCPSocket to EventMachine

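I have a client Ruby process that is trying to send data to a service built on EventMachine. The workflow is simple: the client always initiates the request and always expects a response in return.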

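When the data is sent from the TCPSocket, it is apparently truncated to roughly 16k. Any suggestions as to what I'm doing wrong, or which of my assumptions I need to reconsider? Presumably I'm doing something wrong in EM and need to accumulate all of the data being sent, but I'm not sure how.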

# client.rb
require 'socket'

def send_messages messages
  arr = Array(messages)
  if arr.length > 0
    logger.debug { "Sending #{ arr.length } messages" }

    marshaled = Marshal.dump(arr)
    logger.debug { "Marshaled data #{ marshaled.length } bytes" } # roughly 200k

    socket = TCPSocket.new(host, port)
    written = socket.write(marshaled)
    logger.debug { "Apparently sent #{ written } bytes" } # same size
    socket.close_write

    data = socket.read
    logger.debug { "Received #{ data.length } bytes from service" }
    Marshal.load(data)
  end

rescue Exception => ex
  logger.error "Client threw exception communicating with service :: #{ ex.message }"
  raise ex
ensure
  socket.close if socket
end

The client logs the following:

D, [2017-01-12T10:02:56.908857 #9360] DEBUG -- : Sending 1 messages 
D, [2017-01-12T10:02:56.909907 #9360] DEBUG -- : Marshaled data length 205941 bytes
D, [2017-01-12T10:02:56.910373 #9360] DEBUG -- : Apparently sent 205941 bytes
D, [2017-01-12T10:02:56.955270 #9360] DEBUG -- : Received 0 bytes from service

On the server side...

class EventedServer < EM::Connection

  attr_reader :context

  def initialize context
    raise "Context must be defined" unless context
    @context = context
  end

  def post_init
    logger.debug { "-- someone connected to the server" }
  end

  def connection_completed
    logger.debug { "-- connection completed" }
  end

  def unbind
    logger.debug { "-- unbind" }
  end

  def receive_data data
    logger.debug { "-- received data at the server #{ data.length }" } # approx 16k
    send_data(process(data))
  end

  def process request

    logger.debug { "-- about to deserialize #{ request.length } bytes" }
    model = Marshal.load(request)
    logger.debug { "-- received #{ model.class.name }" }

    context.process_message(model)

    # Send data the application needs to stay up to date.
    response = context.application(app_name).pending_configurations
    logger.debug { "-- about to send #{ response.keys } for #{ app_name }" }
    Marshal.dump(response)
  rescue Exception => e
    logger.error("** Error in processing request of #{ request.length } bytes")
    raise e
  end
end

The server logs the following:

D, [2017-01-12T10:02:56.910251 #9330] DEBUG -- : In evented server initialize...
D, [2017-01-12T10:02:56.910319 #9330] DEBUG -- : -- someone connected to the server
D, [2017-01-12T10:02:56.910419 #9330] DEBUG -- : -- received data at the server 16384
D, [2017-01-12T10:02:56.910463 #9330] DEBUG -- : -- about to deserialize 16384 bytes
E, [2017-01-12T10:02:56.912067 #9330] ERROR -- : ** Error in processing request of 16384 bytes
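
The log above does not show the exception itself, but the failure is consistent with Marshal.load being handed only the first 16384 bytes of a larger dump. The following is a minimal sketch of that situation, with no sockets involved; the payload and variable names are made up for illustration.

```ruby
# Illustration only: a large marshaled payload cut off at 16384 bytes,
# mimicking what the server's first receive_data call appears to see.
payload = Marshal.dump("x" * 200_000)
first_chunk = payload.byteslice(0, 16_384)

error =
  begin
    Marshal.load(first_chunk)
    nil
  rescue ArgumentError => e
    # Marshal.load rejects a dump that ends before the object is complete.
    e
  end

puts "payload: #{payload.bytesize} bytes, chunk: #{first_chunk.bytesize} bytes"
puts "Marshal.load raised: #{error.class}" if error
```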

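The short answer is that receive_data does not wait for all of the data to arrive, as I had assumed. It is called once for each chunk read from the socket, so a buffer is needed. In my case, any message larger than 16k needs to be buffered. My current code looks like this: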

  def post_init
    logger.debug { "-- someone connected to the server" }
    @buffer = []
    logger.debug { "-- initializing message buffer" }
  end

  DELIMITER = "\nDELIM\n"

  def receive_data data

    # NOTE: actual code checks to ensure data was not split in the delimiter
    if data.end_with?(DELIMITER)
      logger.debug { "-- received data at the server #{ data.length }" }
      @buffer << data[0...(-1*DELIMITER.length)]
      joined_data = @buffer.join
      @buffer = []

      logger.debug { "-- total data received #{ joined_data.length }" }
      send_data(process(joined_data))
    else
      logger.debug { "-- received partial data at the server #{ data.length }" }
      @buffer << data
    end
  end
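
The NOTE in the code above mentions that a chunk boundary can fall inside the delimiter. One way to handle that is to search the accumulated bytes rather than each incoming chunk. Below is a minimal sketch of that idea in plain Ruby; DelimitedBuffer and its feed method are hypothetical names, not part of EventMachine, and the sketch assumes the delimiter never occurs inside the marshaled payload (a length prefix would avoid that assumption).

```ruby
# Hypothetical helper (not an EventMachine class): accumulates raw chunks
# and returns every complete, delimiter-terminated message seen so far.
class DelimitedBuffer
  def initialize(delimiter)
    @delimiter = delimiter.b   # binary copy of the delimiter
    @pending = String.new      # binary (ASCII-8BIT) accumulator
  end

  # Append one chunk; return an array of complete messages (possibly empty).
  def feed(chunk)
    @pending << chunk.b
    messages = []
    # Search the whole accumulator, so a delimiter split across two
    # chunks is still found once its second half arrives.
    while (index = @pending.index(@delimiter))
      messages << @pending.slice!(0, index)
      @pending.slice!(0, @delimiter.bytesize)
    end
    messages
  end
end

# Simulate a sender writing one large message that arrives in 16k chunks.
delimiter = "\nDELIM\n"
original = "x" * 40_000
wire = Marshal.dump(original) + delimiter

buffer = DelimitedBuffer.new(delimiter)
received = []
wire.bytes.each_slice(16_384) do |slice|
  received.concat(buffer.feed(slice.pack("C*")))
end

puts "complete messages: #{received.length}"
puts "round trip ok: #{Marshal.load(received.first) == original}"
```

Inside the connection class, receive_data could then be reduced to something like `@buffer.feed(data).each { |message| send_data(process(message)) }`, with `@buffer` created in post_init. The client would also need to append the same delimiter after the marshaled data.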