1 websocket 轻量客户端

因以前发过这个代码,但是一直没有整理,这次整理了一下,持续修改,主要是要使用在arm的linux上,发送接收的数据压缩成图片发送出去。

要达到轻量websocket 使用,必须要达到几个方面才能足够简单, 1、不用加入其他的库 2、只需要使用头文件包含就可以 3、跨平台 如果正常应用,可以使用websocketpp等库,问题就是比较麻烦,要使用boost或者asio库,当然asio也是足够简单,头文件包含,编译通过要设置参数,问题不大,不过不够简单

2 应用场景

1 windows 使用 2 linux使用 3 linux arm 板子上使用 在arm上编译的时候,就不用编译那么多的库文件了

3 原理

使用select模型 和原始操作系统的socket来直接编写代码,select模型比较简单,非长时间阻塞模式,以下是webscoket协议字节示意图 本文函数根据上图实现了websocket协议。定义的主要数据结构如下所示,websocket协议里面包含两种数据,一种是二进制,一种是文本,是可以指定的

struct wsheader_type {

unsigned header_size;

bool fin;

bool mask;

enum opcode_type {

CONTINUATION = 0x0,

TEXT_FRAME = 0x1,

BINARY_FRAME = 0x2,

CLOSE = 8,

PING = 9,

PONG = 0xa,

} opcode;

int N0;

uint64_t N;

uint8_t masking_key[4];

};

websocket链接

websocket链接使用的是http协议,所不同的是必须做upgrade

static const char* desthttp = "GET /%s HTTP/1.1\r\n"

"Host: %s:%d\r\n"

"Upgrade: websocket\r\n"

"Connection: Upgrade\r\n"

"Origin: %s\r\n"

"Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n"

"Sec-WebSocket-Version: 13\r\n\r\n";

上面把http协议升级为websocket协议的内容写出,把客户端的内容填充过去发送到服务端就行,从下面代码可以看出我们需要哪些内容

char line[256];

int status;

int i;

sprintf(line, desthttp, path, host, port, origin.c_str());

::send(sockfd, line, (int)strlen(line), 0);

发送和接收

使用select来做异步的模式,发送的时候指定参数 很简单,就如下所示

fd_set wfds;

timeval tv = { timeout / 1000, (timeout % 1000) * 1000 };

FD_ZERO(&wfds);

if (txbuf.size()) { FD_SET(sockfd, &wfds); }

select((int)(sockfd + 1), NULL, &wfds, 0, timeout > 0 ? &tv : 0);

当然,如果要将socket置为非阻塞,开始的时候还是要设置的

#ifdef _WIN32

u_long on = 1;

ioctlsocket(sockfd, FIONBIO, &on);

#else

fcntl(sockfd, F_SETFL, O_NONBLOCK);

#endif

下面是发送的函数,参数为毫秒

//参数是毫秒

void pollSend(int timeout)

{

if (v_state == CLOSED) {

if (timeout > 0) {

timeval tv = { timeout / 1000, (timeout % 1000) * 1000 };

select(0, NULL, NULL, NULL, &tv);

}

return;

}

if (timeout != 0) {

fd_set wfds;

timeval tv = { timeout / 1000, (timeout % 1000) * 1000 };

FD_ZERO(&wfds);

if (txbuf.size()) { FD_SET(sockfd, &wfds); }

select((int)(sockfd + 1), NULL, &wfds, 0, timeout > 0 ? &tv : 0);

}

while (txbuf.size()) {

int ret = ::send(sockfd, (char*)&txbuf[0], (int)txbuf.size(), 0);

if (ret < 0 && (socketerrno == SOCKET_EWOULDBLOCK || socketerrno == SOCKET_EAGAIN_EINPROGRESS)) {

break;

}

else if (ret <= 0) {

closesocket(sockfd);

v_state = CLOSED;

fputs(ret < 0 ? "Connection error!\n" : "Connection closed!\n", stderr);

break;

}

else {

txbuf.erase(txbuf.begin(), txbuf.begin() + ret);

}

}

if (!txbuf.size() && v_state == CLOSING) {

closesocket(sockfd);

v_state = CLOSED;

}

}

下面是接收函数

void pollRecv(int timeout)

{

if (v_state == CLOSED) {

if (timeout > 0) {

timeval tv = { timeout / 1000, (timeout % 1000) * 1000 };

select(0, NULL, NULL, NULL, &tv);

}

return;

}

if (timeout != 0) {

fd_set rfds;

//fd_set wfds;

timeval tv = { timeout / 1000, (timeout % 1000) * 1000 };

FD_ZERO(&rfds);

FD_SET(sockfd, &rfds);

select((int)(sockfd + 1), &rfds, NULL, 0, timeout > 0 ? &tv : 0);

if (!FD_ISSET(sockfd, &rfds))

{

printf("out of here ,no data\n");

return;

}

}

while (true) {

// FD_ISSET(0, &rfds) will be true

int N = (int)rxbuf.size();

ssize_t ret;

//钱波 64K 一个IP包长

rxbuf.resize(N + 64000);

ret = recv(sockfd, (char*)&rxbuf[0] + N, 64000, 0);

if (ret < 0 && (socketerrno == SOCKET_EWOULDBLOCK || socketerrno == SOCKET_EAGAIN_EINPROGRESS)) {

rxbuf.resize(N);

break;

}

else if (ret <= 0) {

rxbuf.resize(N);

closesocket(sockfd);

v_state = CLOSED;

fputs(ret < 0 ? "Connection error!\n" : "Connection closed!\n", stderr);

break;

}

else {//接收到的数据

rxbuf.resize(N + ret);

}

}

}

可以看出,我们使用select 仅仅是不阻塞,简单使用FD_ISSET宏去判决是否有数据达到,如果我们没有收到数据,我们就直接返回。

为了简单使用程序,我们封装一个class来使用接收和发送

class c_ws_class //:public TThreadRunable

{

thread v_thread;

std::mutex v_mutex;

std::condition_variable v_cond;

WebSocket v_ws;

int v_stop = 1;

string v_url;

callback_message_recv v_recv = NULL;

//已经

//bool v_is_working = false;

public:

static int InitSock()

{

#ifdef _WIN32

INT rc;

WSADATA wsaData;

rc = WSAStartup(MAKEWORD(2, 2), &wsaData);

if (rc) {

printf("WSAStartup Failed.\n");

return -1;

}

#endif

return 0;

}

static void UnInitSock()

{

WSACleanup();

}

c_ws_class()

{}

~c_ws_class()

{

v_ws.close();

}

public:

void set_url(const char * url)

{

v_url = url;

}

int connect()

{

if (v_url.empty())

return -1;

return v_ws.connect(v_url);

}

void Start(callback_message_recv recv)

{

//because we will connect all over the time, so v_stop is zero

v_stop = 0;

v_ws.initSize(0, 0);

v_recv = recv;

v_thread = std::thread(std::bind(&c_ws_class::Run, this));

}

bool send(const char * str)

{

if (str != NULL)

{

if (v_ws.getReadyState() != CLOSED)

{

v_ws.send(str);

v_ws.pollSend(10);

return true;

}

return false;

}

return false;

}

void sendBinary(uint8_t *data, int len)

{

if (v_ws.getReadyState() != CLOSED)

{

v_ws.sendBinary(data, len);

v_ws.pollSend(5);

}

}

void Stop()

{

v_stop = 1;

}

int isStop()

{

return v_stop;

}

void Join()

{

if (v_thread.joinable())

v_thread.join();

}

void Run()

{

while (v_stop == 0) {

//WebSocket::pointer wsp = &*ws; // <-- because a unique_ptr cannot be copied into a lambda

if (v_stop == 1)

break;

if (v_ws.getReadyState() == CLOSED)

{

//断线重连

if (connect() != 0)

{

for (int i = 0; i < 20; i++)

{

std::this_thread::sleep_for(std::chrono::milliseconds(100));

if (v_stop == 1)

break;

}

}

}

else

{

v_ws.pollRecv(10);

v_ws.dispatch(v_recv);

}

}

v_ws.close();

//std::cout << "server exit" << endl;

v_stop = 1;

}

void WaitForSignal()

{

std::unique_lock ul(v_mutex);

v_cond.wait(ul);

}

void Notify()

{

v_cond.notify_one();

}

};

以上为封装的外层线程代码,里面同时也封装了断线重连。

我们常常使用python或者使用nodejs来做测试,这里使用nodejs写一个简单的服务器程序,接收到数据以后发回。

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8000 });

wss.on('connection', function connection(ws) {

ws.on('message', function incoming(message) {

console.log('received: %s', message);

ws.send('recv:'+message);

});//当收到消息时,在控制台打印出来,并回复一条信息

});

测试结果

服务端nodejs显示 客户端链接后显示

整个的头文件代码和测试代码在gitee上面 gitee地址

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: