Message truncated to 16k sending from TCPSocket to EventMachine
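I have a client Ruby process that is trying to send data to a service built on EventMachine. The workflow is simple: the client always initiates the request and always expects a response in return.
When sending data from TCPSocket, the data is apparently truncated at about 16k. Any suggestions on what I am doing wrong, or which of my assumptions need to be reconsidered? Presumably I am doing something wrong in EM and need to accumulate all of the data being sent, but I am not sure how.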
# client.rb
require "socket"

def send_messages(messages)
  arr = Array(messages)
  if arr.length > 0
    logger.debug { "Sending #{ arr.length } messages" }
    marshaled = Marshal.dump(arr)
    logger.debug { "Marshaled data #{ marshaled.length } bytes" } # roughly 200k
    socket = TCPSocket.new(host, port)
    written = socket.write(marshaled)
    logger.debug { "Apparently sent #{ written } bytes" } # same size
    # Half-close: signal end of request while keeping the read side open.
    socket.close_write
    data = socket.read
    logger.debug { "Received #{ data.length } bytes from service" }
    Marshal.load(data)
  end
rescue Exception => ex
  logger.error "Client threw exception communicating with service :: #{ ex.message }"
  raise ex
ensure
  socket.close if socket
end
The client logs the following:
D, [2017-01-12T10:02:56.908857 #9360] DEBUG -- : Sending 1 messages
D, [2017-01-12T10:02:56.909907 #9360] DEBUG -- : Marshaled data length 205941 bytes
D, [2017-01-12T10:02:56.910373 #9360] DEBUG -- : Apparently sent 205941 bytes
D, [2017-01-12T10:02:56.955270 #9360] DEBUG -- : Received 0 bytes from service
On the server side...
class EventedServer < EM::Connection
  attr_reader :context

  def initialize context
    raise "Context must be defined" unless context
    @context = context
  end

  def post_init
    logger.debug { "-- someone connected to the server" }
  end

  def connection_completed
    logger.debug { "-- connection completed" }
  end

  def unbind
    logger.debug { "-- unbind" }
  end

  def receive_data data
    logger.debug { "-- received data at the server #{ data.length }" } # approx 16k
    send_data(process(data))
  end

  def process request
    logger.debug { "-- about to deserialize #{ request.length } bytes" }
    model = Marshal.load(request)
    logger.debug { "-- received #{ model.class.name }" }
    context.process_message(model)
    # Send data the application needs to stay up to date.
    response = context.application(app_name).pending_configurations
    logger.debug { "-- about to send #{ response.keys } for #{ app_name }" }
    Marshal.dump(response)
  rescue Exception => e
    logger.error("** Error in processing request of #{ request.length } bytes")
    raise e
  end
end
The server logs the following:
D, [2017-01-12T10:02:56.910251 #9330] DEBUG -- : In evented server initialize...
D, [2017-01-12T10:02:56.910319 #9330] DEBUG -- : -- someone connected to the server
D, [2017-01-12T10:02:56.910419 #9330] DEBUG -- : -- received data at the server 16384
D, [2017-01-12T10:02:56.910463 #9330] DEBUG -- : -- about to deserialize 16384 bytes
E, [2017-01-12T10:02:56.912067 #9330] ERROR -- : ** Error in processing request of 16384 bytes
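The short answer is that receive_data does not wait for all of the data to arrive, as I had assumed. It is called once per chunk, so a buffer is needed. In my case, any message larger than 16k has to be buffered. My current code looks like this: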
def post_init
  logger.debug { "-- someone connected to the server" }
  @buffer = []
  logger.debug { "-- initializing message buffer" }
end

# The server only processes a message once it has seen this marker,
# so the client has to write it after each marshaled message.
DELIMITER = "\nDELIM\n"

def receive_data data
  # NOTE: actual code checks to ensure data was not split in the delimiter
  if data.end_with?(DELIMITER)
    logger.debug { "-- received data at the server #{ data.length }" }
    # Drop the trailing delimiter before storing the final chunk.
    @buffer << data[0...-DELIMITER.length]
    joined_data = @buffer.join
    @buffer = []
    logger.debug { "-- total data received #{ joined_data.length }" }
    send_data(process(joined_data))
  else
    logger.debug { "-- received partial data at the server #{ data.length }" }
    @buffer << data
  end
end
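The NOTE in the code above mentions a case the snippet does not show: the delimiter itself can be split across two chunks. One way to cover it is to search the accumulated bytes instead of only the latest chunk. The following is a minimal sketch in plain Ruby, not the answer author's actual code. `DelimitedBuffer` and `feed` are hypothetical names introduced for illustration; only the delimiter value comes from the answer.

```ruby
# Hypothetical helper (not part of EventMachine): accumulates raw chunks and
# returns every complete message found once its delimiter has arrived.
class DelimitedBuffer
  # Same marker as in the answer, forced to binary so it compares bytewise.
  DELIMITER = "\nDELIM\n".b

  def initialize
    @pending = String.new(encoding: Encoding::BINARY)
  end

  # Append one chunk; return an array of complete messages (possibly empty).
  def feed(chunk)
    @pending << chunk.b
    messages = []
    # Search the whole accumulated string, so a delimiter that straddles
    # two chunks is still found once its second half arrives.
    while (index = @pending.index(DELIMITER))
      messages << @pending.slice!(0, index)   # bytes before the delimiter
      @pending.slice!(0, DELIMITER.bytesize)  # discard the delimiter itself
    end
    messages
  end
end

buffer = DelimitedBuffer.new
buffer.feed("first half of a message\nDEL")  # delimiter not complete yet
buffer.feed("IM\nnext")                      # completes the first message
```

Inside the connection class, `receive_data` could create one such buffer in `post_init`, call `feed(data)`, and pass each returned message to `process`. Because Marshal output is binary, the delimiter bytes could in principle occur inside a payload; writing the message length in front of each message is a common alternative that avoids this.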