AzureCosmosExtensions.cs 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. using System;
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using System.Linq;
  8. using Azure;
  9. using TEAMModelOS.SDK.DI;
  10. using System.IO;
  11. using System.Diagnostics;
  12. using Azure.Cosmos;
  13. using System.Text.Json;
  14. using System.Net;
  15. using System.Linq.Expressions;
  16. using TEAMModelOS.SDK;
  17. using TEAMModelOS.SDK.Extension;
  18. using TEAMModelOS.SDK.Models;
  19. using TEAMModelOS.SDK.Models.Cosmos.OpenEntity;
  20. namespace TEAMModelOS.SDK.DI
  21. {
  22. public static class AzureCosmosExtensions
  23. {
  /// <summary>
  /// Reads the request charge reported in the <c>x-ms-request-charge</c> response header.
  /// </summary>
  /// <param name="response">Response whose headers are inspected.</param>
  /// <returns>
  /// The parsed charge, or 0 when the header is absent, cannot be parsed as a number,
  /// or reading it throws (for example when <paramref name="response"/> is null).
  /// </returns>
  public static double RU(this Response response)
  {
      try
      {
          // The header is a machine-readable decimal such as "2.83". Parsing it with the
          // invariant culture keeps the result independent of the thread's current culture
          // (a comma-decimal culture would otherwise misread the '.' separator).
          if (response.Headers.TryGetValue("x-ms-request-charge", out var rawCharge)
              && double.TryParse(
                  rawCharge,
                  System.Globalization.NumberStyles.Float | System.Globalization.NumberStyles.AllowThousands,
                  System.Globalization.CultureInfo.InvariantCulture,
                  out var charge))
          {
              return charge;
          }
      }
      catch
      {
          // Deliberately best-effort: a failure to read the header is reported as a zero charge.
      }
      return 0;
  }
  /// <summary>
  /// Reads the <c>x-ms-continuation</c> header from a response.
  /// </summary>
  /// <param name="response">Response whose headers are inspected.</param>
  /// <returns>The header value, or null when the header is absent or reading it throws.</returns>
  public static string GetContinuationToken(this Response response)
  {
      try
      {
          return response.Headers.TryGetValue("x-ms-continuation", out var token) ? token : null;
      }
      catch
      {
          // Best-effort lookup: any failure is reported as "no token".
          return null;
      }
  }
  /// <summary>
  /// Gets the number of items in the container for the given partition key.
  /// A SQL WHERE clause may be supplied; sorting is not supported.
  /// </summary>
  /// <param name="container">Container to query.</param>
  /// <param name="partitionkey">
  /// Partition key value. When null or whitespace, no partition key is set on the request options.
  /// </param>
  /// <param name="queryWhere">
  /// SQL WHERE clause appended verbatim after <c>SELECT VALUE COUNT(1) From c</c>.
  /// </param>
  /// <returns>The sum of the count values found in the response pages; 0 when none were returned.</returns>
  public static async Task<int> GetCount(this CosmosContainer container, string partitionkey, string queryWhere = "WHERE 1=1")
  {
      // NOTE(review): queryWhere is spliced into the query text as-is, so callers must not
      // pass untrusted input here.
      var options = new QueryRequestOptions()
      {
          PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null,
          MaxItemCount = -1
      };

      int totalCount = 0;
      await foreach (var item in container.GetItemQueryStreamIterator(
          queryText: $"SELECT VALUE COUNT(1) From c {queryWhere}",
          requestOptions: options))
      {
          // Each page response owns its content stream; release it once the page is parsed.
          using (item)
          {
              using var json = await JsonDocument.ParseAsync(item.ContentStream);
              if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
              {
                  foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
                  {
                      // Accumulate rather than overwrite: identical when a single aggregate value
                      // is returned, and still correct if values arrive across several pages.
                      totalCount += obj.GetInt32();
                  }
              }
          }
      }
      return totalCount;
  }
  /// <summary>
  /// Deletes the items with the given ids one after another using the stream API.
  /// </summary>
  /// <param name="container">Container the items are deleted from.</param>
  /// <param name="ids">Ids of the items to delete.</param>
  /// <param name="partitionkey">Partition key value used for every delete.</param>
  /// <returns>
  /// The responses of the delete calls that completed without throwing, in the order of
  /// <paramref name="ids"/>. Ids whose delete call threw are skipped silently.
  /// </returns>
  public static async Task<List<Response>> DeleteItemsStreamAsync(this CosmosContainer container, List<string> ids, string partitionkey )
  {
      var results = new List<Response>();
      foreach (var itemId in ids)
      {
          try
          {
              var deleted = await container.DeleteItemStreamAsync(itemId, new PartitionKey(partitionkey));
              results.Add(deleted);
          }
          catch
          {
              // Best-effort batch: a failed delete is ignored and the loop moves on to the next id.
          }
      }
      return results;
  }
  /// <summary>
  /// Deletes the items with the given ids one after another using the typed API.
  /// </summary>
  /// <typeparam name="T">Item type passed to <c>DeleteItemAsync</c>.</typeparam>
  /// <param name="container">Container the items are deleted from.</param>
  /// <param name="ids">Ids of the items to delete.</param>
  /// <param name="partitionkey">Partition key value used for every delete.</param>
  /// <returns>
  /// The responses of the delete calls that completed without throwing, in the order of
  /// <paramref name="ids"/>. Ids whose delete call threw are skipped silently.
  /// </returns>
  public static async Task<List<ItemResponse<T>>> DeleteItemsAsync<T>(this CosmosContainer container, List<string> ids, string partitionkey)
  {
      var results = new List<ItemResponse<T>>();
      foreach (var itemId in ids)
      {
          try
          {
              var deleted = await container.DeleteItemAsync<T>(itemId, new PartitionKey(partitionkey));
              results.Add(deleted);
          }
          catch
          {
              // Best-effort batch: a failed delete is ignored and the loop moves on to the next id.
          }
      }
      return results;
  }
  /// <summary>
  /// Runs a query through the stream iterator and deserializes the returned documents.
  /// </summary>
  /// <typeparam name="T">Type each document is converted to via <c>ToObject</c>.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="queryDefinition">Query to execute.</param>
  /// <param name="partitionkey">
  /// Partition key value. When null or whitespace, no partition key is set on the request options.
  /// </param>
  /// <param name="continuationToken">Token to resume from; null starts from the beginning.</param>
  /// <param name="pageSize">
  /// Passed as <c>MaxItemCount</c>. When it has a non-negative value, reading stops after the page
  /// that brings the number of collected items to at least this value.
  /// </param>
  /// <returns>
  /// The collected items, the summed request charge of the pages read, and the continuation token of
  /// the last page read. The token is null when the query text contains " distinct " or an "order "
  /// clause; if no page is read, the incoming token is returned unchanged.
  /// </returns>
  public static async Task<CosmosDBResult<T>> GetList<T>(this CosmosContainer container, QueryDefinition queryDefinition, string partitionkey= null, string continuationToken = null, int? pageSize = null)
  {
      List<T> list = new List<T>();
      double totalRu = 0;

      // The query text does not change between pages, so decide once whether the
      // continuation token is dropped (DISTINCT / ORDER queries).
      string queryText = queryDefinition.QueryText;
      bool dropToken = queryText.Contains(" distinct ", StringComparison.OrdinalIgnoreCase) ||
          (queryText.Contains("order ", StringComparison.OrdinalIgnoreCase) &&
           !queryText.Contains(".order ", StringComparison.OrdinalIgnoreCase));

      await foreach (var item in container.GetItemQueryStreamIterator(queryDefinition: queryDefinition, continuationToken: continuationToken,
          requestOptions: new QueryRequestOptions { MaxItemCount = pageSize, PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null }))
      {
          // Each page response owns its content stream; release it once the page is consumed.
          using (item)
          {
              using var json = await JsonDocument.ParseAsync(item.ContentStream);
              // GetInt32 instead of GetUInt16 so a page with more than 65535 documents does not throw.
              if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
              {
                  foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
                  {
                      list.Add(obj.ToObject<T>());
                  }
              }

              continuationToken = dropToken ? null : item.GetContinuationToken();
              totalRu += item.RU();

              if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize)
              {
                  break;
              }
          }
      }

      // TODO: log queries whose RU cost exceeds 400 (dev/test) or 1000 (production).
      return new CosmosDBResult<T> { list = list, ru = totalRu, continuationToken = continuationToken };
  }
  /// <summary>
  /// Runs a SQL query through the stream iterator and deserializes the returned documents.
  /// </summary>
  /// <typeparam name="T">Type each document is converted to via <c>ToObject</c>.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="sql">Query text to execute.</param>
  /// <param name="partitionkey">
  /// Partition key value. When null or whitespace, no partition key is set on the request options.
  /// </param>
  /// <param name="continuationToken">Token to resume from; null starts from the beginning.</param>
  /// <param name="pageSize">
  /// Passed as <c>MaxItemCount</c>. When it has a non-negative value, reading stops after the page
  /// that brings the number of collected items to at least this value.
  /// </param>
  /// <returns>
  /// The collected items, the summed request charge of the pages read, and the continuation token of
  /// the last page read. The token is null when the query text contains " distinct " or an "order "
  /// clause. Exceptions raised while querying are written to the console and NOT rethrown, so the
  /// result may be partial (or empty) after a failure.
  /// </returns>
  public static async Task<CosmosDBResult<T>> GetList<T>(this CosmosContainer container, string sql, string partitionkey= null, string continuationToken = null, int? pageSize = null)
  {
      List<T> list = new List<T>();
      double totalRu = 0;

      // The query text does not change between pages, so decide once whether the
      // continuation token is dropped (DISTINCT / ORDER queries). A null query text is
      // treated as containing neither keyword instead of failing part-way through the loop.
      bool dropToken = sql != null &&
          (sql.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
           || (sql.Contains("order ", StringComparison.OrdinalIgnoreCase)
               && !sql.Contains(".order ", StringComparison.OrdinalIgnoreCase)));

      try
      {
          await foreach (var item in container.GetItemQueryStreamIterator(queryText: sql, continuationToken: continuationToken,
              requestOptions: new QueryRequestOptions { MaxItemCount = pageSize, PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null }))
          {
              // Each page response owns its content stream; release it once the page is consumed.
              using (item)
              {
                  using var json = await JsonDocument.ParseAsync(item.ContentStream);
                  // GetInt32 instead of GetUInt16 so a page with more than 65535 documents does not throw.
                  if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
                  {
                      foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
                      {
                          list.Add(obj.ToObject<T>());
                      }
                  }

                  continuationToken = dropToken ? null : item.GetContinuationToken();
                  totalRu += item.RU();

                  if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize)
                  {
                      break;
                  }
              }
          }
      }
      catch (Exception ex)
      {
          // Existing contract: failures are reported on the console and whatever was
          // collected so far is returned to the caller.
          Console.WriteLine(ex.ToString());
      }

      // TODO: log queries whose RU cost exceeds 400 (dev/test) or 1000 (production).
      return new CosmosDBResult<T> { list = list, ru = totalRu, continuationToken = continuationToken };
  }
  /// <summary>
  /// Result container returned by the <c>GetList</c> query helpers.
  /// </summary>
  /// <typeparam name="T">Type of the items held in <see cref="list"/>.</typeparam>
  public class CosmosDBResult<T>
  {
      /// <summary>Items collected from the query pages.</summary>
      public List<T>? list { get; set; }

      /// <summary>Continuation token for resuming the query; may be null.</summary>
      public string? continuationToken { get; set; }

      /// <summary>Request charge (RU) summed over the pages that were read.</summary>
      public double ru { get; set; }
  }
  187. }
  188. }