简介

CAP:框架,EventBus、事件总线,发布、订阅消息

RabbitMQ:消息传输器,消息队列,传递消息

参考文档

.NetCore项目中使用使用CAP

1.安装NuGet包

CAP核心包:Nuget DotNetCore.CAPCAP传输器:Nuget DotNetCore.CAP.RabbitMQCAP持久化:DotNetCore.CAP.SqlServerCAP仪表盘:DotNetCore.CAP.Dashboard

2.配置文件capconfig.json

{

"capconfig": {

//数据库连接字符串

"ConnectionString": "server=localhost;database=CapDemo;uid=sa;pwd=1234;MultipleActiveResultSets=true",

//RabbitMQ配置

"RabbitMQ": {

//服务器地址

"HostName": "localhost",

//用户名

"UserName": "admin",

//密码

"Password": "admin", //端口

"Port": 5672,

//虚拟机

"VirtualHost": "/admin",

//交换机

"ExchangeName": "cap.hrapp.topic"

},

//重试的最大次数

"FailedRetryCount": 50,

//每次重试的间隔时间

"FailedRetryInterval": 60,

//删除已经过期消息的时间间隔

"CollectorCleaningInterval": 300

}

}

3.CAP注册服务扩展ServiceCollectionExtensions.cs

public static IServiceCollection AddCap(this IServiceCollection services, CapConfig capConfig)

{

services.AddCap(setup =>

{

// 配置数据库

setup.UseSqlServer(opt =>

{

opt.ConnectionString = capConfig.ConnectionString;

});

// 配置消息队列

setup.UseRabbitMQ(opt =>

{

opt.HostName = capConfig.RabbitMQ.HostName;

opt.UserName = capConfig.RabbitMQ.UserName;

opt.Password = capConfig.RabbitMQ.Password;

opt.VirtualHost = capConfig.RabbitMQ.VirtualHost;

opt.ExchangeName = capConfig.RabbitMQ.ExchangeName;

opt.Port = capConfig.RabbitMQ.Port;

});

// 启用操作界面

setup.UseDashboard(opt =>

{

opt.PathMatch = "/mycap";

});

setup.FailedRetryCount = capConfig.FailedRetryCount;

setup.FailedRetryInterval = capConfig.FailedRetryInterval;

setup.CollectorCleaningInterval = capConfig.CollectorCleaningInterval;

setup.FailedThresholdCallback = (type) =>

{

Log.Logger.Debug($@"A message of type {type} failed after executing {setup.FailedRetryCount} several times,

requiring manual troubleshooting. Message name: {type.Message.GetName()}");

};

setup.JsonSerializerOptions.Encoder = JavaScriptEncoder.Create(UnicodeRanges.All);

}).AddSubscribeFilter();

return services;

}

UseSqlServer:使用SqlServer数据库持久化存储,配置数据库连接字符串

    运行后会在数据库新建两张表,发布消息表cap.Published,订阅消息表cap.Received 查看详情

UseRabbitMQ:使用RabbitMQ消息队列传输消息,查看详情

    HostName:服务器名称     UserName:用户名     Password:密码     VirtualHost :虚拟机     ExchangeName:交换机     Port:端口号

UseDashboard:使用Dashboard仪表盘操作界面查看消息队列信息,地址 http://localhost:5222/mycap,查看详情

FailedRetryCount:重试的最大次数

FailedRetryInterval:每次重试的间隔时间

CollectorCleaningInterval:收集器删除已经过期消息的时间间隔

FailedThresholdCallback:失败重试次数到达上限时触发的回调函数

重试 & 间隔

在默认情况下,重试将在发送和消费消息失败的 4分钟后 开始,这是为了避免设置消息状态延迟导致可能出现的问题。

发送和消费消息的过程中失败会立即重试 3 次,在 3 次以后将进入重试轮询,此时 FailedRetryInterval 配置才会生效。 查看详情查看详情

4.在Program.cs注册CAP服务

builder.Services.AddCap(capConfig);

5.CAP简单应用

(1)注入ICapPublisher

private readonly IFreeSql _freeSql;

private readonly ICapPublisher _capPublisher;

private readonly IBaseRepository _userRepo;

private readonly IBaseRepository _userinfoRepo;

private readonly UnitOfWorkManager _unitOfWorkManager;

public CapDemoService(

IFreeSql freeSql,

ICapPublisher capPublisher,

IBaseRepository userRepo,

IBaseRepository userinfoRepo, UnitOfWorkManager unitOfWorkManager)

{

_freeSql = freeSql;

_capPublisher = capPublisher;

_userRepo = userRepo;

_userinfoRepo = userinfoRepo;

_unitOfWorkManager = unitOfWorkManager;

}

(2)发布消息

public async TaskTestPublish()

{

var res = new OpResult();

await _capPublisher.PublishAsync("TestPublish", "测试消息" + DateTime.Now.ToString());

return res.OK();

}

PublishAsync

参数1:消息名称。

参数2:消息内容

(3)订阅消息

/// 订阅消息

/// 在Controller中,添加[NonAction],添加[CapSubscribe("")] 来订阅相关消息

/// 如果方法没有位于Controller 中,例如Service中,订阅的类需要继承 ICapSubscribe,然后方法添加[CapSubscribe("")]标记

/// Group分组

[NonAction]

[CapSubscribe("TestPublish", Group = "TestReceivedGroup1")]

public void TestReceived(string str)

{

Console.Write(str);

}

CapSubscribe:设置订阅的消息名称

Group:相同组只会有一个订阅消息。不同组都会收到订阅消息。默认组名为程序及名称,可配置默认组名

6.CAP和FreeSql事务,保证应用内数据强一致性

(1)发布消息

/// 存储过程发布消息

public async TaskCapPublisherTransactionTest()

{

var res = new OpResult();

using (IUnitOfWork uow = _unitOfWorkManager.Begin())

{

ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capPublisher, false);

try

{

// 插入用户数据

UserEntity user = new UserEntity

{

LoginName = "Tommy" + DateTime.Now.ToString("yyyMMddhhmmssfff"),

Password = "123456"

};

user = await _userRepo.InsertAsync(user);

await _capPublisher.PublishAsync("Cap.Publisher.Transaction.Test", user, "Cap.CallBack.Test");

trans.Commit(uow);

Console.WriteLine("发布了********************************************");

return res.OK(data: user, msg: "操作成功");

}

catch (Exception ex)

{

Log.Logger.Error($"CapPublisherTransactionTest Exception:{ex.Message}");

return res.NotOK(status: OpResultCode.SystemInnerError, msg: ex.Message);

}

}

}

BeginTransaction重载方法

参数1:_capPublisher

参数2:默认false,是否自动提交。如果true,会在调用发布消息方法PublishAsync时提交事务,后面如果有数据库操作,则不在事务中了。

PublishAsync

参数3:回调方法消息名称查看详情

(2)订阅消息,返回回调方法所需参数

/// 订阅事务发布的消息

[NonAction]

[CapSubscribe("Cap.Publisher.Transaction.Test", Group = "Cap.Received.Test.Group1")]

public UserInfoEntity CapReceivedTest(UserEntity user)

{

var result = _capDemoService.CapReceivedTest(user);

return result;

}

(3)回调方法,同订阅方式基本一样

/// 发布消息回调(补偿事务)

[NonAction]

[CapSubscribe("Cap.CallBack.Test")]

public void CapCallBackTest(UserInfoEntity user)

{

_capDemoService.CapCallBackTest(user);

}

7.Dashboard消息队列仪表盘,http://localhost:5222/mycap

 

文章链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: