问题

最近业务上面需要使用到WebSocket长连接来解决某些业务场景。

一图胜千言

注意:这里承担WebSocket服务器的是AWS API Gateway;后面的EC2业务服务,其实都是REST接口服务。 这里主要关注API Gateway和REST业务服务怎么实现API Gateway要求的WebSocket协议。VPCLink,ELB和Auto Scaling不是我们的重点。这里假设这些都已经完成了。

初始化网关

选择【构建】,如下图: 设置网关名称,类似如下图: 全部选择3种路由,如下图: 集成方式暂时选择mock方式: 阶段,随便填个名称: 预览一下,即将创建的空websocket网关:

网关事件

初始化完成部署后,进入网关会出现如下图:

路由选择表达式

$request.body.action:这个表示客户端与网关通过ws方式连接上后,可以向网关发送json数据来触发网关来进行路由调用。这里的action就是被选中路由资源的关键字段而已。类似发送这样:

{

"action": "getList"

}

接下来介绍默认自带的三种路由: $connect:表示客户端通过ws方式与网关进行了连接; $disconnect:表示ws连接中断; $default:如果客户端发送过来的消息不是json方式,默认就这这个路由。 一般来说,前两个都需要集成后台业务程序进行相关处理;最后一个默认根据业务情况来定,我这里不需要业务程序处理。 接下来介绍前2种路由的集成配置。

$connect路由事件

上面是该事件的主体配置。接下来我们先看集成请求是怎么配置的?

集成请求

这里选择vpclink方式对后台业务程序进行集成。这里值得关注的是请求集成模板参数如下: 模板选择表达式:\$default 模板密钥名称:$default 生成模板内容:

{

"connectionId": "$context.connectionId",

"payload": $input.body

}

这样后台业务程序,就能够读取到connectionId和payload,这两个字段。业务程序类似如下:

@PostMapping("/connect")

public R connect(@RequestBody WebsocketConn websocketConn) {

// 记录connectionId

String connectionId = websocketConn.getConnectionId();

log.info(String.format("WebSocket ID: %s 上线", connectionId));

redisRepository.setExpire(String.format("%s:%s", TokenConstant.WEBSOCKET_API_ID_PREFIX, connectionId),

connectionId, TokenConstant.SESSTION_API_TOKEN_EXPIRE);

return R.status(true);

}

这段Java代码主要就是实现connect接口,将aws api gateway的ws连接ID保存到redis里面,然后,返回200给api gateway。

集成响应

这里集成响应配置,主要就是添加响应$default。

$disconnect路由事件

上图就是disconnect路由事件的主要配置,只要配置一个集成请求就完事了。

集成请求

集成请求事件如上图。 这里选择vpclink方式对后台业务程序进行集成。这里值得关注的是请求集成模板参数如下: 模板选择表达式:\$default 模板密钥名称:$default 生成模板内容:

{

"connectionId": "$context.connectionId"

}

这样后台业务程序,就能够读取到connectionId,这两个字段。业务程序类似如下:

@PostMapping("/disconnect")

public R disconnect(@RequestBody WebsocketConn websocketConn) {

// 移除connectionId

String connectionId = websocketConn.getConnectionId();

log.info(String.format("WebSocket ID: %s 下线", connectionId));

redisRepository.del(String.format("%s:%s", TokenConstant.WEBSOCKET_API_ID_PREFIX, connectionId));

return R.status(true);

}

这段Java代码主要就是实现disconnect接口,将aws api gateway的ws连接时保存的连接ID从redis里面移除,然后,返回200给api gateway。

$default路由事件

以上是default路由配置事件。

集成请求

这里选择mock方式,不需要与后台业务程序集成。这里值得关注的是请求集成模板参数如下: 模板选择表达式:\$default 模板密钥名称:$default 生成模板内容:

{

"statusCode": 200

}

部署网关

将API Gateway部署到阶段里面去。

查看阶段

从阶段可以看到两种URL,第一种就是通过ws协议进行连接到URL,第二种就是操作这个API Gateway的接口。

推送消息(Java SDK)

Maven

com.amazonaws

aws-java-sdk-core

1.12.348

com.amazonaws

aws-java-sdk-apigatewaymanagementapi

1.12.348

ApiGatewayConfig.java(Spring配置类)

package org.xxxx.config;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

import com.amazonaws.client.builder.AwsClientBuilder;

import com.amazonaws.regions.Region;

import com.amazonaws.regions.Regions;

import com.amazonaws.services.apigatewaymanagementapi.AmazonApiGatewayManagementApiAsync;

import com.amazonaws.services.apigatewaymanagementapi.AmazonApiGatewayManagementApiAsyncClientBuilder;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ApiGatewayConfig {

@Value("${api.gateway.endpoint}")

private String endpoint;

@Value("${region.name:cn-north-1}")

private String regionName;

@Bean

public AmazonApiGatewayManagementApiAsync amazonApiGatewayManagementApi() {

Region region = Regions.getCurrentRegion();

if (region != null){

regionName = region.getName();

}

AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(endpoint, regionName);

return AmazonApiGatewayManagementApiAsyncClientBuilder.standard()

.withEndpointConfiguration(endpointConfiguration)

.withCredentials(DefaultAWSCredentialsProviderChain.getInstance())

.build();

}

}

注意:这里的EC2运行角色需要设置api gateway调用权限类似:AmazonAPIGatewayInvokeFullAccess,也可以根据情况进行自定义权限设置。

推送消息

@Override

public void pushAllFromMessage(String message) {

Set keys = redisRepository.getListKey(TokenConstant.WEBSOCKET_API_ID_PREFIX);

if (!CollectionUtils.isEmpty(keys)){

for (String key : keys) {

String connectionId = (String) redisRepository.get(key);

PostToConnectionRequest postToConnectionRequest = new PostToConnectionRequest();

postToConnectionRequest.withConnectionId(connectionId);

ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes());

byteBuffer = byteBuffer.duplicate();

postToConnectionRequest.withData(byteBuffer);

amazonApiGatewayManagementApi.postToConnectionAsync(postToConnectionRequest);

}

}

}

这里主要使用postToConnectionAsync方法进行消息推送,注意这里的ByteBuffer的使用,是需要调用duplicate方法的。

测试

安装wscat客户端:

npm install -g wscat

使用wscat连接上去,即可接收消息推送了。

wscat -c wss://aabbccddee.execute-api.cn-north-1.amazonaws.com.cn/test

总结

到这里就完成了AWS API Gateway中使用websocket的功能。注意,我们的业务程序不要集成任何websocket库,只用正常的restful方式实现aws要求的那个几个路由事件接口即可。再集成api gateway相关的sdk就可以对websocket连接进行管理了。

参考:

AWS API Gateway - WebSocket API + EC2 (HTTP & VPC Link & Auth & API Keys & Lambda Authorizer)awsapigw-ws-springbootAmazonApiGatewayManagementApi使用 IAM 授权使用 wscat 连接到 WebSocket API 并向其发送消息

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: