废话不说, 直接上代码:
头文件:
#if !defined(_WEBSOCKET_H_)
#define _WEBSOCKET_H_
// 引入 APR 相关头文件
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
// Opaque server handle, created by websocket_create().
typedef struct websocket_t websocket_t;
// Opaque handle for one connected client.
typedef struct websocket_client_t websocket_client_t;
// Message callback: receives the client, the NUL-terminated frame payload and its length in bytes.
typedef void (*ws_handler)(websocket_client_t*, const char*, int);
// Create a server listening on `port` and start its worker thread; returns NULL if the listening socket cannot be created.
websocket_t* websocket_create(int port);
// Close the server socket and all client connections and release the server's pool. NULL is ignored.
void websocket_destroy(websocket_t* ws);
// Send the whole contents of `evb` to `client` as a single frame (forwards to websocket_send2).
int websocket_send(websocket_client_t* client, struct evbuffer* evb);
// Send `size` bytes of `data` to `client` as a single text frame; returns -1 on send error.
int websocket_send2(websocket_client_t* client, const char* data, size_t size);
// Register `handler` for request URLs matching `path`. Clients whose URL matches no
// registered path get no handler, so their messages are parsed but not delivered.
void websocket_set_handler(websocket_t* ws, const char* path, ws_handler handler);
#endif // !defined(_WEBSOCKET_H_)
C文件:
#include "websocket.h"
// Name of the handshake request header that carries the client's key.
#define WS_KEY_NAME "Sec-WebSocket-Key"
// Fixed GUID appended to the client key before hashing to produce Sec-WebSocket-Accept.
#define WS_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
// WebSocket frame opcodes.
enum
{
OPCODE_SLICE = 0, ///< continuation: one fragment of a fragmented message
OPCODE_TEXT_FRAME = 1, ///< text payload
OPCODE_BINARY_FRAME = 2, ///< binary payload
OPCODE_DISCONNECT = 8, ///< connection close
OPCODE_PING = 9, ///< ping control frame
OPCODE_PONG = 10, ///< pong control frame
};
// First two bytes of a WebSocket frame, mapped onto bit-fields.
typedef struct websocket_ophdr_t websocket_ophdr_t;
struct websocket_ophdr_t
{ // byte 0: opcode / rsv3..rsv1 / fin, byte 1: pl_len / mask
// NOTE(review): the field order only matches the wire format (FIN in the top bit,
// opcode in the low nibble) if the compiler allocates bit-fields starting from the
// least-significant bit; bit-field layout is implementation-defined, confirm for the target.
unsigned char opcode : 4;
unsigned char rsv3 : 1;
unsigned char rsv2 : 1;
unsigned char rsv1 : 1;
unsigned char fin : 1;
unsigned char pl_len : 7;
unsigned char mask : 1;
};
// One connected client. The structure itself is allocated from `pool`.
struct websocket_client_t
{
apr_pool_t* pool; // per-client pool; destroyed when the client is removed
apr_socket_t* socket; // accepted connection
apr_pollfd_t pollfd; // descriptor registered in the server pollset; client_data points back to this struct
apr_byte_t handshark; // TRUE once the HTTP upgrade handshake has been answered
char* ip; // remote address text, or "unknown"
char* url; // request path taken from the handshake request line
char buffer[4096]; // bytes of an incomplete frame carried over to the next receive
char zero; // presumably a guard byte so buffer can always be NUL-terminated; TODO confirm
int buffer_size; // number of valid bytes in buffer
ws_handler handler; // callback chosen during the handshake; may be NULL
websocket_client_t* next; // doubly linked list of clients owned by the server
websocket_client_t* prev;
};
// Maps a URL pattern to a message handler (singly linked list, newest first).
struct websocket_handler_t
{
const char* pattern; // path registered through websocket_set_handler()
size_t size; // strlen(pattern), used for the prefix comparison
ws_handler handler; // callback for clients whose URL matches
struct websocket_handler_t* next;
};
// Server state. The structure itself is allocated from `pool`.
struct websocket_t
{
apr_pool_t* pool; // root pool for everything owned by the server
apr_socket_t* socket; // listening socket
apr_pollset_t* pollset; // holds the listening socket and every client socket
apr_thread_t* thread; // worker thread running websocket_thread()
apr_pollfd_t server; // pollset entry for the listening socket (client_data == NULL)
apr_byte_t running; // worker loop keeps running while this is non-zero
apr_thread_rwlock_t* lock; // guards `handlers` and `clients`
struct websocket_handler_t* handlers; // registered URL handlers
websocket_client_t* clients; // head of the connected-client list
};
// Internal helpers, defined further down in this file.
// Answer the HTTP upgrade request contained in `buf` and pick the client's handler.
static void websocket_handshark(websocket_t* ws, websocket_client_t* client, const char* buf);
// Unregister, unlink and free a client.
static void websocket_remove(websocket_t* ws, websocket_client_t* client);
// Read pending bytes from a client and dispatch them to the handshake or frame parser.
static void websocket_receiver(websocket_t* ws, websocket_client_t* client);
// Accept one pending connection on the listening socket.
static void websocket_accept(websocket_t* ws);
// Parse received frame bytes and invoke the client's handler for each complete frame.
static int websocket_transmission(websocket_client_t* client, char* buf, int len);
// Worker thread entry point: polls the pollset until ws->running is cleared.
static void* APR_THREAD_FUNC websocket_thread(apr_thread_t* thread, void* param);
/**
 * Create a WebSocket server listening on `port` and start its worker thread.
 *
 * Everything owned by the server (lock, pollset, listening socket, thread) is
 * allocated from a fresh root pool that websocket_destroy() releases.
 *
 * @param port  TCP port to listen on.
 * @return the new server, or NULL if the lock, the pollset or the listening
 *         socket could not be created.
 */
websocket_t* websocket_create(int port)
{
    apr_pool_t* pool;
    websocket_t* ws = NULL;

    apr_pool_create(&pool, 0);
    ws = apr_pcalloc(pool, sizeof(*ws));
    ws->pool = pool;

    apr_thread_rwlock_create(&ws->lock, pool);
    // The 4th argument takes only APR_POLLSET_* flag bits; the backend is selected
    // by the 5th. APR_POLLSET_EPOLL is a method constant and must not be OR'd into
    // the flags (it would alias an unrelated flag bit).
    apr_pollset_create_ex(&ws->pollset, 100, pool,
        APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY,
        APR_POLLSET_EPOLL);
    // ws was zero-filled, so a failed creation leaves the corresponding pointer NULL.
    if (NULL == ws->lock || NULL == ws->pollset)
    {
        apr_pool_destroy(pool);
        return NULL;
    }

    // NOTE(review): socket_create() is defined outside this file; presumably it
    // creates and binds the listening socket - confirm.
    ws->socket = socket_create((apr_port_t)port, pool);
    if (NULL == ws->socket)
    {
        apr_pool_destroy(pool);
        return NULL;
    }
    apr_socket_listen(ws->socket, 5);

    // client_data == NULL is how the worker tells the listening socket from clients.
    ws->server.desc.s = ws->socket;
    ws->server.desc_type = APR_POLL_SOCKET;
    ws->server.reqevents = APR_POLLIN;
    ws->server.client_data = NULL;
    apr_pollset_add(ws->pollset, &ws->server);

    dzlog_info("websocket listen on port %u", (unsigned int)port);

    ws->running = TRUE;
    apr_thread_create(&ws->thread, NULL, &websocket_thread, ws, pool);
    return ws;
}
/**
 * Shut the server down: stop the worker loop, close the listening socket,
 * destroy the pollset, close and free every client, then release the pool.
 *
 * @param ws  server to destroy; NULL is ignored.
 *
 * NOTE(review): the worker thread detaches itself, so it cannot be joined
 * here. Clearing `running` makes it leave its loop on the next iteration, but
 * it may still be inside a poll or a handler while the resources below are
 * released. A complete fix needs the worker to stay joinable.
 */
void websocket_destroy(websocket_t* ws)
{
    if (NULL == ws)
        return;

    dzlog_info("destroy websocket");

    // Ask the worker loop to stop (0 == FALSE); without this it keeps polling forever.
    ws->running = 0;

    if (ws->socket)
        apr_socket_close(ws->socket);
    if (ws->pollset)
        apr_pollset_destroy(ws->pollset);

    apr_thread_rwlock_wrlock(ws->lock);
    if (ws->clients)
    {
        websocket_client_t* ptr, * next;
        ptr = ws->clients;
        while (ptr)
        {
            apr_socket_close(ptr->socket);
            // Read the link before the pool (which holds *ptr) is destroyed.
            next = ptr->next;
            apr_pool_destroy(ptr->pool);
            ptr = next;
        }
        ws->clients = NULL;
    }
    apr_thread_rwlock_unlock(ws->lock);

    // Frees ws itself together with the lock and thread objects.
    apr_pool_destroy(ws->pool);
}
/**
 * Register a message handler for a URL pattern.
 *
 * The pattern is copied into the server pool and the new entry is pushed onto
 * the front of the handler list under the write lock.
 *
 * @param ws       server to register on.
 * @param path     URL pattern; matched during the handshake.
 * @param handler  callback invoked for each message from matching clients.
 */
void websocket_set_handler(websocket_t* ws, const char* path, ws_handler handler)
{
    struct websocket_handler_t* entry = apr_pcalloc(ws->pool, sizeof(*entry));

    entry->pattern = apr_pstrdup(ws->pool, path);
    entry->size = strlen(entry->pattern);
    entry->handler = handler;

    // Publish the fully initialised entry as the new list head.
    apr_thread_rwlock_wrlock(ws->lock);
    entry->next = ws->handlers;
    ws->handlers = entry;
    apr_thread_rwlock_unlock(ws->lock);
}
/**
 * Accept one pending connection and register it with the server.
 *
 * Each client gets its own pool, which also holds the client structure. The
 * remote address is resolved before the client is registered, so that every
 * later log line, including the one in websocket_remove() on the failure path
 * below, sees a valid `ip` string.
 *
 * @param ws  server whose listening socket became readable.
 */
static void websocket_accept(websocket_t* ws)
{
    apr_pool_t* pool;
    apr_sockaddr_t* addr;
    websocket_client_t* client;

    apr_pool_create(&pool, 0);
    client = apr_pcalloc(pool, sizeof(*client));
    client->pool = pool;
    client->handshark = FALSE;

    if (APR_SUCCESS != apr_socket_accept(&client->socket, ws->socket, pool))
    {
        dzlog_error("accept failed");
        apr_pool_destroy(pool);
        return;
    }
    apr_socket_opt_set(client->socket, APR_SO_NONBLOCK, 1);
    apr_socket_opt_set(client->socket, APR_SO_LINGER, 1);

    // Resolve the peer address first; fall back to a placeholder if that fails.
    if (APR_SUCCESS == apr_socket_addr_get(&addr, APR_REMOTE, client->socket))
    {
        char buf[64];
        if (APR_SUCCESS == apr_sockaddr_ip_getbuf(buf, sizeof(buf), addr))
        {
            dzlog_info("accept a connection, from: %s", buf);
            client->ip = apr_pstrdup(pool, buf);
        }
    }
    if (NULL == client->ip) client->ip = "unknown";

    // client_data lets the worker map a ready descriptor back to its client.
    client->pollfd.client_data = client;
    client->pollfd.desc.s = client->socket;
    client->pollfd.desc_type = APR_POLL_SOCKET;
    client->pollfd.reqevents = APR_POLLIN;

    // Push onto the front of the client list.
    apr_thread_rwlock_wrlock(ws->lock);
    client->next = ws->clients;
    if (ws->clients)
    {
        ws->clients->prev = client;
    }
    ws->clients = client;
    apr_thread_rwlock_unlock(ws->lock);

    if (APR_SUCCESS != apr_pollset_add(ws->pollset, &client->pollfd))
    {
        // Unlinks the client again, closes the socket and frees the pool.
        websocket_remove(ws, client);
        return;
    }
}
/**
 * Answer the HTTP upgrade request of a new client.
 *
 * Extracts the request path and the Sec-WebSocket-Key header from `buf`,
 * replies with "101 Switching Protocols" carrying
 * base64(SHA1(key + WS_GUID)), marks the client as handshaken and selects the
 * handler whose pattern matches the request path.
 *
 * The function returns silently, leaving the client un-handshaken, when the
 * request is not a GET, has no key header, or the response cannot be sent.
 *
 * @param ws      server owning the handler list.
 * @param client  client that sent the request.
 * @param buf     NUL-terminated request text; the whole request is expected
 *                to be contained in this one buffer.
 */
static void websocket_handshark(websocket_t* ws, websocket_client_t* client, const char* buf)
{
    char data[512];
    char sec_data[128] = { 0 };
    unsigned char digest[APR_SHA1_DIGESTSIZE] = { 0 };
    apr_sha1_ctx_t ctx = { 0 };
    const char* str, * p;
    const char* url;
    apr_size_t len;
    apr_status_t rv;
    struct websocket_handler_t* ptr;

    // Request line: "GET <url> HTTP/1.1".
    if (0 != strncmp(buf, "GET ", 4))
        return;
    url = buf + 4;
    str = strchr(url, ' ');
    if (NULL == str)
        return;
    client->url = apr_pstrndup(client->pool, url, (apr_size_t)(str - url));

    // Locate the key header value and strip optional whitespace around it.
    str = strstr(buf, WS_KEY_NAME);
    if (NULL == str)
        return;
    str = strchr(str, ':');
    if (NULL == str)
        return;
    str++;
    while (' ' == *str || '\t' == *str) str++;
    p = strchr(str, '\r');
    if (NULL == p)
        return;
    while (p > str && (' ' == p[-1] || '\t' == p[-1])) p--;
    len = (apr_size_t)(p - str);
    if (0 == len || len + sizeof(WS_GUID) >= sizeof(data))
        return;

    // data = key + GUID; sizeof(WS_GUID) includes the terminating NUL.
    memcpy(data, str, (size_t)len);
    memcpy(data + len, WS_GUID, sizeof(WS_GUID));

    apr_sha1_init(&ctx);
    apr_sha1_update(&ctx, data, (unsigned int)(len + sizeof(WS_GUID) - 1));
    apr_sha1_final(digest, &ctx);
    apr_base64_encode(sec_data, (const char*)digest, APR_SHA1_DIGESTSIZE);

    len = (apr_size_t)snprintf(data, sizeof(data),
        "HTTP/1.1 101 Switching Protocols\r\n"
        "Upgrade: websocket\r\n"
        "Connection: Upgrade\r\n"
        "Sec-WebSocket-Accept: %s\r\n\r\n",
        sec_data);
    rv = apr_socket_send(client->socket, data, &len);
    if (APR_SUCCESS != rv)
    {
        apr_strerror(rv, data, sizeof(data));
        dzlog_error("failed to send: %s", data);
        // The peer never saw the 101 response, so the handshake did not happen.
        return;
    }
    client->handshark = TRUE;

    // Pick the first handler whose pattern is a case-insensitive prefix of the
    // URL or matches it as an fnmatch pattern. Without a match the client keeps
    // a NULL handler.
    apr_thread_rwlock_rdlock(ws->lock);
    for (ptr = ws->handlers; ptr; ptr = ptr->next)
    {
        if (!av_strncasecmp(client->url, ptr->pattern, ptr->size)
            || APR_SUCCESS == apr_fnmatch(ptr->pattern, client->url, 0))
        {
            client->handler = ptr->handler;
            break;
        }
    }
    apr_thread_rwlock_unlock(ws->lock);

    dzlog_info("handshark success, from: %s", client->ip);
}
/**
 * Remove (or apply) the WebSocket masking of a payload in place.
 *
 * Byte i of the payload is XOR-ed with byte (i mod 4) of the masking key.
 * Nothing happens when `length` is zero or negative.
 *
 * @param payload   bytes to transform in place.
 * @param length    number of payload bytes.
 * @param mask_key  the 4-byte masking key.
 */
static inline void websocket_umask(uint8_t* payload, int length, uint8_t* mask_key)
{
    int pos = 0;

    while (pos < length)
    {
        // pos is never negative here, so (pos & 3) equals pos % 4.
        payload[pos] = (uint8_t)(payload[pos] ^ mask_key[pos & 3]);
        ++pos;
    }
}
/**
 * Send the entire contents of an evbuffer to a client as one frame.
 *
 * The buffer is linearised with evbuffer_pullup() and handed to
 * websocket_send2(); the evbuffer is not drained.
 *
 * @param client  destination connection.
 * @param evb     buffer holding the payload.
 * @return whatever websocket_send2() returns.
 */
int websocket_send(websocket_client_t* client, struct evbuffer* evb)
{
    const size_t total = evbuffer_get_length(evb);
    const char* flat = (const char*)evbuffer_pullup(evb, (ssize_t)total);

    return websocket_send2(client, flat, total);
}
/**
 * Send one unfragmented, unmasked text frame (first byte 0x81) to a client.
 *
 * The payload length is encoded in one of three forms: directly in the second
 * byte (below 126), as a 16-bit big-endian value after the marker 126, or as
 * a 64-bit big-endian value after the marker 127. Header and payload are
 * written with a single gather write.
 *
 * @param client  destination connection.
 * @param data    payload bytes; not modified.
 * @param size    payload length in bytes.
 * @return number of bytes written minus the header length, or -1 if
 *         apr_socket_sendv() reports an error.
 */
int websocket_send2(websocket_client_t* client, const char* data, size_t size)
{
    struct iovec v[2] = { 0 };
    unsigned char header[10];
    size_t header_len = 0;
    apr_size_t bytes;

    // FIN = 1, opcode = text.
    header[0] = 0x81;
    if (size < 126)
    {
        header[1] = (uint8_t)size; // mask bit (top bit) stays 0
        header_len = 2;
    }
    else if (size <= 0xffff)
    {
        header[1] = 126;
        // Written byte by byte: header + 2 is not suitably aligned for a uint16_t store.
        header[2] = (unsigned char)((size >> 8) & 0xff);
        header[3] = (unsigned char)(size & 0xff);
        header_len = 4;
    }
    else
    {
        // 64-bit length, most significant byte first: the high-order bytes
        // come right after the marker and the low-order bytes last.
        uint64_t wide = (uint64_t)size;
        int i;

        header[1] = 127;
        for (i = 0; i < 8; i++)
        {
            header[2 + i] = (unsigned char)((wide >> (56 - 8 * i)) & 0xff);
        }
        header_len = 10;
    }

    v[0].iov_len = header_len;
    v[0].iov_base = header;
    v[1].iov_len = size;
    v[1].iov_base = (void*)data;
    if (APR_SUCCESS != apr_socket_sendv(client->socket, v, 2, &bytes))
        return -1;
    return (int)(bytes - header_len);
}
/**
 * Parse received bytes as WebSocket frames and deliver each complete text
 * frame to the client's handler.
 *
 * Bytes left over from an earlier call (client->buffer) are joined with the
 * new data first. Every complete frame is unmasked in place, temporarily
 * NUL-terminated and passed to client->handler. Bytes of an incomplete
 * trailing frame are kept in client->buffer for the next call. A frame whose
 * opcode is not OPCODE_TEXT_FRAME causes all remaining bytes to be discarded.
 *
 * @param client  connection the data came from.
 * @param buf     received bytes. The caller must provide one writable byte
 *                after buf[len - 1], because the payload terminator may be
 *                written there.
 * @param len     number of received bytes.
 * @return 0 on success; -1 if the pending data or an announced frame exceeds
 *         client->buffer, in which case the pending data is dropped.
 */
static int websocket_transmission(websocket_client_t* client, char* buf, int len)
{
    uint8_t* payload = NULL;
    int plen, hlen, mlen, i;

    if (client->buffer_size > 0)
    {
        if (client->buffer_size + len >= (int)sizeof(client->buffer))
        {
            // Too long to buffer.
            client->buffer_size = 0;
            return -1;
        }
        memcpy(client->buffer + client->buffer_size, buf, (size_t)len);
        buf = client->buffer;
        len += client->buffer_size;
    }

    while (len > 0)
    {
        // The header is decoded with explicit masks and shifts, so the result does
        // not depend on bit-field layout, alignment or host byte order.
        const unsigned char* raw = (const unsigned char*)buf;
        unsigned int len7;
        uint8_t saved;

        if ((raw[0] & 0x0f) != OPCODE_TEXT_FRAME)
        {
            len = 0;
            break;
        }
        if (len < 2)
        {
            // Second header byte not received yet.
            break;
        }

        mlen = (raw[1] & 0x80) ? 4 : 0; // masking key present?
        len7 = raw[1] & 0x7fu;
        if (len7 < 126)
            hlen = 2;
        else if (len7 == 126)
            hlen = 4;
        else
            hlen = 10;

        if (len < hlen + mlen)
        {
            // Extended length or masking key not fully received yet.
            break;
        }

        if (len7 < 126)
        {
            plen = (int)len7;
        }
        else if (len7 == 126)
        {
            plen = (int)(((unsigned int)raw[2] << 8) | raw[3]);
        }
        else
        {
            // 64-bit big-endian length.
            uint64_t wide = 0;
            for (i = 0; i < 8; i++)
            {
                wide = (wide << 8) | raw[2 + i];
            }
            if (wide > sizeof(client->buffer))
            {
                client->buffer_size = 0;
                return -1;
            }
            plen = (int)wide;
        }

        if (hlen + mlen + plen > (int)sizeof(client->buffer))
        {
            // Such a frame can never be assembled in client->buffer.
            client->buffer_size = 0;
            return -1;
        }
        if (hlen + mlen + plen > len)
        {
            // Payload incomplete: wait for more data.
            break;
        }

        payload = (uint8_t*)(buf + hlen + mlen);
        if (mlen)
        {
            websocket_umask(payload, plen, (uint8_t*)(buf + hlen));
        }
        buf += hlen + mlen + plen;
        len -= hlen + mlen + plen;

        // payload[plen] is the first byte of the next frame when frames are
        // pipelined, so put it back once the handler has seen the
        // NUL-terminated payload.
        saved = payload[plen];
        payload[plen] = 0;
        if (client->handler)
        {
            client->handler(client, (const char*)payload, plen);
        }
        payload[plen] = saved;
    }

    if (len > 0)
    {
        // buf may point into client->buffer itself, so the regions can overlap.
        memmove(client->buffer, buf, (size_t)len);
        client->buffer_size = len;
    }
    else
    {
        client->buffer_size = 0;
    }
    return 0;
}
/**
 * Drop a client: unregister it from the pollset, unlink it from the server's
 * client list, close its socket and destroy its pool.
 *
 * The client structure lives in that pool, so `client` must not be used after
 * this call.
 *
 * @param ws      server owning the client.
 * @param client  client to remove.
 */
static void websocket_remove(websocket_t* ws, websocket_client_t* client)
{
    websocket_client_t* next, * prev;

    apr_pollset_remove(ws->pollset, &client->pollfd);

    apr_thread_rwlock_wrlock(ws->lock);
    next = client->next;
    prev = client->prev;
    if (prev) prev->next = next;
    else ws->clients = next;
    if (next) next->prev = prev;
    apr_thread_rwlock_unlock(ws->lock);

    // ip can still be NULL when removal happens before the address was resolved.
    dzlog_info("remove websocket, from:%s", client->ip ? client->ip : "unknown");

    apr_socket_close(client->socket);
    apr_pool_destroy(client->pool);
}
/**
 * Read whatever is pending on a client socket and dispatch it.
 *
 * Before the handshake the data goes to websocket_handshark(); afterwards it
 * is parsed as frames by websocket_transmission(). Any receive error removes
 * the client.
 *
 * @param ws      server owning the client.
 * @param client  client whose socket became readable.
 */
static void websocket_receiver(websocket_t* ws, websocket_client_t* client)
{
    char buf[4096] = { 0 };
    // One byte is held back so the terminator below always fits inside buf.
    apr_size_t len = sizeof(buf) - 1;

    if (APR_SUCCESS != apr_socket_recv(client->socket, buf, &len))
    {
        // Error or disconnect: drop the client.
        websocket_remove(ws, client);
        return;
    }
    if (len > 0)
    {
        buf[len] = '\0';
        if (client->handshark)
        {
            websocket_transmission(client, buf, (int)len);
        }
        else
        {
            websocket_handshark(ws, client, buf);
        }
    }
}
//static void websocket_check_timeout(websocket_t* ws)
//{
// websocket_client_t* ptr, * next, * prev;
// apr_time_t now = apr_time_now();
//
// if (now - ws->check_time < 60 * 1000 * 1000L)
// return;
//
// ws->check_time = now;
//
// apr_thread_rwlock_wrlock(ws->lock);
//
// ptr = ws->clients;
// while (ptr)
// {
// if (now - ptr->update_time >= 10 * 1000 * 1000L)
// {
// // 超过10秒无消息, 则释放
//
// apr_pollset_remove(ws->pollset, &ptr->pollfd);
//
// next = ptr->next;
// prev = ptr->prev;
// if (prev) prev->next = next;
// else ws->clients = next;
// if (next) next->prev = prev;
//
// dzlog_warn("websocket timeout, from:%s", ptr->ip);
//
// apr_socket_close(ptr->socket);
// apr_pool_destroy(ptr->pool);
//
// ptr = next;
// }
// else
// {
// ptr = ptr->next;
// }
// }
// apr_thread_rwlock_unlock(ws->lock);
//
//}
/**
 * Worker thread: a single thread serves the listening socket and every client.
 *
 * The thread detaches itself, then polls the server pollset with a 100 ms
 * timeout for as long as ws->running is set. Ready descriptors are handled
 * from the last one to the first; a descriptor without client_data is the
 * listening socket.
 *
 * @param thread  this thread's APR handle.
 * @param param   the websocket_t to serve.
 * @return always NULL.
 */
static void* APR_THREAD_FUNC websocket_thread(apr_thread_t* thread, void* param)
{
    websocket_t* ws = param;
    const apr_pollfd_t* ready;
    apr_int32_t count, idx;

    apr_thread_detach(thread);

#if defined(_DEBUG)
    apr_thread_rwlock_wrlock(ws->lock);
    dzlog_debug("start websocket ");
    apr_thread_rwlock_unlock(ws->lock);
#endif

    while (ws->running)
    {
        // Timeout or error: just re-check the running flag.
        if (APR_SUCCESS != apr_pollset_poll(ws->pollset, 100 * 1000L, &count, &ready))
            continue;

        idx = count;
        while (idx-- > 0)
        {
            websocket_client_t* client = (websocket_client_t*)ready[idx].client_data;

            if (client)
            {
                // Data from a connected client.
                websocket_receiver(ws, client);
            }
            else
            {
                // Listening socket: a new connection is waiting.
                websocket_accept(ws);
            }
        }
    }
    return NULL;
}
使用
#include "websocket.h"
// Usage sketch (not compilable as written): create the server, then register a handler for a path.
// NOTE(review): `ws` needs a declaration and websocket_handler must be declared before it is used here.
ws = websocket_create(80);
websocket_set_handler(ws, "/api", &websocket_handler);
// Handler called for each text message; `body` is NUL-terminated and `size` is its length.
static void websocket_handler(websocket_client_t* client, const char* body, int size)
{
cJSON * root = cJSON_Parse(body);
// Handle the WebSocket request (placeholder)
...
// Send the reply
// NOTE(review): `data` is not defined in this snippet, and `root` presumably has to be released with cJSON_Delete - confirm.
websocket_send2(client, data, size);
}
参考链接
发表评论