// 大名科技(天津)有限公司版权所有 电话:18020030720 QQ:515096995
//
// 此源代码遵循位于源代码树根目录中的 LICENSE 文件的许可证
namespace Admin.NET.Core;
///
/// Redis 消息扩展
///
///
public class EventConsumer : IDisposable
{
private Task _consumerTask;
private CancellationTokenSource _consumerCts;
///
/// 消费者
///
public IProducerConsumer Consumer { get; }
///
/// ConsumerBuilder
///
public FullRedis Builder { get; set; }
///
/// 消息回调
///
public event EventHandler Received;
///
/// 构造函数
///
public EventConsumer(FullRedis redis, string routeKey)
{
Builder = redis;
Consumer = Builder.GetQueue(routeKey);
}
///
/// 启动
///
///
public void Start()
{
if (Consumer is null)
{
throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
}
if (_consumerTask != null)
{
return;
}
_consumerCts = new CancellationTokenSource();
var ct = _consumerCts.Token;
_consumerTask = Task.Factory.StartNew(() =>
{
while (!ct.IsCancellationRequested)
{
var cr = Consumer.TakeOne(10);
if (cr == null) continue;
Received?.Invoke(this, cr);
}
}, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
///
/// 停止
///
///
public async Task Stop()
{
if (_consumerCts == null || _consumerTask == null) return;
_consumerCts.Cancel();
try
{
await _consumerTask;
}
finally
{
_consumerTask = null;
_consumerCts = null;
}
}
///
/// 释放
///
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
///
/// 释放
///
///
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (_consumerTask != null)
{
Stop().Wait();
}
Builder.Dispose();
}
}
}