123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- using IES.ExamServer.Helpers;
- using IES.ExamServer.Models;
- using LiteDB;
- using System;
- using System.Net.Http;
- using System.Text;
- using System.Threading.Channels;
- namespace IES.ExamServer.DI
- {
- public class SubjectPushService: BackgroundService
- {
- private readonly DataQueue _dataQueue;
- private readonly IHttpClientFactory _httpClientFactory;
- private readonly LiteDBFactory _liteDBFactory;
- private readonly CenterServiceConnectionService _connectionService;
- private readonly IConfiguration _configuration;
- public SubjectPushService(DataQueue dataQueue, IHttpClientFactory httpClientFactory, LiteDBFactory liteDBFactory, CenterServiceConnectionService connectionService, IConfiguration configuration)
- {
-
- _dataQueue = dataQueue;
- _httpClientFactory = httpClientFactory;
- _liteDBFactory = liteDBFactory;
- _connectionService = connectionService;
- _configuration= configuration;
- }
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
- {
- ///数据中心链接了,开始推送数据
- if (_connectionService.centerIsConnected)
- {
- // 启动时加载未推送的数据
- //await LoadUnpushedDataAsync(stoppingToken);
- }
- while (!stoppingToken.IsCancellationRequested)
- {
- // 从 Channel 中读取数据
- if (await _dataQueue.Reader.WaitToReadAsync(stoppingToken))
- {
- while (_dataQueue.Reader.TryRead(out var data))
- {
- await PushSubjectResultDataToCloudAsync(data, stoppingToken);
-
- }
- }
- }
- }
-
- private async Task PushSubjectResultDataToCloudAsync((EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)data, CancellationToken cancellationToken)
- {
- if (_connectionService.centerIsConnected)
- {
- try
- {
- // var json = JsonSerializer.Serialize(data);
- //var content = new StringContent(json, Encoding.UTF8, "application/json");
- var httpClient = _httpClientFactory.CreateClient();
- string url = $"{_connectionService.centerUrl}/common/exam/upsert-new-record";
- if (httpClient.DefaultRequestHeaders.Contains("X-Auth-AuthToken")) {
- httpClient.DefaultRequestHeaders.Remove("X-Auth-AuthToken");
- }
- httpClient.DefaultRequestHeaders.Add("X-Auth-AuthToken", _connectionService.loginToken);
- var response = await httpClient.PostAsJsonAsync(url, new {
- id =data.subjectResult.examId,
- answer=data.subjectResult.answers,
- subjectId=data.subjectResult.subjectId,
- classId=data.studentResult.classId,
- ownerId=data.studentResult.ownerId,
- paperId=data.subjectResult.paperId,
- studentId=data.studentResult.studentId,
- studentName=data.studentResult.studentName,
- }, cancellationToken);
- if (response.IsSuccessStatusCode)
- {
- MarkDataAsPushed(data);
- Console.WriteLine($"Data {data.resultId} pushed successfully.");
- }
- else
- {
- Console.WriteLine($"Failed to push data {data.resultId}. Retrying...");
- // 推送失败,重新加入队列
- await _dataQueue.Writer.WriteAsync(data, cancellationToken);
- }
- }
- catch (Exception ex)
- {
- Console.WriteLine($"Error pushing data {data.resultId}: {ex.Message}");
- // 推送失败,重新加入队列
- await _dataQueue.Writer.WriteAsync(data, cancellationToken);
- }
- }
- }
- private void MarkDataAsPushed((EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId) data)
- {
- var collection = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>();
- EvaluationSubjectResult? subjectResult = data.studentResult.subjectResults.Where(x => x.id!.Equals(data.resultId)).FirstOrDefault();
- if (subjectResult!=null)
- {
- subjectResult.pushed=1;
- collection.Upsert(data.studentResult);
- var collectionSub = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>();
- var unpushedData = collectionSub.FindOne(x => data.resultId.Equals(x.id));
- if (unpushedData!=null)
- {
- unpushedData.pushed=1;
- _dataQueue.MarkAsProcessed((unpushedData.id!, unpushedData.examId!, unpushedData.evaluationId!, unpushedData.subjectId!,unpushedData, data.studentResult));
- }
- }
-
- }
- private async Task LoadUnpushedDataAsync(CancellationToken cancellationToken)
- {
- var collection = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>();
- var unpushedData = collection.Find(x => x.pushed!=1);
- foreach (var data in unpushedData)
- {
- var studentResults = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>().Find(x => x.subjectResults.Where(x => x.id!.Equals(data.id)).IsNotEmpty()).Distinct();
- // 将未推送的数据写入 Channel
- foreach (var studentResult in studentResults)
- {
- await _dataQueue.WriteAsync((studentResult!,data, data.id!), cancellationToken);
- }
- }
- }
- }
- public class DataQueue
- {
- private readonly Channel<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> _channel;
- private readonly HashSet<(string resultId,string examId,string evaluationId,string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult)> _uniqueIds; // 用于维护唯一性
- public async Task<bool> TryAddAsync((string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult) data, CancellationToken cancellationToken = default)
- {
- var key = (data.resultId, data.examId,data.evaluationId,data.subjectId,data.subjectResult,data.studentResult); // 复合键
- lock (_uniqueIds)
- {
- if (_uniqueIds.Contains(key))
- {
- return false;
- }
- _uniqueIds.Add(key);
- }
- await _channel.Writer.WriteAsync((data.studentResult,data.subjectResult, data.resultId), cancellationToken);
- return true;
- }
- public DataQueue() {
- // 创建一个无界 Channel
- _channel = Channel.CreateUnbounded<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)>();
- _uniqueIds = new HashSet<(string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult)>();
- }
- public ChannelWriter<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> Writer => _channel.Writer;
- public ChannelReader<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> Reader => _channel.Reader;
- // 批量写入数据
- public async Task WriteAsync(( EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId) data, CancellationToken cancellationToken = default)
- {
- await _channel.Writer.WriteAsync((data.studentResult,data.subjectResult, data.resultId), cancellationToken);
- }
- // 获取当前队列中的未上传数据数量
- public int GetPendingCount()
- {
- return _channel.Reader.Count;
- }
- // 从队列中移除数据时,同时从 HashSet 中移除 ID
- public void MarkAsProcessed((string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult) key)
- {
- lock (_uniqueIds) // 加锁确保线程安全
- {
- _uniqueIds.Remove(key);
- }
- }
- }
- }
|