MonitorCosmosDB.cs 21 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net.Http;
  4. using System.Text.Json;
  5. using System.Threading.Tasks;
  6. using Azure.Cosmos;
  7. using Azure.Messaging.ServiceBus;
  8. using Microsoft.Azure.Documents;
  9. using Microsoft.Azure.WebJobs;
  10. using Microsoft.Extensions.Logging;
  11. using TEAMModelFunction.model;
  12. using TEAMModelOS.SDK.DI;
  13. using TEAMModelOS.SDK.Extension;
  14. namespace TEAMModelFunction
  15. {
  16. public class MonitorCosmosDB
  17. {
  18. private readonly IHttpClientFactory _clientFactory;
  19. private readonly AzureCosmosFactory _azureCosmos;
  20. private readonly AzureServiceBusFactory _serviceBus;
  21. public MonitorCosmosDB(IHttpClientFactory clientFactory, AzureCosmosFactory azureCosmos,AzureServiceBusFactory azureServiceBus)
  22. {
  23. _clientFactory = clientFactory;
  24. _azureCosmos = azureCosmos;
  25. _serviceBus = azureServiceBus;
  26. }
  27. [FunctionName("ActiveChange")]
  28. public async Task School([CosmosDBTrigger(
  29. databaseName: "TEAMModelOS",
  30. collectionName: "Common",
  31. ConnectionStringSetting = "AzureServiceCosmosConnectionString",
  32. LeaseCollectionName = "leases")]IReadOnlyList<Document> input, ILogger log)
  33. {
  34. if (input != null && input.Count > 0)
  35. {
  36. log.LogInformation("Documents modified " + input.Count);
  37. log.LogInformation("First document Id " + input[0].Id);
  38. }
  39. string pk = input[0].GetPropertyValue<string>("pk");
  40. if (!string.IsNullOrEmpty(pk))
  41. {
  42. var client = _azureCosmos.GetCosmosClient();
  43. long stime = input[0].GetPropertyValue<long>("startTime");
  44. long etime = input[0].GetPropertyValue<long>("endTime");
  45. string school = input[0].GetPropertyValue<string>("school");
  46. string code = input[0].GetPropertyValue<string>("code");
  47. switch (pk) {
  48. case "Exam":
  49. ExamInfo info = await client.GetContainer("TEAMModelOS", "Common").ReadItemAsync<ExamInfo>(input[0].Id,new Azure.Cosmos.PartitionKey($"{code}"));
  50. List<ExamClassResult> examClassResults = new List<ExamClassResult>();
  51. await foreach (var item in client.GetContainer("TEAMModelOS", "Common").GetItemQueryStreamIterator(queryText: $"select value(c) from c where c.examId = '{info.id}'", requestOptions: new QueryRequestOptions() { PartitionKey = new Azure.Cosmos.PartitionKey($"ExamClassResult-{school}") }))
  52. {
  53. using var json = await JsonDocument.ParseAsync(item.ContentStream);
  54. if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetUInt16() > 0)
  55. {
  56. foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
  57. {
  58. examClassResults.Add(obj.ToObject<ExamClassResult>());
  59. }
  60. }
  61. }
  62. var message = new ServiceBusMessage(new { id = input[0].Id, name = "Exam", code = code }.ToJsonString());
  63. message.Properties.Add("name", "Exam");
  64. ChangeRecord record = await client.GetContainer("TEAMModelOS", "Common").ReadItemAsync<ChangeRecord>(input[0].Id, new Azure.Cosmos.PartitionKey($"{info.progress}"));
  65. switch (info.progress) {
  66. case "pending":
  67. if (record != null)
  68. {
  69. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", message, DateTimeOffset.FromUnixTimeMilliseconds(stime));
  70. record.sequenceNumber = start;
  71. await client.GetContainer("TEAMModelOS", "Common").ReplaceItemAsync(record, record.id, new Azure.Cosmos.PartitionKey($"{record.code}"));
  72. }
  73. else {
  74. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", message, DateTimeOffset.FromUnixTimeMilliseconds(stime));
  75. ChangeRecord changeRecord = new ChangeRecord
  76. {
  77. id = input[0].Id,
  78. code = "pending",
  79. pk = "ChangeRecord",
  80. sequenceNumber = start
  81. };
  82. await client.GetContainer("TEAMModelOS", "Common").CreateItemAsync(changeRecord, new Azure.Cosmos.PartitionKey($"{changeRecord.code}"));
  83. }
  84. break;
  85. case "going":
  86. if (examClassResults.Count < 0) {
  87. for (int j = 0; j < info.subjects.Count; j++)
  88. {
  89. for (int k = 0; k < info.targetClassIds.Count; k++)
  90. {
  91. ExamClassResult result = new ExamClassResult();
  92. result.code = "ExamClassResult-" + info.school;
  93. result.examId = info.id;
  94. result.id = Guid.NewGuid().ToString();
  95. result.subjectId = info.subjects[j].id;
  96. result.year = info.year;
  97. result.ttl = -1;
  98. result.scope = info.scope;
  99. result.pk = typeof(ExamClassResult).Name;
  100. result.info.id = info.targetClassIds[k];
  101. var sresponse = await client.GetContainer("TEAMModelOS", "School").ReadItemStreamAsync(info.targetClassIds[k], new Azure.Cosmos.PartitionKey($"Class-{info.school}"));
  102. if (sresponse.Status == 200)
  103. {
  104. using var json = await JsonDocument.ParseAsync(sresponse.ContentStream);
  105. Classroom classroom = json.ToObject<Classroom>();
  106. result.info.name = classroom.name;
  107. List<List<string>> ans = new List<List<string>>();
  108. List<double> ansPoint = new List<double>();
  109. foreach (double p in info.papers[j].point)
  110. {
  111. ans.Add(new List<string>());
  112. ansPoint.Add(-1);
  113. }
  114. foreach (StudentSimple stu in classroom.students)
  115. {
  116. result.studentIds.Add(stu.id);
  117. result.studentAnswers.Add(ans);
  118. result.studentScores.Add(ansPoint);
  119. }
  120. }
  121. result.progress = info.progress;
  122. result.school = info.school;
  123. await client.GetContainer("TEAMModelOS", "Common").CreateItemAsync(result, new Azure.Cosmos.PartitionKey($"{result.code}"));
  124. }
  125. }
  126. if (record != null)
  127. {
  128. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", message, DateTimeOffset.FromUnixTimeMilliseconds(etime));
  129. record.sequenceNumber = start;
  130. await client.GetContainer("TEAMModelOS", "Common").ReplaceItemAsync(record, record.id, new Azure.Cosmos.PartitionKey($"{record.code}"));
  131. }
  132. else
  133. {
  134. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", message, DateTimeOffset.FromUnixTimeMilliseconds(etime));
  135. ChangeRecord changeRecord = new ChangeRecord
  136. {
  137. id = input[0].Id,
  138. code = "going",
  139. pk = "ChangeRecord",
  140. sequenceNumber = start
  141. };
  142. await client.GetContainer("TEAMModelOS", "Common").CreateItemAsync(changeRecord, new Azure.Cosmos.PartitionKey($"{changeRecord.code}"));
  143. }
  144. }
  145. break;
  146. case "finish":
  147. for (int j = 0; j < info.subjects.Count; j++)
  148. {
  149. ExamResult result = new ExamResult();
  150. result.ttl = -1;
  151. result.pk = typeof(ExamResult).Name;
  152. result.code = "ExamResult-" + info.school;
  153. result.school = info.school;
  154. result.id = Guid.NewGuid().ToString();
  155. result.examId = info.id;
  156. result.subjectId = info.subjects[j].id;
  157. result.year = info.year;
  158. result.paper = info.papers[j];
  159. result.point = info.papers[j].point;
  160. result.scope = info.scope;
  161. result.name = info.name;
  162. //result.time
  163. //人数总和
  164. int Count = 0;
  165. int m = 0;
  166. List<ClassRange> classRanges = new List<ClassRange>();
  167. foreach (ExamClassResult classResult in examClassResults)
  168. {
  169. //处理班级信息
  170. ClassRange range = new ClassRange();
  171. range.id = classResult.info.id;
  172. range.name = classResult.info.name;
  173. List<int> ran = new List<int>();
  174. int stuCount = classResult.studentIds.Count;
  175. Count += stuCount;
  176. if (m == 0)
  177. {
  178. ran.Add(0);
  179. ran.Add(stuCount - 1);
  180. }
  181. else
  182. {
  183. ran.Add(Count - stuCount);
  184. ran.Add(Count - 1);
  185. }
  186. m++;
  187. range.range = ran;
  188. classRanges.Add(range);
  189. //处理学生ID
  190. foreach (string id in classResult.studentIds)
  191. {
  192. result.studentIds.Add(id);
  193. }
  194. foreach (List<double> scores in classResult.studentScores)
  195. {
  196. result.studentScores.Add(scores);
  197. }
  198. }
  199. result.classes = classRanges;
  200. await _azureCosmos.GetCosmosClient().GetContainer("TEAMModelOS", "Common").CreateItemAsync(result, new Azure.Cosmos.PartitionKey($"ExamResult-{result.school}"));
  201. }
  202. break;
  203. }
  204. break;
  205. case "Vote":
  206. Vote vote = await client.GetContainer("TEAMModelOS", "Common").ReadItemAsync<Vote>(input[0].Id, new Azure.Cosmos.PartitionKey($"{code}"));
  207. var messageVote = new ServiceBusMessage(new { id = input[0].Id, name = "Vote", code = code }.ToJsonString());
  208. messageVote.Properties.Add("name", "Vote");
  209. ChangeRecord voteRecord = await client.GetContainer("TEAMModelOS", "Common").ReadItemAsync<ChangeRecord>(input[0].Id, new Azure.Cosmos.PartitionKey($"{vote.progress}"));
  210. switch (vote.progress) {
  211. case "pending":
  212. if (voteRecord != null)
  213. {
  214. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageVote, DateTimeOffset.FromUnixTimeMilliseconds(stime));
  215. voteRecord.sequenceNumber = start;
  216. await client.GetContainer("TEAMModelOS", "Common").ReplaceItemAsync(voteRecord, voteRecord.id, new Azure.Cosmos.PartitionKey($"{voteRecord.code}"));
  217. }
  218. else
  219. {
  220. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageVote, DateTimeOffset.FromUnixTimeMilliseconds(stime));
  221. ChangeRecord changeRecord = new ChangeRecord
  222. {
  223. id = input[0].Id,
  224. code = "pending",
  225. pk = "ChangeRecord",
  226. sequenceNumber = start
  227. };
  228. await client.GetContainer("TEAMModelOS", "Common").CreateItemAsync(changeRecord, new Azure.Cosmos.PartitionKey($"{changeRecord.code}"));
  229. }
  230. break;
  231. case "going":
  232. if (voteRecord != null)
  233. {
  234. long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageVote, DateTimeOffset.FromUnixTimeMilliseconds(etime));
  235. voteRecord.sequenceNumber = end;
  236. await client.GetContainer("TEAMModelOS", "Common").ReplaceItemAsync(voteRecord, voteRecord.id, new Azure.Cosmos.PartitionKey($"{voteRecord.code}"));
  237. }
  238. else
  239. {
  240. long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageVote, DateTimeOffset.FromUnixTimeMilliseconds(etime));
  241. ChangeRecord changeRecord = new ChangeRecord
  242. {
  243. id = input[0].Id,
  244. code = "going",
  245. pk = "ChangeRecord",
  246. sequenceNumber = end
  247. };
  248. await client.GetContainer("TEAMModelOS", "Common").CreateItemAsync(changeRecord, new Azure.Cosmos.PartitionKey($"{changeRecord.code}"));
  249. }
  250. break;
  251. }
  252. break;
  253. case "Survey":
  254. Survey survey = await client.GetContainer("TEAMModelOS", "Common").ReadItemAsync<Survey>(input[0].Id, new Azure.Cosmos.PartitionKey($"{code}"));
  255. var messageSurvey = new ServiceBusMessage(new { id = input[0].Id, name = "Survey", code = code }.ToJsonString());
  256. messageSurvey.Properties.Add("name", "Survey");
  257. ChangeRecord surveyRecord = await client.GetContainer("TEAMModelOS", "Common").ReadItemAsync<ChangeRecord>(input[0].Id, new Azure.Cosmos.PartitionKey($"{survey.progress}"));
  258. switch (survey.progress)
  259. {
  260. case "pending":
  261. if (surveyRecord != null)
  262. {
  263. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageSurvey, DateTimeOffset.FromUnixTimeMilliseconds(stime));
  264. surveyRecord.sequenceNumber = start;
  265. await client.GetContainer("TEAMModelOS", "Common").ReplaceItemAsync(surveyRecord, surveyRecord.id, new Azure.Cosmos.PartitionKey($"{surveyRecord.code}"));
  266. }
  267. else
  268. {
  269. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageSurvey, DateTimeOffset.FromUnixTimeMilliseconds(stime));
  270. ChangeRecord changeRecord = new ChangeRecord
  271. {
  272. id = input[0].Id,
  273. code = "pending",
  274. pk = "ChangeRecord",
  275. sequenceNumber = start
  276. };
  277. await client.GetContainer("TEAMModelOS", "Common").CreateItemAsync(changeRecord, new Azure.Cosmos.PartitionKey($"{changeRecord.code}"));
  278. }
  279. break;
  280. case "going":
  281. if (surveyRecord != null)
  282. {
  283. long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageSurvey, DateTimeOffset.FromUnixTimeMilliseconds(etime));
  284. surveyRecord.sequenceNumber = end;
  285. await client.GetContainer("TEAMModelOS", "Common").ReplaceItemAsync(surveyRecord, surveyRecord.id, new Azure.Cosmos.PartitionKey($"{surveyRecord.code}"));
  286. }
  287. else
  288. {
  289. long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageSurvey, DateTimeOffset.FromUnixTimeMilliseconds(etime));
  290. ChangeRecord changeRecord = new ChangeRecord
  291. {
  292. id = input[0].Id,
  293. code = "going",
  294. pk = "ChangeRecord",
  295. sequenceNumber = end
  296. };
  297. await client.GetContainer("TEAMModelOS", "Common").CreateItemAsync(changeRecord, new Azure.Cosmos.PartitionKey($"{changeRecord.code}"));
  298. }
  299. break;
  300. }
  301. break;
  302. }
  303. }
  304. }
  305. }
  306. }