|
@@ -0,0 +1,178 @@
|
|
|
|
+using IES.ExamServer.Helpers;
|
|
|
|
+using IES.ExamServer.Models;
|
|
|
|
+using LiteDB;
|
|
|
|
+using System;
|
|
|
|
+using System.Net.Http;
|
|
|
|
+using System.Text;
|
|
|
|
+using System.Threading.Channels;
|
|
|
|
+
|
|
|
|
+namespace IES.ExamServer.DI
|
|
|
|
+{
|
|
|
|
+ public class SubjectPushService: BackgroundService
|
|
|
|
+ {
|
|
|
|
+ private readonly DataQueue _dataQueue;
|
|
|
|
+ private readonly IHttpClientFactory _httpClientFactory;
|
|
|
|
+ private readonly LiteDBFactory _liteDBFactory;
|
|
|
|
+ private readonly CenterServiceConnectionService _connectionService;
|
|
|
|
+ private readonly IConfiguration _configuration;
|
|
|
|
+ public SubjectPushService(DataQueue dataQueue, IHttpClientFactory httpClientFactory, LiteDBFactory liteDBFactory, CenterServiceConnectionService connectionService, IConfiguration configuration)
|
|
|
|
+ {
|
|
|
|
+
|
|
|
|
+ _dataQueue = dataQueue;
|
|
|
|
+ _httpClientFactory = httpClientFactory;
|
|
|
|
+ _liteDBFactory = liteDBFactory;
|
|
|
|
+ _connectionService = connectionService;
|
|
|
|
+ _configuration= configuration;
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
|
|
+ {
|
|
|
|
+ ///数据中心链接了,开始推送数据
|
|
|
|
+ if (_connectionService.centerIsConnected)
|
|
|
|
+ {
|
|
|
|
+ // 启动时加载未推送的数据
|
|
|
|
+ //await LoadUnpushedDataAsync(stoppingToken);
|
|
|
|
+ }
|
|
|
|
+ while (!stoppingToken.IsCancellationRequested)
|
|
|
|
+ {
|
|
|
|
+ // 从 Channel 中读取数据
|
|
|
|
+ if (await _dataQueue.Reader.WaitToReadAsync(stoppingToken))
|
|
|
|
+ {
|
|
|
|
+ while (_dataQueue.Reader.TryRead(out var data))
|
|
|
|
+ {
|
|
|
|
+
|
|
|
|
+ await PushSubjectResultDataToCloudAsync(data, stoppingToken);
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private async Task PushSubjectResultDataToCloudAsync((EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)data, CancellationToken cancellationToken)
|
|
|
|
+ {
|
|
|
|
+ if (_connectionService.centerIsConnected)
|
|
|
|
+ {
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ // var json = JsonSerializer.Serialize(data);
|
|
|
|
+ //var content = new StringContent(json, Encoding.UTF8, "application/json");
|
|
|
|
+ var httpClient = _httpClientFactory.CreateClient();
|
|
|
|
+
|
|
|
|
+ string url = $"{_connectionService.centerUrl}/common/exam/upsert-new-record";
|
|
|
|
+ if (httpClient.DefaultRequestHeaders.Contains("X-Auth-AuthToken")) {
|
|
|
|
+ httpClient.DefaultRequestHeaders.Remove("X-Auth-AuthToken");
|
|
|
|
+ }
|
|
|
|
+ httpClient.DefaultRequestHeaders.Add("X-Auth-AuthToken", _connectionService.loginToken);
|
|
|
|
+ var response = await httpClient.PostAsJsonAsync(url, new {
|
|
|
|
+ id =data.subjectResult.examId,
|
|
|
|
+ answer=data.subjectResult.answers,
|
|
|
|
+ subjectId=data.subjectResult.subjectId,
|
|
|
|
+ classId=data.studentResult.classId,
|
|
|
|
+ ownerId=data.studentResult.ownerId,
|
|
|
|
+ paperId=data.subjectResult.paperId,
|
|
|
|
+ studentId=data.studentResult.studentId,
|
|
|
|
+ studentName=data.studentResult.studentName,
|
|
|
|
+ }, cancellationToken);
|
|
|
|
+
|
|
|
|
+ if (response.IsSuccessStatusCode)
|
|
|
|
+ {
|
|
|
|
+ MarkDataAsPushed(data);
|
|
|
|
+ Console.WriteLine($"Data {data.resultId} pushed successfully.");
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ Console.WriteLine($"Failed to push data {data.resultId}. Retrying...");
|
|
|
|
+ // 推送失败,重新加入队列
|
|
|
|
+ await _dataQueue.Writer.WriteAsync(data, cancellationToken);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ catch (Exception ex)
|
|
|
|
+ {
|
|
|
|
+ Console.WriteLine($"Error pushing data {data.resultId}: {ex.Message}");
|
|
|
|
+ // 推送失败,重新加入队列
|
|
|
|
+ await _dataQueue.Writer.WriteAsync(data, cancellationToken);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ private void MarkDataAsPushed((EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId) data)
|
|
|
|
+ {
|
|
|
|
+ var collection = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>();
|
|
|
|
+ EvaluationSubjectResult? subjectResult = data.studentResult.subjectResults.Where(x => x.id!.Equals(data.resultId)).FirstOrDefault();
|
|
|
|
+ if (subjectResult!=null)
|
|
|
|
+ {
|
|
|
|
+ subjectResult.pushed=1;
|
|
|
|
+ collection.Upsert(data.studentResult);
|
|
|
|
+ var collectionSub = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>();
|
|
|
|
+ var unpushedData = collectionSub.FindOne(x => data.resultId.Equals(x.id));
|
|
|
|
+ if (unpushedData!=null)
|
|
|
|
+ {
|
|
|
|
+ unpushedData.pushed=1;
|
|
|
|
+ _dataQueue.MarkAsProcessed((unpushedData.id!, unpushedData.examId!, unpushedData.evaluationId!, unpushedData.subjectId!,unpushedData, data.studentResult));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ private async Task LoadUnpushedDataAsync(CancellationToken cancellationToken)
|
|
|
|
+ {
|
|
|
|
+ var collection = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>();
|
|
|
|
+ var unpushedData = collection.Find(x => x.pushed!=1);
|
|
|
|
+ foreach (var data in unpushedData)
|
|
|
|
+ {
|
|
|
|
+ var studentResults = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>().Find(x => x.subjectResults.Where(x => x.id!.Equals(data.id)).IsNotEmpty()).Distinct();
|
|
|
|
+ // 将未推送的数据写入 Channel
|
|
|
|
+ foreach (var studentResult in studentResults)
|
|
|
|
+ {
|
|
|
|
+ await _dataQueue.WriteAsync((studentResult!,data, data.id!), cancellationToken);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public class DataQueue
|
|
|
|
+ {
|
|
|
|
+ private readonly Channel<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> _channel;
|
|
|
|
+ private readonly HashSet<(string resultId,string examId,string evaluationId,string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult)> _uniqueIds; // 用于维护唯一性
|
|
|
|
+
|
|
|
|
+ public async Task<bool> TryAddAsync((string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult) data, CancellationToken cancellationToken = default)
|
|
|
|
+ {
|
|
|
|
+ var key = (data.resultId, data.examId,data.evaluationId,data.subjectId,data.subjectResult,data.studentResult); // 复合键
|
|
|
|
+ lock (_uniqueIds)
|
|
|
|
+ {
|
|
|
|
+ if (_uniqueIds.Contains(key))
|
|
|
|
+ {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ _uniqueIds.Add(key);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ await _channel.Writer.WriteAsync((data.studentResult,data.subjectResult, data.resultId), cancellationToken);
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ public DataQueue() {
|
|
|
|
+ // 创建一个无界 Channel
|
|
|
|
+ _channel = Channel.CreateUnbounded<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)>();
|
|
|
|
+ _uniqueIds = new HashSet<(string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult)>();
|
|
|
|
+ }
|
|
|
|
+ public ChannelWriter<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> Writer => _channel.Writer;
|
|
|
|
+ public ChannelReader<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> Reader => _channel.Reader;
|
|
|
|
+
|
|
|
|
+ // 批量写入数据
|
|
|
|
+ public async Task WriteAsync(( EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId) data, CancellationToken cancellationToken = default)
|
|
|
|
+ {
|
|
|
|
+ await _channel.Writer.WriteAsync((data.studentResult,data.subjectResult, data.resultId), cancellationToken);
|
|
|
|
+ }
|
|
|
|
+ // 获取当前队列中的未上传数据数量
|
|
|
|
+ public int GetPendingCount()
|
|
|
|
+ {
|
|
|
|
+ return _channel.Reader.Count;
|
|
|
|
+ }
|
|
|
|
+ // 从队列中移除数据时,同时从 HashSet 中移除 ID
|
|
|
|
+ public void MarkAsProcessed((string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult) key)
|
|
|
|
+ {
|
|
|
|
+ lock (_uniqueIds) // 加锁确保线程安全
|
|
|
|
+ {
|
|
|
|
+ _uniqueIds.Remove(key);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|