diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc index 05cf714107b..4bd3ea8779c 100644 --- a/src/inspector_agent.cc +++ b/src/inspector_agent.cc @@ -241,7 +241,7 @@ void InterruptCallback(v8::Isolate*, void* agent) { } void DataCallback(uv_stream_t* stream, ssize_t read, const uv_buf_t* buf) { - inspector_socket_t* socket = static_cast(stream->data); + inspector_socket_t* socket = inspector_from_stream(stream); static_cast(socket->data)->OnRemoteDataIO(socket, read, buf); } diff --git a/src/inspector_socket.cc b/src/inspector_socket.cc index c360ef0db95..0399f984d7d 100644 --- a/src/inspector_socket.cc +++ b/src/inspector_socket.cc @@ -48,16 +48,17 @@ static void dump_hex(const char* buf, size_t len) { } #endif +static void remove_from_beginning(std::vector* buffer, size_t count) { + buffer->erase(buffer->begin(), buffer->begin() + count); +} + static void dispose_inspector(uv_handle_t* handle) { - inspector_socket_t* inspector = - reinterpret_cast(handle->data); + inspector_socket_t* inspector = inspector_from_stream(handle); inspector_cb close = inspector->ws_mode ? inspector->ws_state->close_cb : nullptr; inspector->buffer.clear(); delete inspector->ws_state; inspector->ws_state = nullptr; - inspector->data_len = 0; - inspector->last_read_end = 0; if (close) { close(inspector, 0); } @@ -159,21 +160,19 @@ static std::vector encode_frame_hybi17(const char* message, return frame; } -static ws_decode_result decode_frame_hybi17(const char* buffer_begin, - size_t data_length, +static ws_decode_result decode_frame_hybi17(const std::vector& buffer, bool client_frame, int* bytes_consumed, std::vector* output, bool* compressed) { *bytes_consumed = 0; - if (data_length < 2) + if (buffer.size() < 2) return FRAME_INCOMPLETE; - const char* p = buffer_begin; - const char* buffer_end = p + data_length; + auto it = buffer.begin(); - unsigned char first_byte = *p++; - unsigned char second_byte = *p++; + unsigned char first_byte = *it++; + unsigned char second_byte = *it++; bool final = (first_byte & kFinalBit) != 0; bool reserved1 = (first_byte & kReserved1Bit) != 0; @@ -215,12 +214,12 @@ static ws_decode_result decode_frame_hybi17(const char* buffer_begin, } else { return FRAME_ERROR; } - if (buffer_end - p < extended_payload_length_size) + if ((buffer.end() - it) < extended_payload_length_size) return FRAME_INCOMPLETE; payload_length64 = 0; for (int i = 0; i < extended_payload_length_size; ++i) { payload_length64 <<= 8; - payload_length64 |= static_cast(*p++); + payload_length64 |= static_cast(*it++); } } @@ -233,16 +232,16 @@ static ws_decode_result decode_frame_hybi17(const char* buffer_begin, } size_t payload_length = static_cast(payload_length64); - if (data_length - kMaskingKeyWidthInBytes < payload_length) + if (buffer.size() - kMaskingKeyWidthInBytes < payload_length) return FRAME_INCOMPLETE; - const char* masking_key = p; - const char* payload = p + kMaskingKeyWidthInBytes; + std::vector::const_iterator masking_key = it; + std::vector::const_iterator payload = it + kMaskingKeyWidthInBytes; for (size_t i = 0; i < payload_length; ++i) // Unmask the payload. output->insert(output->end(), payload[i] ^ masking_key[i % kMaskingKeyWidthInBytes]); - size_t pos = p + kMaskingKeyWidthInBytes + payload_length - buffer_begin; + size_t pos = it + kMaskingKeyWidthInBytes + payload_length - buffer.begin(); *bytes_consumed = pos; return closed ? FRAME_CLOSE : FRAME_OK; } @@ -280,13 +279,13 @@ static void close_frame_received(inspector_socket_t* inspector) { } } -static int parse_ws_frames(inspector_socket_t* inspector, size_t len) { +static int parse_ws_frames(inspector_socket_t* inspector) { int bytes_consumed = 0; std::vector output; bool compressed = false; - ws_decode_result r = decode_frame_hybi17(&inspector->buffer[0], - len, true /* client_frame */, + ws_decode_result r = decode_frame_hybi17(inspector->buffer, + true /* client_frame */, &bytes_consumed, &output, &compressed); // Compressed frame means client is ignoring the headers and misbehaves @@ -312,24 +311,22 @@ static int parse_ws_frames(inspector_socket_t* inspector, size_t len) { } static void prepare_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) { - inspector_socket_t* inspector = - reinterpret_cast(stream->data); + *buf = uv_buf_init(new char[len], len); +} - if (len > (inspector->buffer.size() - inspector->data_len)) { - int new_size = (inspector->data_len + len + BUFFER_GROWTH_CHUNK_SIZE - 1) / - BUFFER_GROWTH_CHUNK_SIZE * - BUFFER_GROWTH_CHUNK_SIZE; - inspector->buffer.resize(new_size); +static void reclaim_uv_buf(inspector_socket_t* inspector, const uv_buf_t* buf, + ssize_t read) { + if (read > 0) { + std::vector& buffer = inspector->buffer; + buffer.insert(buffer.end(), buf->base, buf->base + read); } - buf->base = &inspector->buffer[inspector->data_len]; - buf->len = len; - inspector->data_len += len; + delete[] buf->base; } static void websockets_data_cb(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - inspector_socket_t* inspector = - reinterpret_cast(stream->data); + inspector_socket_t* inspector = inspector_from_stream(stream); + reclaim_uv_buf(inspector, buf, nread); if (nread < 0 || nread == UV_EOF) { inspector->connection_eof = true; if (!inspector->shutting_down && inspector->ws_state->read_cb) { @@ -339,29 +336,19 @@ static void websockets_data_cb(uv_stream_t* stream, ssize_t nread, #if DUMP_READS printf("%s read %ld bytes\n", __FUNCTION__, nread); if (nread > 0) { - dump_hex(buf->base, nread); + dump_hex(inspector->buffer.data() + inspector->buffer.size() - nread, + nread); } #endif - // 1. Move read bytes to continue the buffer - // Should be same as this is supposedly last buffer - ASSERT_EQ(buf->base + buf->len, &inspector->buffer[inspector->data_len]); - - // Should be noop... - memmove(&inspector->buffer[inspector->last_read_end], buf->base, nread); - inspector->last_read_end += nread; - // 2. Parse. int processed = 0; do { - processed = parse_ws_frames(inspector, inspector->last_read_end); + processed = parse_ws_frames(inspector); // 3. Fix the buffer size & length if (processed > 0) { - memmove(&inspector->buffer[0], &inspector->buffer[processed], - inspector->last_read_end - processed); - inspector->last_read_end -= processed; - inspector->data_len = inspector->last_read_end; + remove_from_beginning(&inspector->buffer, processed); } - } while (processed > 0 && inspector->data_len > 0); + } while (processed > 0 && !inspector->buffer.empty()); } } @@ -435,7 +422,6 @@ static void handshake_complete(inspector_socket_t* inspector) { uv_read_stop(reinterpret_cast(&inspector->client)); handshake_cb callback = inspector->http_parsing_state->callback; inspector->ws_state = new ws_state_s(); - inspector->last_read_end = 0; inspector->ws_mode = true; callback(inspector, kInspectorHandshakeUpgraded, inspector->http_parsing_state->path); @@ -448,8 +434,7 @@ static void cleanup_http_parsing_state(inspector_socket_t* inspector) { static void report_handshake_failure_cb(uv_handle_t* handle) { dispose_inspector(handle); - inspector_socket_t* inspector = - static_cast(handle->data); + inspector_socket_t* inspector = inspector_from_stream(handle); handshake_cb cb = inspector->http_parsing_state->callback; cleanup_http_parsing_state(inspector); cb(inspector, kInspectorHandshakeFailed, std::string()); @@ -481,8 +466,7 @@ static void init_handshake(inspector_socket_t* inspector); static int message_complete_cb(http_parser* parser) { inspector_socket_t* inspector = reinterpret_cast(parser->data); - struct http_parsing_state_s* state = - (struct http_parsing_state_s*) inspector->http_parsing_state; + struct http_parsing_state_s* state = inspector->http_parsing_state; if (parser->method != HTTP_GET) { handshake_failed(inspector); } else if (!parser->upgrade) { @@ -527,22 +511,22 @@ static void data_received_cb(uv_stream_s* client, ssize_t nread, printf("[%s:%d] %s\n", __FUNCTION__, __LINE__, uv_err_name(nread)); } #endif - inspector_socket_t* inspector = - reinterpret_cast((client->data)); + inspector_socket_t* inspector = inspector_from_stream(client); + reclaim_uv_buf(inspector, buf, nread); if (nread < 0 || nread == UV_EOF) { close_and_report_handshake_failure(inspector); } else { http_parsing_state_s* state = inspector->http_parsing_state; http_parser* parser = &state->parser; - http_parser_execute(parser, &state->parser_settings, &inspector->buffer[0], - nread); + http_parser_execute(parser, &state->parser_settings, + inspector->buffer.data(), nread); + remove_from_beginning(&inspector->buffer, nread); if (parser->http_errno != HPE_OK) { handshake_failed(inspector); } if (inspector->http_parsing_state->done) { cleanup_http_parsing_state(inspector); } - inspector->data_len = 0; } } @@ -576,7 +560,6 @@ int inspector_accept(uv_stream_t* server, inspector_socket_t* inspector, err = uv_accept(server, client); } if (err == 0) { - client->data = inspector; init_handshake(inspector); inspector->http_parsing_state->callback = callback; err = uv_read_start(client, prepare_buffer, diff --git a/src/inspector_socket.h b/src/inspector_socket.h index 5d3477b98c1..de57d09e98e 100644 --- a/src/inspector_socket.h +++ b/src/inspector_socket.h @@ -2,6 +2,8 @@ #define SRC_INSPECTOR_SOCKET_H_ #include "http_parser.h" +#include "util.h" +#include "util-inl.h" #include "uv.h" #include @@ -48,8 +50,6 @@ struct inspector_socket_s { struct http_parsing_state_s* http_parsing_state; struct ws_state_s* ws_state; std::vector buffer; - size_t data_len; - size_t last_read_end; uv_tcp_t client; bool ws_mode; bool shutting_down; @@ -64,7 +64,8 @@ int inspector_accept(uv_stream_t* server, struct inspector_socket_s* inspector, void inspector_close(struct inspector_socket_s* inspector, inspector_cb callback); -// Callbacks will receive handles that has inspector in data field... +// Callbacks will receive stream handles. Use inspector_from_stream to get +// inspector_socket_t* from the stream handle. int inspector_read_start(struct inspector_socket_s* inspector, uv_alloc_cb, uv_read_cb); void inspector_read_stop(struct inspector_socket_s* inspector); @@ -72,4 +73,16 @@ void inspector_write(struct inspector_socket_s* inspector, const char* data, size_t len); bool inspector_is_active(const struct inspector_socket_s* inspector); +inline inspector_socket_t* inspector_from_stream(uv_tcp_t* stream) { + return node::ContainerOf(&inspector_socket_t::client, stream); +} + +inline inspector_socket_t* inspector_from_stream(uv_stream_t* stream) { + return inspector_from_stream(reinterpret_cast(stream)); +} + +inline inspector_socket_t* inspector_from_stream(uv_handle_t* stream) { + return inspector_from_stream(reinterpret_cast(stream)); +} + #endif // SRC_INSPECTOR_SOCKET_H_ diff --git a/test/cctest/test_inspector_socket.cc b/test/cctest/test_inspector_socket.cc index 6f8cd25486a..23d382ebe63 100644 --- a/test/cctest/test_inspector_socket.cc +++ b/test/cctest/test_inspector_socket.cc @@ -178,7 +178,7 @@ struct expectations { static void grow_expects_buffer(uv_handle_t* stream, size_t size, uv_buf_t* b) { expectations* expects = static_cast( - (static_cast(stream->data))->data); + inspector_from_stream(stream)->data); size_t end = expects->actual_end; // Grow the buffer in chunks of 64k. size_t new_length = (end + size + 65535) & ~((size_t) 0xFFFF); @@ -213,7 +213,7 @@ static void grow_expects_buffer(uv_handle_t* stream, size_t size, uv_buf_t* b) { static void save_read_data(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { expectations* expects =static_cast( - (static_cast(stream->data))->data); + inspector_from_stream(stream)->data); expects->err_code = nread < 0 ? nread : 0; if (nread > 0) { expects->actual_end += nread; @@ -254,8 +254,7 @@ static void expect_on_server(const char* data, size_t len) { static void inspector_record_error_code(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - inspector_socket_t *inspector = - reinterpret_cast(stream->data); + inspector_socket_t *inspector = inspector_from_stream(stream); // Increment instead of assign is to ensure the function is only called once *(static_cast(inspector->data)) += nread; } @@ -760,8 +759,7 @@ static void CleanupSocketAfterEOF_close_cb(inspector_socket_t* inspector, static void CleanupSocketAfterEOF_read_cb(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { EXPECT_EQ(UV_EOF, nread); - inspector_socket_t* insp = - reinterpret_cast(stream->data); + inspector_socket_t* insp = inspector_from_stream(stream); inspector_close(insp, CleanupSocketAfterEOF_close_cb); }