ExamSampleWorker.cs 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. using Microsoft.Extensions.DependencyInjection;
  2. using Microsoft.Extensions.Hosting;
  3. using Microsoft.Extensions.Logging;
  4. namespace YBEE.EQM.Application;
  5. public class ExamSampleWorker(ILogger<ExamSampleWorker> logger, IServiceScopeFactory scopeFactory) : IHostedService, IDisposable
  6. {
  7. private readonly Dictionary<int, Task> _executingTasks = [];
  8. private readonly Dictionary<int, CancellationTokenSource> _cancellationTokenSources = [];
  9. /// <summary>
  10. /// StartAsync 方法不再直接启动任务,而是保持服务初始化
  11. /// </summary>
  12. /// <param name="cancellationToken"></param>
  13. /// <returns></returns>
  14. public Task StartAsync(CancellationToken cancellationToken)
  15. {
  16. logger.LogInformation("抽样执行服务已初始化");
  17. return Task.CompletedTask;
  18. }
  19. /// <summary>
  20. /// 外部控制:通过参数启动后台任务
  21. /// </summary>
  22. /// <param name="examPlanId">监测计划ID</param>
  23. /// <returns></returns>
  24. public int StartTask(int examPlanId)
  25. {
  26. if (_executingTasks.TryGetValue(examPlanId, out Task value) && !value.IsCompleted)
  27. {
  28. logger.LogInformation("监测计划ID:{examPlanId},执行中,不能重复启动!", examPlanId);
  29. return -1;
  30. }
  31. logger.LogInformation("监测计划ID:{examPlanId},开始执行...", examPlanId);
  32. var cancellationTokenSource = new CancellationTokenSource();
  33. var executingTask = ExecuteAsync(examPlanId, cancellationTokenSource.Token);
  34. // 保存任务和取消令牌源,以便可以后续控制
  35. _executingTasks[examPlanId] = executingTask;
  36. _cancellationTokenSources[examPlanId] = cancellationTokenSource;
  37. return 0;
  38. }
  39. /// <summary>
  40. /// 外部控制:通过任务参数停止任务
  41. /// </summary>
  42. /// <param name="examPlanId"></param>
  43. /// <returns></returns>
  44. public int StopTask(int examPlanId)
  45. {
  46. if (!_executingTasks.TryGetValue(examPlanId, out Task value) || value.IsCompleted)
  47. {
  48. logger.LogInformation("监测计划ID:{examPlanId},未运行!", examPlanId);
  49. return -1;
  50. }
  51. logger.LogInformation("监测计划ID:{examPlanId},开始停止...", examPlanId);
  52. // 停止任务
  53. _cancellationTokenSources[examPlanId]?.Cancel();
  54. _executingTasks.Remove(examPlanId);
  55. _cancellationTokenSources.Remove(examPlanId);
  56. return 0;
  57. }
  58. /// <summary>
  59. /// 任务执行逻辑
  60. /// </summary>
  61. /// <param name="examPlanId"></param>
  62. /// <param name="cancellationToken"></param>
  63. /// <returns></returns>
  64. private async Task ExecuteAsync(int examPlanId, CancellationToken cancellationToken)
  65. {
  66. try
  67. {
  68. while (!cancellationToken.IsCancellationRequested)
  69. {
  70. using var scope = scopeFactory.CreateScope();
  71. var services = scope.ServiceProvider;
  72. var examSampleService = services.GetService<IExamSampleService>();
  73. await examSampleService.ExecuteSampleByExamPlanId(examPlanId);
  74. logger.LogInformation("监测计划ID:{examPlanId},执行完成。", examPlanId);
  75. StopTask(examPlanId);
  76. }
  77. }
  78. catch (TaskCanceledException)
  79. {
  80. logger.LogInformation("监测计划ID:{examPlanId},已取消。", examPlanId);
  81. }
  82. }
  83. public Task StopAsync(CancellationToken cancellationToken)
  84. {
  85. // 停止所有正在运行的任务
  86. foreach (var key in _executingTasks.Keys.ToList())
  87. {
  88. StopTask(key);
  89. }
  90. return Task.CompletedTask;
  91. }
  92. public void Dispose()
  93. {
  94. foreach (var cancellationTokenSource in _cancellationTokenSources.Values)
  95. {
  96. cancellationTokenSource?.Dispose();
  97. }
  98. }
  99. }