废话不说, 直接上代码:

头文件:

#if !defined(_WEBSOCKET_H_)

#define _WEBSOCKET_H_

// 引入 APR 相关头文件

#include

#include

#include

#include

#include

#include // 线程支持

#include // 锁

#include // 读写锁

#include // 线程池

#include // 条件

#include // 时间

#include // 网络

#include // 端口

#include // 内存池

#include // 信号

#include // 哈希表

#include // 字符串

#include // 队列

#include // 字符串匹配

#include

#include

#include

#include

#include

#include

#include

typedef struct websocket_t websocket_t;

typedef struct websocket_client_t websocket_client_t;

typedef void (*ws_handler)(websocket_client_t*, const char*, int);

websocket_t* websocket_create(int port);

void websocket_destroy(websocket_t* ws);

int websocket_send(websocket_client_t* client, struct evbuffer* evb);

int websocket_send2(websocket_client_t* client, const char* data, size_t size);

// 需要设置处理函数

void websocket_set_handler(websocket_t* ws, const char* path, ws_handler handler);

#endif //!defined(WEBSOCKET_H_)

C文件:

#include "websocket.h"

#define WS_KEY_NAME "Sec-WebSocket-Key"

#define WS_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// opcode.

enum

{

OPCODE_SLICE = 0, ///< 数据分片, 其中一片

OPCODE_TEXT_FRAME = 1, ///< 文本

OPCODE_BINARY_FRAME = 2, ///< 二进制数据

OPCODE_DISCONNECT = 8, ///< 连接断开.

OPCODE_PING = 9,

OPCODE_PONG = 10,

};

//定义websocket 头部操作数 (下面利用了结构体的位域 给每个变量分配空间)

typedef struct websocket_ophdr_t websocket_ophdr_t;

struct websocket_ophdr_t

{ //websocket operator holder 操作符

//注意opcode对应的是高位,fin是低位。(要根据网络字节序来)

unsigned char opcode : 4;

unsigned char rsv3 : 1;

unsigned char rsv2 : 1;

unsigned char rsv1 : 1;

unsigned char fin : 1;

unsigned char pl_len : 7;

unsigned char mask : 1;

};

// 连接的客户端

struct websocket_client_t

{

apr_pool_t* pool;

apr_socket_t* socket;

apr_pollfd_t pollfd;

apr_byte_t handshark;

char* ip;

char* url;

char buffer[4096];

char zero;

int buffer_size;

ws_handler handler;

websocket_client_t* next;

websocket_client_t* prev;

};

// 处理函数映射关系

struct websocket_handler_t

{

const char* pattern;

size_t size;

ws_handler handler;

struct websocket_handler_t* next;

};

// 服务信息

struct websocket_t

{

apr_pool_t* pool;

apr_socket_t* socket;

apr_pollset_t* pollset;

apr_thread_t* thread;

apr_pollfd_t server;

apr_byte_t running;

apr_thread_rwlock_t* lock;

struct websocket_handler_t* handlers;

websocket_client_t* clients;

};

static void websocket_handshark(websocket_t* ws, websocket_client_t* client, const char* buf);

static void websocket_remove(websocket_t* ws, websocket_client_t* client);

static void websocket_receiver(websocket_t* ws, websocket_client_t* client);

static void websocket_accept(websocket_t* ws);

static int websocket_transmission(websocket_client_t* client, char* buf, int len);

static void* APR_THREAD_FUNC websocket_thread(apr_thread_t* thread, void* param);

websocket_t* websocket_create(int port)

{

apr_pool_t* pool;

websocket_t* ws = NULL;

apr_pool_create(&pool, 0);

ws = apr_pcalloc(pool, sizeof(*ws));

ws->pool = pool;

apr_thread_rwlock_create(&ws->lock, pool);

apr_pollset_create_ex(&ws->pollset, 100, pool,

APR_POLLSET_EPOLL | APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY,

APR_POLLSET_EPOLL);

ws->socket = socket_create((apr_port_t)port, pool);

if (NULL == ws->socket)

{

apr_pool_destroy(pool);

return NULL;

}

apr_socket_listen(ws->socket, 5);

ws->server.desc.s = ws->socket;

ws->server.desc_type = APR_POLL_SOCKET;

ws->server.reqevents = APR_POLLIN;

ws->server.client_data = NULL;

apr_pollset_add(ws->pollset, &ws->server);

dzlog_info("websocket listen on port %u", port);

ws->running = TRUE;

apr_thread_create(&ws->thread, NULL, &websocket_thread, ws, pool);

return ws;

}

void websocket_destroy(websocket_t* ws)

{

if (NULL == ws)

return;

dzlog_info("destroy websocket");

if (ws->socket)

apr_socket_close(ws->socket);

if (ws->pollset)

apr_pollset_destroy(ws->pollset);

apr_thread_rwlock_wrlock(ws->lock);

if (ws->clients)

{

websocket_client_t* ptr, * next;

ptr = ws->clients;

while (ptr)

{

apr_socket_close(ptr->socket);

next = ptr->next;

apr_pool_destroy(ptr->pool);

ptr = next;

}

ws->clients = NULL;

}

apr_thread_rwlock_unlock(ws->lock);

apr_pool_destroy(ws->pool);

}

void websocket_set_handler(websocket_t* ws, const char* path, ws_handler handler)

{

struct websocket_handler_t* ptr;

ptr = apr_pcalloc(ws->pool, sizeof(*ptr));

ptr->pattern = apr_pstrdup(ws->pool, path);

ptr->size = strlen(ptr->pattern);

ptr->handler = handler;

apr_thread_rwlock_wrlock(ws->lock);

ptr->next = ws->handlers;

ws->handlers = ptr;

apr_thread_rwlock_unlock(ws->lock);

}

static void websocket_accept(websocket_t* ws)

{

apr_pool_t* pool;

apr_sockaddr_t* addr;

websocket_client_t* client;

apr_pool_create(&pool, 0);

client = apr_pcalloc(pool, sizeof(*client));

client->pool = pool;

client->handshark = FALSE;

if (APR_SUCCESS != apr_socket_accept(&client->socket, ws->socket, pool))

{

dzlog_error("accept failed");

apr_pool_destroy(pool);

return;

}

apr_socket_opt_set(client->socket, APR_SO_NONBLOCK, 1);

apr_socket_opt_set(client->socket, APR_SO_LINGER, 1);

client->pollfd.client_data = client;

client->pollfd.desc.s = client->socket;

client->pollfd.desc_type = APR_POLL_SOCKET;

client->pollfd.reqevents = APR_POLLIN;

apr_thread_rwlock_wrlock(ws->lock);

client->next = ws->clients;

if (ws->clients)

{

ws->clients->prev = client;

}

ws->clients = client;

apr_thread_rwlock_unlock(ws->lock);

if (APR_SUCCESS != apr_pollset_add(ws->pollset, &client->pollfd))

{

websocket_remove(ws, client);

return;

}

if (APR_SUCCESS == apr_socket_addr_get(&addr, APR_REMOTE, client->socket))

{

char buf[64];

if (APR_SUCCESS == apr_sockaddr_ip_getbuf(buf, sizeof(buf), addr))

{

dzlog_info("accept a connection, from: %s", buf);

client->ip = apr_pstrdup(pool, buf);

}

}

if (NULL == client->ip) client->ip = "unknown";

}

static void websocket_handshark(websocket_t* ws, websocket_client_t* client, const char* buf)

{

char data[512];

char sec_data[128] = { 0 };

const char* str, * p;

const char* url;

apr_size_t len;

// 先找到URL

if (0 != strncmp(buf, "GET ", 4))

return;

url = buf + 4;

str = strchr(url, ' ');

if (NULL == str)

return;

//if ((int)(str - url) >= sizeof(client->url))

// return;

//strncpy(client->url, url, (size_t)(str - url));

client->url = apr_pstrndup(client->pool, url, (apr_size_t)(str - url));

str = strstr(buf, WS_KEY_NAME);

if (NULL == str)

return;

str = strchr(str, ':');

if (NULL == str)return;

str++;

while (' ' == *str) str++;

p = strchr(str, '\r');

if (NULL == p) return;

len = (apr_size_t)(p - str);

if (len + (apr_size_t)sizeof(WS_GUID) >= (apr_size_t)sizeof(data))

return;

strncpy(data, str, (size_t)len);

strncpy(data + len, WS_GUID, (size_t)sizeof(WS_GUID));

assert((int)strlen(data) == len + (int)sizeof(WS_GUID) - 1);

//apr_sha1_base64(data, len + (int)sizeof(WS_GUID) - 1, sec_data);

apr_sha1_ctx_t ctx = { 0 };

unsigned char digest[APR_SHA1_DIGESTSIZE] = { 0 };

apr_sha1_init(&ctx);

apr_sha1_update(&ctx, data, (unsigned int)len + (unsigned int)sizeof(WS_GUID) - 1);

apr_sha1_final(digest, &ctx);

apr_base64_encode(sec_data, (const char*)digest, APR_SHA1_DIGESTSIZE);

len = (apr_size_t)snprintf(data, sizeof(data),

"HTTP/1.1 101 Switching Protocols\r\n"

"Upgrade: websocket\r\n"

"Connection: Upgrade\r\n"

"Sec-WebSocket-Accept: %s\r\n\r\n",

sec_data);

apr_status_t rv = apr_socket_send(client->socket, data, &len);

if (APR_SUCCESS != rv)

{

apr_strerror(rv, data, sizeof(data));

dzlog_error("failed to send: %s", data);

}

client->handshark = TRUE;

// 匹配url. 设置handler. 如果没有, 则用默认规则.

if (ws->handlers)

{

struct websocket_handler_t* ptr;

apr_thread_rwlock_rdlock(ws->lock);

ptr = ws->handlers;

while (ptr)

{

if (!av_strncasecmp(client->url, ptr->pattern, ptr->size))

{

client->handler = ptr->handler;

break;

}

if (APR_SUCCESS == apr_fnmatch(ptr->pattern, client->url, 0))

{

client->handler = ptr->handler;

break;

}

ptr = ptr->next;

}

apr_thread_rwlock_unlock(ws->lock);

}

dzlog_info("handshark success, from: %s", client->ip);

}

static inline void websocket_umask(uint8_t* payload, int length, uint8_t* mask_key)

{

int i = 0;

for (i = 0; i < length; i++)

{

payload[i] ^= mask_key[i % 4];

}

}

int websocket_send(websocket_client_t* client, struct evbuffer* evb)

{

size_t len = evbuffer_get_length(evb);

const char* data = (const char*)evbuffer_pullup(evb, (ssize_t)len);

return websocket_send2(client, data, len);

}

int websocket_send2(websocket_client_t* client, const char* data, size_t size)

{

// 发送非加密数据.

struct iovec v[2] = { 0 };

unsigned char header[10];

size_t header_len = 0;

apr_size_t bytes;

// 固定的头

header[0] = 0x81;

if (size < 126)

{

header[1] = (uint8_t)size; // 最高bit为0,

header_len = 2;

}

else if (size <= 0xffff)

{

header[1] = 126;

*(uint16_t*)(header + 2) = htons((uint16_t)size);

header_len = 4;

}

else

{

header[1] = 127;

*(uint32_t*)(header + 2) = htonl((uint32_t)size);

header[6] = 0;

header[7] = 0;

header[8] = 0;

header[9] = 0;

header_len = 10;

}

v[0].iov_len = header_len;

v[0].iov_base = header;

v[1].iov_len = size;

v[1].iov_base = (void*)data;

if (APR_SUCCESS != apr_socket_sendv(client->socket, v, 2, &bytes))

return -1;

return (int)(bytes - header_len);

}

static int websocket_transmission(websocket_client_t* client, char* buf, int len)

{

// 收到ws消息.

websocket_ophdr_t* hdr;

uint8_t* payload = NULL;

int plen, hlen;

if (client->buffer_size > 0)

{

if (client->buffer_size + len >= (int)sizeof(client->buffer))

{

// 太长了.

client->buffer_size = 0;

return -1;

}

memcpy(client->buffer + client->buffer_size, buf, (size_t)len);

buf = client->buffer;

len += client->buffer_size;

}

while (len > 0)

{

hdr = (websocket_ophdr_t*)buf;

if (hdr->opcode != OPCODE_TEXT_FRAME)

{

len = 0;

break;

}

if (hdr->pl_len < 126)

{

plen = hdr->pl_len;

hlen = 2;

}

else if (hdr->pl_len == 126)

{

plen = (int)ntohs(*(uint16_t*)(buf + sizeof(websocket_ophdr_t)));

hlen = 4;

}

else

{

plen = (int)ntohl(*(uint32_t*)(buf + sizeof(websocket_ophdr_t)));

hlen = 10;

}

if (hdr->mask)

{

if (plen + hlen + 4 > len)

{

// 长度不够.

break;

}

payload = (uint8_t*)(buf + hlen + 4);

websocket_umask(payload, (int)plen, (uint8_t*)(buf + hlen));

buf += plen + hlen + 4;

len -= (plen + hlen + 4);

}

else

{

assert(plen + hlen == len);

if (plen + hlen > len)

{

break;

}

payload = (uint8_t*)(buf + hlen);

buf += plen + hlen;

len -= (plen + hlen);

}

payload[plen] = 0;

// 进行处理

if (client->handler)

{

client->handler(client, (const char*)payload, plen);

}

}

if (len > 0)

{

memcpy(client->buffer, buf, (size_t)len);

client->buffer_size = len;

}

else

{

client->buffer_size = 0;

}

// client->update_time = apr_time_now();

return 0;

}

static void websocket_remove(websocket_t* ws, websocket_client_t* client)

{

websocket_client_t* next, * prev;

apr_pollset_remove(ws->pollset, &client->pollfd);

apr_thread_rwlock_wrlock(ws->lock);

next = client->next;

prev = client->prev;

if (prev) prev->next = next;

else ws->clients = next;

if (next) next->prev = prev;

apr_thread_rwlock_unlock(ws->lock);

dzlog_info("remove websocket, from:%s", client->ip);

apr_socket_close(client->socket);

apr_pool_destroy(client->pool);

}

static void websocket_receiver(websocket_t* ws, websocket_client_t* client)

{

char buf[4096] = { 0 };

apr_size_t len = sizeof(buf);

if (APR_SUCCESS != apr_socket_recv(client->socket, buf, &len))

{

// 异常, 那么断开.

websocket_remove(ws, client);

return;

}

if (len > 0)

{

buf[len] = '\0';

if (client->handshark)

{

websocket_transmission(client, buf, (int)len);

}

else

{

websocket_handshark(ws, client, buf);

}

}

}

//static void websocket_check_timeout(websocket_t* ws)

//{

// websocket_client_t* ptr, * next, * prev;

// apr_time_t now = apr_time_now();

//

// if (now - ws->check_time < 60 * 1000 * 1000L)

// return;

//

// ws->check_time = now;

//

// apr_thread_rwlock_wrlock(ws->lock);

//

// ptr = ws->clients;

// while (ptr)

// {

// if (now - ptr->update_time >= 10 * 1000 * 1000L)

// {

// // 超过10秒无消息, 则释放

//

// apr_pollset_remove(ws->pollset, &ptr->pollfd);

//

// next = ptr->next;

// prev = ptr->prev;

// if (prev) prev->next = next;

// else ws->clients = next;

// if (next) next->prev = prev;

//

// dzlog_warn("websocket timeout, from:%s", ptr->ip);

//

// apr_socket_close(ptr->socket);

// apr_pool_destroy(ptr->pool);

//

// ptr = next;

// }

// else

// {

// ptr = ptr->next;

// }

// }

// apr_thread_rwlock_unlock(ws->lock);

//

//}

static void* APR_THREAD_FUNC websocket_thread(apr_thread_t* thread, void* param)

{

// 不追求极致性能, 使用单个线程处理所有websocket请求.

websocket_t* ws = param;

apr_int32_t i, num;

const apr_pollfd_t* descriptors;

apr_thread_detach(thread);

#if defined(_DEBUG)

apr_thread_rwlock_wrlock(ws->lock);

dzlog_debug("start websocket ");

apr_thread_rwlock_unlock(ws->lock);

#endif

while (ws->running)

{

if (APR_SUCCESS == apr_pollset_poll(ws->pollset, 100 * 1000L, &num, &descriptors))

{

for (i = num - 1; i >= 0; i--)

{

if (descriptors[i].client_data)

{

// 客户端.

websocket_receiver(ws, (websocket_client_t*)descriptors[i].client_data);

}

else

{

// 服务端

websocket_accept(ws);

}

}

}

//else

//{

// // 每分钟检查一次, 长时间无消息的删除掉.

// websocket_check_timeout(ws);

//}

}

return NULL;

}

使用

#include "websocket.h"

ws = websocket_create(80);

websocket_set_handler(ws, "/api", &websocket_handler);

static void websocket_handler(websocket_client_t* client, const char* body, int size)

{

cJSON * root = cJSON_Parse(body);

// 处理WebSocket的请求

...

// 返回数据

websocket_send2(client, data, size);

}

参考链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: