using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Cosmos;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using TEAMModelOS.SDK.DI;
using TEAMModelOS.SDK.Extension;
using TEAMModelOS.SDK.Models;

namespace TEAMModelFunction
{
    /// <summary>
    /// Cosmos DB change-feed function for the "Common" container of the "TEAMModelOS" database.
    /// For every changed document whose <c>pk</c> is "Exam", "Vote" or "Survey" it schedules the
    /// next progress transition ("pending" -> "going" -> "finish") as a Service Bus message on the
    /// "active-task" queue, and for exams it also creates the per-class result documents and the
    /// aggregated per-subject result documents.
    /// </summary>
    public class MonitorCosmosDB
    {
        private const string DatabaseName = "TEAMModelOS";
        private const string CommonContainerName = "Common";
        private const string SchoolContainerName = "School";
        private const string TaskQueueName = "active-task";

        private readonly IHttpClientFactory _clientFactory;
        private readonly AzureCosmosFactory _azureCosmos;
        private readonly AzureServiceBusFactory _serviceBus;
        private readonly AzureStorageFactory _azureStorage;

        public MonitorCosmosDB(IHttpClientFactory clientFactory, AzureCosmosFactory azureCosmos, AzureServiceBusFactory azureServiceBus, AzureStorageFactory azureStorage)
        {
            _clientFactory = clientFactory;
            _azureCosmos = azureCosmos;
            _serviceBus = azureServiceBus;
            _azureStorage = azureStorage;
        }

        /// <summary>
        /// Change-feed entry point. Dispatches each changed document on its <c>pk</c> property;
        /// documents without a <c>pk</c>, or with a <c>pk</c> other than "Exam", "Vote" or "Survey",
        /// are ignored.
        /// </summary>
        /// <param name="inputs">Changed documents delivered by the trigger; may be null or empty.</param>
        /// <param name="log">Function logger.</param>
        [FunctionName("Common")]
        public async Task Common([CosmosDBTrigger(
            databaseName: "TEAMModelOS",
            collectionName: "Common",
            ConnectionStringSetting = "Azure:Cosmos:ConnectionString",
            LeaseCollectionName = "leases")]IReadOnlyList<Document> inputs, ILogger log)
        {
            if (inputs == null || inputs.Count == 0)
            {
                return;
            }

            log.LogInformation("Documents modified " + inputs.Count);
            log.LogInformation("First document Id " + inputs[0].Id);

            var client = _azureCosmos.GetCosmosClient();
            foreach (var input in inputs)
            {
                string pk = input.GetPropertyValue<string>("pk");
                if (string.IsNullOrWhiteSpace(pk))
                {
                    continue;
                }

                // Read from the change-feed document itself for every document that has a pk,
                // before dispatching on the pk value.
                long stime = input.GetPropertyValue<long>("startTime");
                long etime = input.GetPropertyValue<long>("endTime");
                string school = input.GetPropertyValue<string>("school");
                string code = input.GetPropertyValue<string>("code");

                switch (pk)
                {
                    case "Exam":
                        await HandleExamAsync(client, input.Id, code, school, stime, etime);
                        break;
                    case "Vote":
                        await HandleVoteAsync(client, input.Id, code, stime, etime);
                        break;
                    case "Survey":
                        await HandleSurveyAsync(client, input.Id, code, stime, etime);
                        break;
                }
            }
        }

        /// <summary>
        /// Handles a changed exam: loads the exam and its existing class results, then acts on the
        /// exam's stored <c>progress</c> ("pending", "going" or "finish").
        /// </summary>
        private async Task HandleExamAsync(CosmosClient client, string id, string code, string school, long stime, long etime)
        {
            var common = client.GetContainer(DatabaseName, CommonContainerName);
            ExamInfo info = await common.ReadItemAsync<ExamInfo>(id, new Azure.Cosmos.PartitionKey($"{code}"));

            // The class results are loaded for every progress value, although only the
            // "going" and "finish" branches use them.
            // NOTE(review): the query text is built by interpolating info.id; consider a
            // parameterized query if ids can contain a single quote.
            var examClassResults = new List<ExamClassResult>();
            await foreach (var item in common.GetItemQueryStreamIterator(
                queryText: $"select value(c) from c where c.examId = '{info.id}'",
                requestOptions: new QueryRequestOptions() { PartitionKey = new Azure.Cosmos.PartitionKey($"ExamClassResult-{school}") }))
            {
                using (item)
                {
                    using var json = await JsonDocument.ParseAsync(item.ContentStream);
                    // GetInt32 instead of GetUInt16: the latter throws for counts above 65535.
                    if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
                    {
                        foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
                        {
                            examClassResults.Add(obj.ToObject<ExamClassResult>());
                        }
                    }
                }
            }

            List<ChangeRecord> records = await FindChangeRecordsAsync(id, info.progress);

            switch (info.progress)
            {
                case "pending":
                    await ScheduleProgressChangeAsync(id, code, "Exam", "pending", "going", stime, records);
                    break;
                case "going":
                    // NOTE(review): the "finish" message is only (re)scheduled when no class result
                    // exists yet, so an exam edited while already "going" is not rescheduled here.
                    // Kept as in the original; confirm this is intended.
                    if (examClassResults.Count == 0)
                    {
                        await CreateExamClassResultsAsync(client, info);
                        await ScheduleProgressChangeAsync(id, code, "Exam", "going", "finish", etime, records);
                    }
                    break;
                case "finish":
                    await SaveExamResultsAsync(client, info, examClassResults);
                    break;
            }
        }

        /// <summary>
        /// Creates one <see cref="ExamClassResult"/> document per (subject, target class) pair of
        /// the exam. When the class document can be read (status 200), the result is pre-filled with
        /// the class name and, per student, one empty answer list and a score of -1 per paper point.
        /// </summary>
        private async Task CreateExamClassResultsAsync(CosmosClient client, ExamInfo info)
        {
            var common = client.GetContainer(DatabaseName, CommonContainerName);
            var schoolContainer = client.GetContainer(DatabaseName, SchoolContainerName);

            for (int j = 0; j < info.subjects.Count; j++)
            {
                for (int k = 0; k < info.targetClassIds.Count; k++)
                {
                    ExamClassResult result = new ExamClassResult();
                    result.code = "ExamClassResult-" + info.school;
                    result.examId = info.id;
                    result.id = Guid.NewGuid().ToString();
                    result.subjectId = info.subjects[j].id;
                    result.year = info.year;
                    result.scope = info.scope;
                    result.pk = nameof(ExamClassResult);
                    result.info.id = info.targetClassIds[k];

                    using var sresponse = await schoolContainer.ReadItemStreamAsync(info.targetClassIds[k], new Azure.Cosmos.PartitionKey($"Class-{info.school}"));
                    if (sresponse.Status == 200)
                    {
                        using var json = await JsonDocument.ParseAsync(sresponse.ContentStream);
                        Class classroom = json.ToObject<Class>();
                        result.info.name = classroom.name;

                        foreach (StudentSimple stu in classroom.students)
                        {
                            // Fresh lists per student so that students never share (alias) the
                            // same answer/score list instances.
                            // NOTE(review): element types (List<string> answers, double scores)
                            // are assumed from usage; confirm against ExamClassResult.
                            var answers = new List<List<string>>();
                            var scores = new List<double>();
                            foreach (double point in info.papers[j].point)
                            {
                                answers.Add(new List<string>());
                                scores.Add(-1);
                            }

                            result.studentIds.Add(stu.id);
                            result.studentAnswers.Add(answers);
                            result.studentScores.Add(scores);
                        }
                    }

                    result.school = info.school;
                    await common.CreateItemAsync(result, new Azure.Cosmos.PartitionKey($"{result.code}"));
                }
            }
        }

        /// <summary>
        /// Upserts one aggregated <see cref="ExamResult"/> per subject whose <c>classCount</c> equals
        /// the number of target classes, concatenating the scores and student ids of that subject's
        /// class results and recording each class's index range within the concatenation.
        /// </summary>
        private async Task SaveExamResultsAsync(CosmosClient client, ExamInfo info, List<ExamClassResult> examClassResults)
        {
            var common = client.GetContainer(DatabaseName, CommonContainerName);

            for (int j = 0; j < info.subjects.Count; j++)
            {
                if (info.subjects[j].classCount != info.targetClassIds.Count)
                {
                    continue;
                }

                ExamResult result = new ExamResult();
                // Running total of students across the classes processed so far.
                int total = 0;
                var classRanges = new List<ClassRange>();

                foreach (ExamClassResult classResult in examClassResults)
                {
                    if (!classResult.subjectId.Equals(info.subjects[j].id))
                    {
                        continue;
                    }

                    foreach (var scores in classResult.studentScores)
                    {
                        result.studentScores.Add(scores);
                    }

                    // Class info: inclusive [first, last] index range of this class's students.
                    int stuCount = classResult.studentIds.Count;
                    int first = total;
                    total += stuCount;
                    classRanges.Add(new ClassRange
                    {
                        id = classResult.info.id,
                        name = classResult.info.name,
                        range = new List<int> { first, total - 1 }
                    });

                    // Student ids, in the same order as the scores above.
                    foreach (string studentId in classResult.studentIds)
                    {
                        result.studentIds.Add(studentId);
                    }
                }

                result.classes = classRanges;
                result.pk = nameof(ExamResult);
                result.code = "ExamResult-" + info.id;
                result.school = info.school;
                result.id = info.subjects[j].id;
                result.examId = info.id;
                result.subjectId = info.subjects[j].id;
                result.year = info.year;
                result.paper = info.papers[j];
                result.scope = info.scope;
                result.name = info.name;
                result.time = info.startTime;

                await common.UpsertItemAsync(result, new Azure.Cosmos.PartitionKey($"ExamResult-{info.id}"));
            }
        }

        /// <summary>
        /// Handles a changed vote: schedules "going" at <paramref name="stime"/> while pending and
        /// "finish" at <paramref name="etime"/> while going.
        /// </summary>
        private async Task HandleVoteAsync(CosmosClient client, string id, string code, long stime, long etime)
        {
            Vote vote = await client.GetContainer(DatabaseName, CommonContainerName).ReadItemAsync<Vote>(id, new Azure.Cosmos.PartitionKey($"{code}"));
            await ScheduleNextTransitionAsync(id, code, "Vote", vote.progress, stime, etime);
        }

        /// <summary>
        /// Handles a changed survey: schedules "going" at <paramref name="stime"/> while pending and
        /// "finish" at <paramref name="etime"/> while going.
        /// </summary>
        private async Task HandleSurveyAsync(CosmosClient client, string id, string code, long stime, long etime)
        {
            Survey survey = await client.GetContainer(DatabaseName, CommonContainerName).ReadItemAsync<Survey>(id, new Azure.Cosmos.PartitionKey($"{code}"));
            await ScheduleNextTransitionAsync(id, code, "Survey", survey.progress, stime, etime);
        }

        /// <summary>
        /// Looks up the change records for the item and, depending on <paramref name="progress"/>,
        /// schedules the "going" or "finish" transition. Other progress values schedule nothing.
        /// </summary>
        private async Task ScheduleNextTransitionAsync(string id, string code, string name, string progress, long stime, long etime)
        {
            List<ChangeRecord> records = await FindChangeRecordsAsync(id, progress);
            switch (progress)
            {
                case "pending":
                    await ScheduleProgressChangeAsync(id, code, name, "pending", "going", stime, records);
                    break;
                case "going":
                    await ScheduleProgressChangeAsync(id, code, name, "going", "finish", etime, records);
                    break;
            }
        }

        /// <summary>
        /// Finds the stored change records whose RowKey is <paramref name="id"/> and whose
        /// PartitionKey is <paramref name="progress"/>.
        /// </summary>
        private async Task<List<ChangeRecord>> FindChangeRecordsAsync(string id, string progress)
        {
            // NOTE(review): the dictionary's type arguments were not visible in the original
            // source; <string, object> is assumed - confirm against FindListByDict's signature.
            return await _azureStorage.FindListByDict<ChangeRecord>(new Dictionary<string, object>() { { "RowKey", id }, { "PartitionKey", progress } });
        }

        /// <summary>
        /// Schedules a Service Bus message that carries <paramref name="nextProgress"/> for the item
        /// and stores the resulting sequence number in a <see cref="ChangeRecord"/>.
        /// </summary>
        /// <param name="id">Item id; also the RowKey of the change record.</param>
        /// <param name="code">Partition code of the item, forwarded in the message body.</param>
        /// <param name="name">Value of the message's "name" application property ("Exam", "Vote", "Survey").</param>
        /// <param name="currentProgress">Current progress; PartitionKey of a newly created change record.</param>
        /// <param name="nextProgress">Progress value carried in the message body.</param>
        /// <param name="enqueueAtUnixMs">Scheduled enqueue time in Unix milliseconds.</param>
        /// <param name="records">Existing change records for (id, currentProgress); the first one is reused.</param>
        private async Task ScheduleProgressChangeAsync(string id, string code, string name, string currentProgress, string nextProgress, long enqueueAtUnixMs, List<ChangeRecord> records)
        {
            var message = new ServiceBusMessage(new { id = id, progress = nextProgress, code = code }.ToJsonString());
            message.ApplicationProperties.Add("name", name);

            // Always schedule the replacement before cancelling the previous message, so that a
            // failed send never leaves the item without any scheduled transition.
            long sequenceNumber = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync(TaskQueueName, message, DateTimeOffset.FromUnixTimeMilliseconds(enqueueAtUnixMs));

            if (records.Count > 0)
            {
                await _serviceBus.GetServiceBusClient().cancelMessage(TaskQueueName, records[0].sequenceNumber);
                records[0].sequenceNumber = sequenceNumber;
                await _azureStorage.SaveOrUpdate(records[0]);
            }
            else
            {
                ChangeRecord changeRecord = new ChangeRecord
                {
                    RowKey = id,
                    PartitionKey = currentProgress,
                    sequenceNumber = sequenceNumber,
                    msgId = message.MessageId
                };
                await _azureStorage.Save(changeRecord);
            }
        }
    }
}