|
@@ -48,7 +48,7 @@ namespace IES.ExamServer.DI
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private async Task PushSubjectResultDataToCloudAsync((EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)data, CancellationToken cancellationToken)
|
|
|
+ private async Task PushSubjectResultDataToCloudAsync(SubjectPushData data, CancellationToken cancellationToken)
|
|
|
{
|
|
|
if (_connectionService.centerIsConnected)
|
|
|
{
|
|
@@ -64,102 +64,101 @@ namespace IES.ExamServer.DI
|
|
|
}
|
|
|
httpClient.DefaultRequestHeaders.Add("X-Auth-AuthToken", _connectionService.loginToken);
|
|
|
var response = await httpClient.PostAsJsonAsync(url, new {
|
|
|
- id =data.subjectResult.examId,
|
|
|
- answer=data.subjectResult.answers,
|
|
|
- subjectId=data.subjectResult.subjectId,
|
|
|
- classId=data.studentResult.classId,
|
|
|
- ownerId=data.studentResult.ownerId,
|
|
|
- paperId=data.subjectResult.paperId,
|
|
|
- studentId=data.studentResult.studentId,
|
|
|
- studentName=data.studentResult.studentName,
|
|
|
+ id =data.examId,
|
|
|
+ answer=data.answers,
|
|
|
+ subjectId = data.subjectId,
|
|
|
+ classId = data.classId,
|
|
|
+ ownerId = data.ownerId,
|
|
|
+ paperId = data.paperId,
|
|
|
+ studentId = data.studentId,
|
|
|
+ studentName = data.studentName,
|
|
|
}, cancellationToken);
|
|
|
|
|
|
if (response.IsSuccessStatusCode)
|
|
|
{
|
|
|
MarkDataAsPushed(data);
|
|
|
- Console.WriteLine($"Data {data.resultId} pushed successfully.");
|
|
|
+ Console.WriteLine($"Data {data.studentResultId}{data.subjectResultId} pushed successfully.");
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- Console.WriteLine($"Failed to push data {data.resultId}. Retrying...");
|
|
|
+ Console.WriteLine($"Failed to push data {data.studentResultId}{data.subjectResultId} Retrying...");
|
|
|
// 推送失败,重新加入队列
|
|
|
await _dataQueue.Writer.WriteAsync(data, cancellationToken);
|
|
|
}
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
- Console.WriteLine($"Error pushing data {data.resultId}: {ex.Message}");
|
|
|
+ Console.WriteLine($"Error pushing data {data.studentResultId}{data.subjectResultId} : {ex.Message}");
|
|
|
// 推送失败,重新加入队列
|
|
|
await _dataQueue.Writer.WriteAsync(data, cancellationToken);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- private void MarkDataAsPushed((EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId) data)
|
|
|
+ private void MarkDataAsPushed(SubjectPushData data )
|
|
|
{
|
|
|
- var collection = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>();
|
|
|
- EvaluationSubjectResult? subjectResult = data.studentResult.subjectResults.Where(x => x.id!.Equals(data.resultId)).FirstOrDefault();
|
|
|
+ EvaluationStudentResult studentResult = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>().FindOne(x=>x.id!.Equals(data.studentResultId));
|
|
|
+ EvaluationSubjectResult subjectResult = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>().FindOne(x => x.id!.Equals(data.subjectResultId));
|
|
|
+ EvaluationSubjectResult? subjectResultInStundent = studentResult.subjectResults.Where(x => x.id!.Equals(data.subjectResultId)).FirstOrDefault();
|
|
|
+ if (subjectResultInStundent!=null)
|
|
|
+ {
|
|
|
+ subjectResultInStundent.pushed=1;
|
|
|
+ _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>().Upsert(studentResult);
|
|
|
+ }
|
|
|
if (subjectResult!=null)
|
|
|
{
|
|
|
subjectResult.pushed=1;
|
|
|
- collection.Upsert(data.studentResult);
|
|
|
- var collectionSub = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>();
|
|
|
- var unpushedData = collectionSub.FindOne(x => data.resultId.Equals(x.id));
|
|
|
- if (unpushedData!=null)
|
|
|
- {
|
|
|
- unpushedData.pushed=1;
|
|
|
- _dataQueue.MarkAsProcessed((unpushedData.id!, unpushedData.examId!, unpushedData.evaluationId!, unpushedData.subjectId!,unpushedData, data.studentResult));
|
|
|
- }
|
|
|
+ _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>().Upsert(subjectResult);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
- private async Task LoadUnpushedDataAsync(CancellationToken cancellationToken)
|
|
|
- {
|
|
|
- var collection = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>();
|
|
|
- var unpushedData = collection.Find(x => x.pushed!=1);
|
|
|
- foreach (var data in unpushedData)
|
|
|
- {
|
|
|
- var studentResults = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>().Find(x => x.subjectResults.Where(x => x.id!.Equals(data.id)).IsNotEmpty()).Distinct();
|
|
|
- // 将未推送的数据写入 Channel
|
|
|
- foreach (var studentResult in studentResults)
|
|
|
- {
|
|
|
- await _dataQueue.WriteAsync((studentResult!,data, data.id!), cancellationToken);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ //private async Task LoadUnpushedDataAsync(CancellationToken cancellationToken)
|
|
|
+ //{
|
|
|
+ // var collection = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationSubjectResult>();
|
|
|
+ // var unpushedData = collection.Find(x => x.pushed!=1);
|
|
|
+ // foreach (var data in unpushedData)
|
|
|
+ // {
|
|
|
+ // var studentResults = _liteDBFactory.GetLiteDatabase().GetCollection<EvaluationStudentResult>().Find(x => x.subjectResults.Where(x => x.id!.Equals(data.id)).IsNotEmpty()).Distinct();
|
|
|
+ // // 将未推送的数据写入 Channel
|
|
|
+ // foreach (var studentResult in studentResults)
|
|
|
+ // {
|
|
|
+ // await _dataQueue.WriteAsync((studentResult!,data, data.id!), cancellationToken);
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ //}
|
|
|
}
|
|
|
|
|
|
public class DataQueue
|
|
|
{
|
|
|
- private readonly Channel<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> _channel;
|
|
|
- private readonly HashSet<(string resultId,string examId,string evaluationId,string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult)> _uniqueIds; // 用于维护唯一性
|
|
|
+ private readonly Channel<SubjectPushData> _channel;
|
|
|
+ // private readonly HashSet<(string resultId, SubjectPushData pushData)> _uniqueIds; // 用于维护唯一性
|
|
|
|
|
|
- public async Task<bool> TryAddAsync((string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult) data, CancellationToken cancellationToken = default)
|
|
|
+ public async Task<bool> TryAddAsync(SubjectPushData pushData, CancellationToken cancellationToken = default)
|
|
|
{
|
|
|
- var key = (data.resultId, data.examId,data.evaluationId,data.subjectId,data.subjectResult,data.studentResult); // 复合键
|
|
|
- lock (_uniqueIds)
|
|
|
- {
|
|
|
- if (_uniqueIds.Contains(key))
|
|
|
- {
|
|
|
- return false;
|
|
|
- }
|
|
|
- _uniqueIds.Add(key);
|
|
|
- }
|
|
|
+ //var key = (data.resultId,data.pushData); // 复合键
|
|
|
+ //lock (_uniqueIds)
|
|
|
+ //{
|
|
|
+ // if (_uniqueIds.Contains(key))
|
|
|
+ // {
|
|
|
+ // return false;
|
|
|
+ // }
|
|
|
+ // _uniqueIds.Add(key);
|
|
|
+ //}
|
|
|
|
|
|
- await _channel.Writer.WriteAsync((data.studentResult,data.subjectResult, data.resultId), cancellationToken);
|
|
|
+ await _channel.Writer.WriteAsync(pushData , cancellationToken);
|
|
|
return true;
|
|
|
}
|
|
|
public DataQueue() {
|
|
|
// 创建一个无界 Channel
|
|
|
- _channel = Channel.CreateUnbounded<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)>();
|
|
|
- _uniqueIds = new HashSet<(string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult)>();
|
|
|
+ _channel = Channel.CreateUnbounded<SubjectPushData>();
|
|
|
+ //_uniqueIds = new HashSet<(string resultId, SubjectPushData pushData)>();
|
|
|
}
|
|
|
- public ChannelWriter<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> Writer => _channel.Writer;
|
|
|
- public ChannelReader<(EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId)> Reader => _channel.Reader;
|
|
|
+ public ChannelWriter<SubjectPushData> Writer => _channel.Writer;
|
|
|
+ public ChannelReader<SubjectPushData> Reader => _channel.Reader;
|
|
|
|
|
|
// 批量写入数据
|
|
|
- public async Task WriteAsync(( EvaluationStudentResult studentResult, EvaluationSubjectResult subjectResult, string resultId) data, CancellationToken cancellationToken = default)
|
|
|
+ public async Task WriteAsync(SubjectPushData data, CancellationToken cancellationToken = default)
|
|
|
{
|
|
|
- await _channel.Writer.WriteAsync((data.studentResult,data.subjectResult, data.resultId), cancellationToken);
|
|
|
+ await _channel.Writer.WriteAsync(data, cancellationToken);
|
|
|
}
|
|
|
// 获取当前队列中的未上传数据数量
|
|
|
public int GetPendingCount()
|
|
@@ -167,12 +166,71 @@ namespace IES.ExamServer.DI
|
|
|
return _channel.Reader.Count;
|
|
|
}
|
|
|
// 从队列中移除数据时,同时从 HashSet 中移除 ID
|
|
|
- public void MarkAsProcessed((string resultId, string examId, string evaluationId, string subjectId, EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult) key)
|
|
|
+ //public void MarkAsProcessed((string resultId, SubjectPushData pushData) key)
|
|
|
+ //{
|
|
|
+ // lock (_uniqueIds) // 加锁确保线程安全
|
|
|
+ // {
|
|
|
+ // _uniqueIds.Remove(key);
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 用于推送数据的类
|
|
|
+ /// </summary>
|
|
|
+ public class SubjectPushData: EvaluationSubjectResult
|
|
|
+ {
|
|
|
+ public SubjectPushData()
|
|
|
+ { }
|
|
|
+ public SubjectPushData(EvaluationSubjectResult subjectResult, EvaluationStudentResult studentResult)
|
|
|
{
|
|
|
- lock (_uniqueIds) // 加锁确保线程安全
|
|
|
- {
|
|
|
- _uniqueIds.Remove(key);
|
|
|
- }
|
|
|
+
|
|
|
+ this.id = subjectResult.id;
|
|
|
+ this.subjectResultId = subjectResult.id;
|
|
|
+ this.examId = subjectResult.examId;
|
|
|
+ this.examName = subjectResult.examName;
|
|
|
+ this.subjectName = subjectResult.subjectName;
|
|
|
+ this.paperName = subjectResult.paperName;
|
|
|
+ this.evaluationId = subjectResult.evaluationId;
|
|
|
+ this.subjectId = subjectResult.subjectId;
|
|
|
+ this.paperId = subjectResult.paperId;
|
|
|
+ this.answers = subjectResult.answers;
|
|
|
+ this.pushed = subjectResult.pushed;
|
|
|
+ this.answers = subjectResult.answers;
|
|
|
+ this.questionCount = subjectResult.questionCount;
|
|
|
+ this.costTime= subjectResult.costTime;
|
|
|
+ this.evaluationId= subjectResult.evaluationId;
|
|
|
+ this.submitTime = subjectResult.submitTime;
|
|
|
+ this.finished = subjectResult.finished;
|
|
|
+ this.studentName = studentResult.studentName;
|
|
|
+ this.studentResultId = studentResult.id;
|
|
|
+ this.studentId = studentResult.studentId;
|
|
|
+ this.schoolId = studentResult.schoolId;
|
|
|
+ this.pid = studentResult.pid;
|
|
|
+ this.classId = studentResult.classId;
|
|
|
+ this.ownerId = studentResult.ownerId;
|
|
|
+ this.scope = studentResult.scope;
|
|
|
+ this.type = studentResult.type;
|
|
|
}
|
|
|
+
|
|
|
+ public string? studentResultId { get; set; }
|
|
|
+ public string? subjectResultId { get; set; }
|
|
|
+ public string? studentId { get; set; }
|
|
|
+ public string? studentName { get; set; }
|
|
|
+ public string? schoolId { get; set; }
|
|
|
+ public string? pid { get; set; }
|
|
|
+ public string? classId { get; set; }
|
|
|
+ public string? ownerId { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 数据范围
|
|
|
+ /// </summary>
|
|
|
+ public string? scope { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 类型: Exam 普通评测, Art艺术评测
|
|
|
+ /// </summary>
|
|
|
+ public string? type { get; set; }
|
|
|
+ /// <summary>
|
|
|
+ /// 推送次数
|
|
|
+ /// </summary>
|
|
|
+ public int pushedCount { get; set; }
|
|
|
}
|
|
|
}
|