在自制 C/C++ websocket 实现中收听消息时遇到问题
Having trouple listening for messages in homemade C/C++ websocket implementation
所以我目前正在 C/C++ 中实现一个 websocket,经过大量的混乱之后我让握手工作(这是一个空白错误..)。无论如何,现在我不知道如何在握手建立后继续收听消息,我边学边学,所以有点乱,但我当然会回答有关代码的任何问题。这就是我现在拥有的:
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
//cpp
#include <string>
#include <iostream>
#include <openssl/sha.h>
#include "include/base64.h"
int main(int argc, char *argv[])
{
int listenfd = 0, connfd = 0;
struct sockaddr_in serv_addr;
time_t ticks;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
memset(&serv_addr, '0', sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(8080);
bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
listen(listenfd, 8080);
bool ws = false;
while(1)
{
int res = 0;
if(ws){
while(true){
char buffer_ws[1400];
res = recv(listenfd, buffer_ws, 1400, 0);
if(res > 0)
std::cout << "Data!" << std::endl;
sleep(1);
}
}
connfd = accept(listenfd, (struct sockaddr*)NULL, NULL);
std::cout << "connfd = " << connfd << std::endl;
char buffer[1400];
read(connfd, buffer, 1400);
std::string buf(buffer);
std::cout << buf << std::endl;
std::string reply;
ws = strstr(buffer, "Upgrade: websocket");
if(ws){ // if websocket handshake. This works
std::cout << "<websocket>" << std::endl;
std::string key = buf.substr(buf.find("Sec-WebSocket-Key") + 19,
buf.substr(buf.find("Sec-WebSocket-Key")).find("\n") - 20);
std::cout << "key = " << key << std::endl;
key.append("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
std::cout << "key = " << key << " length = " << key.length() << std::endl;
unsigned char const* hash = SHA1(reinterpret_cast<const unsigned char*>(key.c_str()), key.lengt\
h(), nullptr);
std::string b64 = base64_encode(hash, 20);
std::cout << "b64 = " << b64 << std::endl;
reply =
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: " + b64 + "\r\n\r\n";
}else{
std::cout << "<other>" << std::endl;
reply =
"HTTP/1.1 200 OK\n"
"\n<script>var ws = new WebSocket('ws://192.168.10.117:8080');\n"
"ws.addEventListener('open',function(event){"
"\n\tconsole.log('open!!');\n\tws.send('yo bro!');\n});\n"
"var sendmessage = function(){console.log('click');ws.send('test123');};</script>"
"<input type=\"text\"><button onclick=\"sendmessage()\">send</button>";
}
send(connfd, reply.c_str(), reply.size(), 0);
close(connfd);
sleep(1);
}
}
现在,当我尝试从客户端 websocket 发送时,我只是没有收到任何东西。
@Jontahan,
我认为您有了一个良好的开端,并从这个项目中获得了无穷乐趣 - 但这不是一项小工程,在您进行过程中有很多东西需要学习和修复。
您的代码中存在一些设计问题:
- 阻止您进入下一阶段(在接受新连接的同时收听消息);和
- 引入可能导致 DoS(拒绝服务)的有意义的安全漏洞。
例如,在 accept
阻塞后(在此期间,您无法读取传入的客户端消息),您继续阻塞 read
,等待 HTTP 请求(同样,您的代码正在等待网络事件,而它可能有更好的事情要做。
此行为会阻止您的代码执行任务,因为您的代码正在等待 IO (accept
/ read
),而其他事件可能同时发生(即,如果 websocket 消息到达怎么办当您的代码忙于等待 accept
最后 10 分钟时?)。
此外,此行为引入了与慢速客户端(即接收不良的客户端或恶意攻击者)相关的安全漏洞。例如,如果 HTTP 请求需要整整一分钟才能到达怎么办?如果到达时进行了碎片整理怎么办?如果它一次到达一行怎么办?如果一次收到一封信怎么办?
...
您的代码是测试握手的良好开端,但在将代码用作实际服务器之前应重新考虑核心设计。
一种解决方案 - 通常用于小型服务器 - 是使用每个连接一个线程的设计,因此在 accept
之后产生一个新线程并负责处理连接(它阻塞在 read
而不会中断其他线程)。
然而,这个解决方案不是最优的,并且引入了一些小的安全风险(每个设计都有安全风险,这是关于选择可能的最小风险)。
另一种解决方案,由一些最好的 (nginx
/ node.js
) 实施,是使用具有单线程的 "evented" 设计。这有时被称为 "reactor" 设计。
这个设计是迄今为止最好的设计之一,但它确实存在其他问题,并且需要在代码中多加注意,所以缓慢的 运行 任务/功能不会耗尽整个服务器停下来。
此设计通常使用 kqueue
、epoll
或(因为没有更好的选择)poll
/ select
来实现
在伪代码中,它可能看起来像这样:
void defer_task(void (*func((void *)), void * arg) {
// place task in queue
}
void run_tasks() {
while (queue->not_empty()) {
task = grab_oldest_task();
task.func(task.arg);
}
}
void task_listen(void * data) {
// open listening socket
}
void task_poll(void * _) {
// poll existing clients and listening sockets
// probably using `kqueue`/`epoll`/`poll`/`select`
defer_task(task_ondata, (void*)fd);
// finish with rescheduling the poll task
defer_task(task_poll, NULL);
}
void task_ondata(void * data) {
int fd = (intptr_t)data;
// handle `accept` / `read` making sure all sockets are non-blocking
}
int main(void) {
defer_task(task_listen, NULL);
defer_task(task_poll, NULL);
run_tasks();
}
这是我编写的任务调度系统的示例...它未在生产中测试,但它可能会阐明我对 defer_task
:
的意思
/* *****************************************************************************
API declarations - should be moved to a separate .h file.
***************************************************************************** */
/** Defer an execution of a function for later. Returns -1 on error.*/
int defer(void (*func)(void *), void *arg);
/** Performs all deferred functions until the queue had been depleated. */
void defer_perform(void);
/** returns true if there are deferred functions waiting for execution. */
int defer_has_queue(void);
/* *****************************************************************************
Compile time settings
***************************************************************************** */
#ifndef DEFER_QUEUE_BUFFER
#define DEFER_QUEUE_BUFFER 1024
#endif
/* *****************************************************************************
spinlock / sync for tasks
***************************************************************************** */
#if defined(__unix__) || defined(__APPLE__) || defined(__linux__)
#define _GNU_SOURCE
#include <time.h>
#endif /* _GNU_SOURCE */
#include <stdlib.h>
/* manage the way threads "wait" for the lock to release */
#if defined(__unix__) || defined(__APPLE__) || defined(__linux__)
/* nanosleep seems to be the most effective and efficient reschedule */
#define defer_nanosleep(length) \
{ \
static const struct timespec tm = {.tv_nsec = length}; \
nanosleep(&tm, NULL); \
}
#define reschedule_thread() defer_nanosleep(1)
#define throttle_thread() defer_nanosleep(8388608UL)
#else /* no effective rescheduling, just spin... */
#define reschedule_thread()
#define throttle_thread()
#endif
/** locks use a single byte */
typedef volatile unsigned char spn_lock_i;
/** The initail value of an unlocked spinlock. */
#define SPN_LOCK_INIT 0
/* Select the correct compiler builtin method. */
#if defined(__has_builtin)
#if __has_builtin(__sync_swap)
#define SPN_LOCK_BUILTIN(...) __sync_swap(__VA_ARGS__)
#elif __has_builtin(__sync_fetch_and_or)
#define SPN_LOCK_BUILTIN(...) __sync_fetch_and_or(__VA_ARGS__)
#else
#error Required builtin "__sync_swap" or "__sync_fetch_and_or" missing from compiler.
#endif /* defined(__has_builtin) */
#elif __GNUC__ > 3
#define SPN_LOCK_BUILTIN(...) __sync_fetch_and_or(__VA_ARGS__)
#else
#error Required builtin "__sync_swap" or "__sync_fetch_and_or" not found.
#endif
/** returns 1 and 0 if the lock was successfully aquired (TRUE == FAIL). */
static inline int spn_trylock(spn_lock_i *lock) {
return SPN_LOCK_BUILTIN(lock, 1);
}
/** Releases a lock. */
static inline __attribute__((unused)) void spn_unlock(spn_lock_i *lock) {
__asm__ volatile("" ::: "memory");
*lock = 0;
}
/** returns a lock's state (non 0 == Busy). */
static inline __attribute__((unused)) int spn_is_locked(spn_lock_i *lock) {
__asm__ volatile("" ::: "memory");
return *lock;
}
/** Busy waits for the lock. */
static inline __attribute__((unused)) void spn_lock(spn_lock_i *lock) {
while (spn_trylock(lock)) {
reschedule_thread();
}
}
/* *****************************************************************************
Data Structures
***************************************************************************** */
typedef struct {
void (*func)(void *);
void *arg;
} task_s;
typedef struct task_node_s {
task_s task;
struct task_node_s *next;
} task_node_s;
static task_node_s tasks_buffer[DEFER_QUEUE_BUFFER];
static struct {
task_node_s *first;
task_node_s **last;
task_node_s *pool;
spn_lock_i lock;
unsigned char initialized;
} deferred = {.first = NULL,
.last = &deferred.first,
.pool = NULL,
.lock = 0,
.initialized = 0};
/* *****************************************************************************
API
***************************************************************************** */
/** Defer an execution of a function for later. */
int defer(void (*func)(void *), void *arg) {
if (!func)
return -1;
task_node_s *task;
spn_lock(&deferred.lock);
if (deferred.pool) {
task = deferred.pool;
deferred.pool = deferred.pool->next;
} else if (deferred.initialized) {
task = malloc(sizeof(task_node_s));
if (!task)
goto error;
} else { /* lazy initialization of task buffer */
deferred.initialized = 1;
task = tasks_buffer;
deferred.pool = tasks_buffer + 1;
for (size_t i = 2; i < DEFER_QUEUE_BUFFER; i++) {
tasks_buffer[i - 1].next = tasks_buffer + i;
}
}
*deferred.last = task;
deferred.last = &task->next;
task->task.func = func;
task->task.arg = arg;
task->next = NULL;
spn_unlock(&deferred.lock);
return 0;
error:
spn_unlock(&deferred.lock);
return -1;
}
/** Performs all deferred functions until the queue had been depleted. */
void defer_perform(void) {
task_node_s *tmp;
task_s task;
restart:
spn_lock(&deferred.lock);
tmp = deferred.first;
if (tmp) {
deferred.first = tmp->next;
if (!deferred.first)
deferred.last = &deferred.first;
task = tmp->task;
if (tmp <= tasks_buffer + (DEFER_QUEUE_BUFFER - 1) && tmp >= tasks_buffer) {
tmp->next = deferred.pool;
deferred.pool = tmp;
} else {
free(tmp);
}
spn_unlock(&deferred.lock);
task.func(task.arg);
goto restart;
} else
spn_unlock(&deferred.lock);
}
/** returns true if there are deferred functions waiting for execution. */
int defer_has_queue(void) { return deferred.first != NULL; }
所以我目前正在 C/C++ 中实现一个 websocket,经过大量的混乱之后我让握手工作(这是一个空白错误..)。无论如何,现在我不知道如何在握手建立后继续收听消息,我边学边学,所以有点乱,但我当然会回答有关代码的任何问题。这就是我现在拥有的:
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
//cpp
#include <string>
#include <iostream>
#include <openssl/sha.h>
#include "include/base64.h"
int main(int argc, char *argv[])
{
int listenfd = 0, connfd = 0;
struct sockaddr_in serv_addr;
time_t ticks;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
memset(&serv_addr, '0', sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(8080);
bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
listen(listenfd, 8080);
bool ws = false;
while(1)
{
int res = 0;
if(ws){
while(true){
char buffer_ws[1400];
res = recv(listenfd, buffer_ws, 1400, 0);
if(res > 0)
std::cout << "Data!" << std::endl;
sleep(1);
}
}
connfd = accept(listenfd, (struct sockaddr*)NULL, NULL);
std::cout << "connfd = " << connfd << std::endl;
char buffer[1400];
read(connfd, buffer, 1400);
std::string buf(buffer);
std::cout << buf << std::endl;
std::string reply;
ws = strstr(buffer, "Upgrade: websocket");
if(ws){ // if websocket handshake. This works
std::cout << "<websocket>" << std::endl;
std::string key = buf.substr(buf.find("Sec-WebSocket-Key") + 19,
buf.substr(buf.find("Sec-WebSocket-Key")).find("\n") - 20);
std::cout << "key = " << key << std::endl;
key.append("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
std::cout << "key = " << key << " length = " << key.length() << std::endl;
unsigned char const* hash = SHA1(reinterpret_cast<const unsigned char*>(key.c_str()), key.lengt\
h(), nullptr);
std::string b64 = base64_encode(hash, 20);
std::cout << "b64 = " << b64 << std::endl;
reply =
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: " + b64 + "\r\n\r\n";
}else{
std::cout << "<other>" << std::endl;
reply =
"HTTP/1.1 200 OK\n"
"\n<script>var ws = new WebSocket('ws://192.168.10.117:8080');\n"
"ws.addEventListener('open',function(event){"
"\n\tconsole.log('open!!');\n\tws.send('yo bro!');\n});\n"
"var sendmessage = function(){console.log('click');ws.send('test123');};</script>"
"<input type=\"text\"><button onclick=\"sendmessage()\">send</button>";
}
send(connfd, reply.c_str(), reply.size(), 0);
close(connfd);
sleep(1);
}
}
现在,当我尝试从客户端 websocket 发送时,我只是没有收到任何东西。
@Jontahan,
我认为您有了一个良好的开端,并从这个项目中获得了无穷乐趣 - 但这不是一项小工程,在您进行过程中有很多东西需要学习和修复。
您的代码中存在一些设计问题:
- 阻止您进入下一阶段(在接受新连接的同时收听消息);和
- 引入可能导致 DoS(拒绝服务)的有意义的安全漏洞。
例如,在 accept
阻塞后(在此期间,您无法读取传入的客户端消息),您继续阻塞 read
,等待 HTTP 请求(同样,您的代码正在等待网络事件,而它可能有更好的事情要做。
此行为会阻止您的代码执行任务,因为您的代码正在等待 IO (accept
/ read
),而其他事件可能同时发生(即,如果 websocket 消息到达怎么办当您的代码忙于等待 accept
最后 10 分钟时?)。
此外,此行为引入了与慢速客户端(即接收不良的客户端或恶意攻击者)相关的安全漏洞。例如,如果 HTTP 请求需要整整一分钟才能到达怎么办?如果到达时进行了碎片整理怎么办?如果它一次到达一行怎么办?如果一次收到一封信怎么办?
...
您的代码是测试握手的良好开端,但在将代码用作实际服务器之前应重新考虑核心设计。
一种解决方案 - 通常用于小型服务器 - 是使用每个连接一个线程的设计,因此在 accept
之后产生一个新线程并负责处理连接(它阻塞在 read
而不会中断其他线程)。
然而,这个解决方案不是最优的,并且引入了一些小的安全风险(每个设计都有安全风险,这是关于选择可能的最小风险)。
另一种解决方案,由一些最好的 (nginx
/ node.js
) 实施,是使用具有单线程的 "evented" 设计。这有时被称为 "reactor" 设计。
这个设计是迄今为止最好的设计之一,但它确实存在其他问题,并且需要在代码中多加注意,所以缓慢的 运行 任务/功能不会耗尽整个服务器停下来。
此设计通常使用 kqueue
、epoll
或(因为没有更好的选择)poll
/ select
在伪代码中,它可能看起来像这样:
void defer_task(void (*func((void *)), void * arg) {
// place task in queue
}
void run_tasks() {
while (queue->not_empty()) {
task = grab_oldest_task();
task.func(task.arg);
}
}
void task_listen(void * data) {
// open listening socket
}
void task_poll(void * _) {
// poll existing clients and listening sockets
// probably using `kqueue`/`epoll`/`poll`/`select`
defer_task(task_ondata, (void*)fd);
// finish with rescheduling the poll task
defer_task(task_poll, NULL);
}
void task_ondata(void * data) {
int fd = (intptr_t)data;
// handle `accept` / `read` making sure all sockets are non-blocking
}
int main(void) {
defer_task(task_listen, NULL);
defer_task(task_poll, NULL);
run_tasks();
}
这是我编写的任务调度系统的示例...它未在生产中测试,但它可能会阐明我对 defer_task
:
/* *****************************************************************************
API declarations - should be moved to a separate .h file.
***************************************************************************** */
/** Defer an execution of a function for later. Returns -1 on error.*/
int defer(void (*func)(void *), void *arg);
/** Performs all deferred functions until the queue had been depleated. */
void defer_perform(void);
/** returns true if there are deferred functions waiting for execution. */
int defer_has_queue(void);
/* *****************************************************************************
Compile time settings
***************************************************************************** */
#ifndef DEFER_QUEUE_BUFFER
#define DEFER_QUEUE_BUFFER 1024
#endif
/* *****************************************************************************
spinlock / sync for tasks
***************************************************************************** */
#if defined(__unix__) || defined(__APPLE__) || defined(__linux__)
#define _GNU_SOURCE
#include <time.h>
#endif /* _GNU_SOURCE */
#include <stdlib.h>
/* manage the way threads "wait" for the lock to release */
#if defined(__unix__) || defined(__APPLE__) || defined(__linux__)
/* nanosleep seems to be the most effective and efficient reschedule */
#define defer_nanosleep(length) \
{ \
static const struct timespec tm = {.tv_nsec = length}; \
nanosleep(&tm, NULL); \
}
#define reschedule_thread() defer_nanosleep(1)
#define throttle_thread() defer_nanosleep(8388608UL)
#else /* no effective rescheduling, just spin... */
#define reschedule_thread()
#define throttle_thread()
#endif
/** locks use a single byte */
typedef volatile unsigned char spn_lock_i;
/** The initail value of an unlocked spinlock. */
#define SPN_LOCK_INIT 0
/* Select the correct compiler builtin method. */
#if defined(__has_builtin)
#if __has_builtin(__sync_swap)
#define SPN_LOCK_BUILTIN(...) __sync_swap(__VA_ARGS__)
#elif __has_builtin(__sync_fetch_and_or)
#define SPN_LOCK_BUILTIN(...) __sync_fetch_and_or(__VA_ARGS__)
#else
#error Required builtin "__sync_swap" or "__sync_fetch_and_or" missing from compiler.
#endif /* defined(__has_builtin) */
#elif __GNUC__ > 3
#define SPN_LOCK_BUILTIN(...) __sync_fetch_and_or(__VA_ARGS__)
#else
#error Required builtin "__sync_swap" or "__sync_fetch_and_or" not found.
#endif
/** returns 1 and 0 if the lock was successfully aquired (TRUE == FAIL). */
static inline int spn_trylock(spn_lock_i *lock) {
return SPN_LOCK_BUILTIN(lock, 1);
}
/** Releases a lock. */
static inline __attribute__((unused)) void spn_unlock(spn_lock_i *lock) {
__asm__ volatile("" ::: "memory");
*lock = 0;
}
/** returns a lock's state (non 0 == Busy). */
static inline __attribute__((unused)) int spn_is_locked(spn_lock_i *lock) {
__asm__ volatile("" ::: "memory");
return *lock;
}
/** Busy waits for the lock. */
static inline __attribute__((unused)) void spn_lock(spn_lock_i *lock) {
while (spn_trylock(lock)) {
reschedule_thread();
}
}
/* *****************************************************************************
Data Structures
***************************************************************************** */
typedef struct {
void (*func)(void *);
void *arg;
} task_s;
typedef struct task_node_s {
task_s task;
struct task_node_s *next;
} task_node_s;
static task_node_s tasks_buffer[DEFER_QUEUE_BUFFER];
static struct {
task_node_s *first;
task_node_s **last;
task_node_s *pool;
spn_lock_i lock;
unsigned char initialized;
} deferred = {.first = NULL,
.last = &deferred.first,
.pool = NULL,
.lock = 0,
.initialized = 0};
/* *****************************************************************************
API
***************************************************************************** */
/** Defer an execution of a function for later. */
int defer(void (*func)(void *), void *arg) {
if (!func)
return -1;
task_node_s *task;
spn_lock(&deferred.lock);
if (deferred.pool) {
task = deferred.pool;
deferred.pool = deferred.pool->next;
} else if (deferred.initialized) {
task = malloc(sizeof(task_node_s));
if (!task)
goto error;
} else { /* lazy initialization of task buffer */
deferred.initialized = 1;
task = tasks_buffer;
deferred.pool = tasks_buffer + 1;
for (size_t i = 2; i < DEFER_QUEUE_BUFFER; i++) {
tasks_buffer[i - 1].next = tasks_buffer + i;
}
}
*deferred.last = task;
deferred.last = &task->next;
task->task.func = func;
task->task.arg = arg;
task->next = NULL;
spn_unlock(&deferred.lock);
return 0;
error:
spn_unlock(&deferred.lock);
return -1;
}
/** Performs all deferred functions until the queue had been depleted. */
void defer_perform(void) {
task_node_s *tmp;
task_s task;
restart:
spn_lock(&deferred.lock);
tmp = deferred.first;
if (tmp) {
deferred.first = tmp->next;
if (!deferred.first)
deferred.last = &deferred.first;
task = tmp->task;
if (tmp <= tasks_buffer + (DEFER_QUEUE_BUFFER - 1) && tmp >= tasks_buffer) {
tmp->next = deferred.pool;
deferred.pool = tmp;
} else {
free(tmp);
}
spn_unlock(&deferred.lock);
task.func(task.arg);
goto restart;
} else
spn_unlock(&deferred.lock);
}
/** returns true if there are deferred functions waiting for execution. */
int defer_has_queue(void) { return deferred.first != NULL; }