TriggerSurvey.cs 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. using Azure.Cosmos;
  2. using Azure.Messaging.ServiceBus;
  3. using Microsoft.Azure.Documents;
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using System.Text.Json;
  8. using System.Threading.Tasks;
  9. using TEAMModelOS.SDK.DI;
  10. using TEAMModelOS.SDK.Extension;
  11. using TEAMModelOS.SDK.Helper.Common.CollectionHelper;
  12. using TEAMModelOS.SDK.Models;
  13. using TEAMModelOS.SDK.Models.Cosmos;
  14. namespace TEAMModelFunction
  15. {
  16. public class TriggerSurvey
  17. {
  18. public static async void Trigger(AzureCosmosFactory _azureCosmos, AzureServiceBusFactory _serviceBus, AzureStorageFactory _azureStorage, DingDing _dingDing,
  19. CosmosClient client, Document input, string code, long stime, long etime, string school,AzureRedisFactory _azureRedis)
  20. {
  21. Survey survey = await client.GetContainer("TEAMModelOS", "Common").ReadItemAsync<Survey>(input.Id, new Azure.Cosmos.PartitionKey($"{code}"));
  22. List<ChangeRecord> changeRecords = await _azureStorage.FindListByDict<ChangeRecord>(new Dictionary<string, object>() { { "RowKey", input.Id }, { "PartitionKey", survey.progress } });
  23. if (survey.ttl >= 1)
  24. {
  25. //TODO 处理TTL删除业务
  26. return;
  27. }
  28. switch (survey.progress)
  29. {
  30. case "pending":
  31. var messageSurvey = new ServiceBusMessage(new { id = input.Id, progress = "going", code = code }.ToJsonString());
  32. messageSurvey.ApplicationProperties.Add("name", "Survey");
  33. if (changeRecords.Count > 0)
  34. {
  35. await _serviceBus.GetServiceBusClient().cancelMessage("active-task", changeRecords[0].sequenceNumber);
  36. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageSurvey, DateTimeOffset.FromUnixTimeMilliseconds(stime));
  37. changeRecords[0].sequenceNumber = start;
  38. await _azureStorage.SaveOrUpdate<ChangeRecord>(changeRecords[0]);
  39. }
  40. else
  41. {
  42. long start = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageSurvey, DateTimeOffset.FromUnixTimeMilliseconds(stime));
  43. ChangeRecord changeRecord = new ChangeRecord
  44. {
  45. RowKey = input.Id,
  46. PartitionKey = "pending",
  47. sequenceNumber = start,
  48. msgId = messageSurvey.MessageId
  49. };
  50. await _azureStorage.Save<ChangeRecord>(changeRecord);
  51. }
  52. break;
  53. case "going":
  54. ActivityData data;
  55. if (survey.scope == "school")
  56. {
  57. data = new ActivityData
  58. {
  59. id = survey.id,
  60. code = $"Activity-{survey.owner}",
  61. type = "survey",
  62. name = survey.name,
  63. startTime = survey.startTime,
  64. endTime = survey.endTime,
  65. scode = survey.code,
  66. scope = survey.scope,
  67. classes = survey.classes.IsNotEmpty() ? survey.classes : new List<string> { "" },
  68. tmdids = survey.tmdids.IsNotEmpty() ? survey.tmdids : new List<string> { "" },
  69. progress = "going",
  70. owner = survey.owner,
  71. subjects = new List<string> { "" }
  72. };
  73. await client.GetContainer("TEAMModelOS", "School").UpsertItemAsync<ActivityData>(data, new Azure.Cosmos.PartitionKey(data.code));
  74. }
  75. else if (survey.scope == "private")
  76. {
  77. data = new ActivityData
  78. {
  79. id = survey.id,
  80. code = $"Activity-Common",
  81. type = "survey",
  82. name = survey.name,
  83. startTime = survey.startTime,
  84. endTime = survey.endTime,
  85. scode = survey.code,
  86. scope = survey.scope,
  87. progress = "going",
  88. classes = survey.classes.IsNotEmpty() ? survey.classes : new List<string> { "" },
  89. owner = survey.owner,
  90. tmdids = new List<string> { "" },
  91. subjects = new List<string> { "" }
  92. };
  93. await client.GetContainer("TEAMModelOS", "Teacher").UpsertItemAsync<ActivityData>(data, new Azure.Cosmos.PartitionKey(data.code));
  94. }
  95. var messageSurveyEnd = new ServiceBusMessage(new { id = input.Id, progress = "finish", code = code }.ToJsonString());
  96. messageSurveyEnd.ApplicationProperties.Add("name", "Survey");
  97. if (changeRecords.Count > 0)
  98. {
  99. long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageSurveyEnd, DateTimeOffset.FromUnixTimeMilliseconds(etime));
  100. await _serviceBus.GetServiceBusClient().cancelMessage("active-task", changeRecords[0].sequenceNumber);
  101. changeRecords[0].sequenceNumber = end;
  102. await _azureStorage.SaveOrUpdate<ChangeRecord>(changeRecords[0]);
  103. }
  104. else
  105. {
  106. long end = await _serviceBus.GetServiceBusClient().SendScheduleMessageAsync("active-task", messageSurveyEnd, DateTimeOffset.FromUnixTimeMilliseconds(etime));
  107. ChangeRecord changeRecord = new ChangeRecord
  108. {
  109. RowKey = input.Id,
  110. PartitionKey = "going",
  111. sequenceNumber = end,
  112. msgId = messageSurveyEnd.MessageId
  113. };
  114. await _azureStorage.Save<ChangeRecord>(changeRecord);
  115. }
  116. break;
  117. case "finish":
  118. var records =await _azureRedis.GetRedisClient(8).HashGetAllAsync($"Survey:Record:{survey.id}_{survey.code}");
  119. List<dynamic> recs = new List<dynamic>();
  120. foreach (var rcd in records) {
  121. //var key =int.Parse(rcd.Name.ToString());
  122. var value = rcd.Value.ToString().ToObject<JsonElement>();
  123. recs.Add(new { index = rcd.Name.ToString(), count = value });
  124. }
  125. await _azureStorage.UploadFileByContainer(survey.owner, recs.ToJsonString(), "vote", $"{survey.id}/record.json");
  126. break;
  127. }
  128. }
  129. }
  130. }