SubjectPushService.cs 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. using IES.ExamServer.Helpers;
  2. using IES.ExamServer.Models;
  3. using LiteDB;
  4. using System;
  5. using System.Net.Http;
  6. using System.Text;
  7. using System.Threading.Channels;
  8. namespace IES.ExamServer.DI
  9. {
  10. public class SubjectPushService: BackgroundService
  11. {
  12. private readonly DataQueue _dataQueue;
  13. private readonly IHttpClientFactory _httpClientFactory;
  14. private readonly LiteDBFactory _liteDBFactory;
  15. private readonly CenterServiceConnectionService _connectionService;
  16. private readonly IConfiguration _configuration;
  17. public SubjectPushService(DataQueue dataQueue, IHttpClientFactory httpClientFactory, LiteDBFactory liteDBFactory, CenterServiceConnectionService connectionService, IConfiguration configuration)
  18. {
  19. _dataQueue = dataQueue;
  20. _httpClientFactory = httpClientFactory;
  21. _liteDBFactory = liteDBFactory;
  22. _connectionService = connectionService;
  23. _configuration= configuration;
  24. }
  25. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  26. {
  27. ///数据中心链接了,开始推送数据
  28. if (_connectionService.centerIsConnected)
  29. {
  30. // 启动时加载未推送的数据
  31. //await LoadUnpushedDataAsync(stoppingToken);
  32. }
  33. while (!stoppingToken.IsCancellationRequested)
  34. {
  35. // 从 Channel 中读取数据
  36. if (await _dataQueue.Reader.WaitToReadAsync(stoppingToken))
  37. {
  38. while (_dataQueue.Reader.TryRead(out var data))
  39. {
  40. await PushSubjectResultDataToCloudAsync(data, stoppingToken);
  41. }
  42. }
  43. }
  44. }
  45. private async Task PushSubjectResultDataToCloudAsync((EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)data, CancellationToken cancellationToken)
  46. {
  47. if (_connectionService.centerIsConnected)
  48. {
  49. try
  50. {
  51. // var json = JsonSerializer.Serialize(data);
  52. //var content = new StringContent(json, Encoding.UTF8, "application/json");
  53. var httpClient = _httpClientFactory.CreateClient();
  54. string url = $"{_connectionService.centerUrl}/common/exam/upsert-new-record";
  55. if (httpClient.DefaultRequestHeaders.Contains("X-Auth-AuthToken")) {
  56. httpClient.DefaultRequestHeaders.Remove("X-Auth-AuthToken");
  57. }
  58. httpClient.DefaultRequestHeaders.Add("X-Auth-AuthToken", _connectionService.loginToken);
  59. var response = await httpClient.PostAsJsonAsync(url, new {
  60. id =data.subjectResult.examId,
  61. answer=data.subjectResult.answers,
  62. subjectId=data.subjectResult.subjectId,
  63. classId=data.studentResult.classId,
  64. ownerId=data.studentResult.ownerId,
  65. paperId=data.subjectResult.paperId,
  66. studentId=data.studentResult.studentId,
  67. studentName=data.studentResult.studentName,
  68. }, cancellationToken);
  69. if (response.IsSuccessStatusCode)
  70. {
  71. MarkDataAsPushed(data);
  72. Console.WriteLine($"Data {data.resultId} pushed successfully.");
  73. }
  74. else
  75. {
  76. Console.WriteLine($"Failed to push data {data.resultId}. Retrying...");
  77. // 推送失败,重新加入队列
  78. await _dataQueue.Writer.WriteAsync(data, cancellationToken);
  79. }
  80. }
  81. catch (Exception ex)
  82. {
  83. Console.WriteLine($"Error pushing data {data.resultId}: {ex.Message}");
  84. // 推送失败,重新加入队列
  85. await _dataQueue.Writer.WriteAsync(data, cancellationToken);
  86. }
  87. }
  88. }
  89. private void MarkDataAsPushed((EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId) data)
  90. {
  91. var collection = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>();
  92. EvaluationSubjectResult? subjectResult = data.studentResult.subjectResults.Where(x => x.id!.Equals(data.resultId)).FirstOrDefault();
  93. if (subjectResult!=null)
  94. {
  95. subjectResult.pushed=1;
  96. collection.Upsert(data.studentResult);
  97. var collectionSub = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>();
  98. var unpushedData = collectionSub.FindOne(x => data.resultId.Equals(x.id));
  99. if (unpushedData!=null)
  100. {
  101. unpushedData.pushed=1;
  102. _dataQueue.MarkAsProcessed((unpushedData.id!, unpushedData.examId!, unpushedData.evaluationId!, unpushedData.subjectId!,unpushedData, data.studentResult));
  103. }
  104. }
  105. }
  106. private async Task LoadUnpushedDataAsync(CancellationToken cancellationToken)
  107. {
  108. var collection = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>();
  109. var unpushedData = collection.Find(x => x.pushed!=1);
  110. foreach (var data in unpushedData)
  111. {
  112. var studentResults = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>().Find(x => x.subjectResults.Where(x => x.id!.Equals(data.id)).IsNotEmpty()).Distinct();
  113. // 将未推送的数据写入 Channel
  114. foreach (var studentResult in studentResults)
  115. {
  116. await _dataQueue.WriteAsync((studentResult!,data, data.id!), cancellationToken);
  117. }
  118. }
  119. }
  120. }
  121. public class DataQueue
  122. {
  123. private readonly Channel<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> _channel;
  124. private readonly HashSet<(string resultId,string examId,string evaluationId,string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult)> _uniqueIds; // 用于维护唯一性
  125. public async Task<bool> TryAddAsync((string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult) data, CancellationToken cancellationToken = default)
  126. {
  127. var key = (data.resultId, data.examId,data.evaluationId,data.subjectId,data.subjectResult,data.studentResult); // 复合键
  128. lock (_uniqueIds)
  129. {
  130. if (_uniqueIds.Contains(key))
  131. {
  132. return false;
  133. }
  134. _uniqueIds.Add(key);
  135. }
  136. await _channel.Writer.WriteAsync((data.studentResult,data.subjectResult, data.resultId), cancellationToken);
  137. return true;
  138. }
  139. public DataQueue() {
  140. // 创建一个无界 Channel
  141. _channel = Channel.CreateUnbounded<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)>();
  142. _uniqueIds = new HashSet<(string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult)>();
  143. }
  144. public ChannelWriter<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> Writer => _channel.Writer;
  145. public ChannelReader<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> Reader => _channel.Reader;
  146. // 批量写入数据
  147. public async Task WriteAsync(( EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId) data, CancellationToken cancellationToken = default)
  148. {
  149. await _channel.Writer.WriteAsync((data.studentResult,data.subjectResult, data.resultId), cancellationToken);
  150. }
  151. // 获取当前队列中的未上传数据数量
  152. public int GetPendingCount()
  153. {
  154. return _channel.Reader.Count;
  155. }
  156. // 从队列中移除数据时,同时从 HashSet 中移除 ID
  157. public void MarkAsProcessed((string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult) key)
  158. {
  159. lock (_uniqueIds) // 加锁确保线程安全
  160. {
  161. _uniqueIds.Remove(key);
  162. }
  163. }
  164. }
  165. }