AzureCosmosExtensions.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. using Microsoft.Azure.Cosmos.Table;
  2. using Microsoft.Azure.Cosmos.Table.Queryable;
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using System.Linq;
  10. using Azure;
  11. using TEAMModelOS.SDK.DI;
  12. using System.IO;
  13. using System.Diagnostics;
  14. using Azure.Cosmos;
  15. using System.Text.Json;
  16. using System.Net;
  17. using System.Linq.Expressions;
  18. using TEAMModelOS.SDK;
  19. using TEAMModelOS.SDK.Helper.Common.JsonHelper;
  20. using TEAMModelOS.SDK.Extension;
  21. using TEAMModelOS.SDK.Models;
  22. using TEAMModelOS.SDK.Models.Cosmos.OpenEntity;
  23. using OpenXmlPowerTools;
  24. namespace TEAMModelOS.SDK.DI
  25. {
  26. public static class AzureCosmosExtensions
  27. {
  /// <summary>
  /// Reads the request charge reported in the <c>x-ms-request-charge</c> header of a response.
  /// </summary>
  /// <param name="response">Response whose headers are inspected.</param>
  /// <returns>
  /// The parsed header value, or 0 when the response is null, the header is missing,
  /// the value is not a valid number, or reading the headers throws.
  /// </returns>
  public static double RU(this Response response)
  {
      try
      {
          // The header is machine-formatted (e.g. "12.34"), so it has to be parsed with the
          // invariant culture. Convert.ToDouble(string) uses the thread's current culture and
          // misreads the decimal point on hosts whose culture uses ',' as the separator.
          if (response != null
              && response.Headers.TryGetValue("x-ms-request-charge", out var value)
              && double.TryParse(
                  value,
                  System.Globalization.NumberStyles.Float,
                  System.Globalization.CultureInfo.InvariantCulture,
                  out var charge))
          {
              return charge;
          }
      }
      catch
      {
          // Best effort by design: this is a metric helper and must never make the
          // calling query fail, so any failure while reading headers falls through to 0.
      }
      return 0;
  }
  /// <summary>
  /// Reads the <c>x-ms-continuation</c> header of a response.
  /// </summary>
  /// <param name="response">Response whose headers are inspected.</param>
  /// <returns>
  /// The value produced by the header lookup, or null when reading the headers throws
  /// (including when <paramref name="response"/> is null).
  /// </returns>
  public static string GetContinuationToken(this Response response)
  {
      try
      {
          var headers = response.Headers;
          // The boolean result is intentionally ignored; the out value is returned as-is.
          headers.TryGetValue("x-ms-continuation", out var token);
          return token;
      }
      catch (Exception)
      {
          return null;
      }
  }
  /// <summary>
  /// Gets the number of items stored under the given partition key of this container.
  /// A SQL WHERE condition is supported; sorting is not.
  /// </summary>
  /// <param name="container">Container to query.</param>
  /// <param name="partitionkey">Partition key value the query is scoped to.</param>
  /// <param name="queryWhere">
  /// Complete WHERE clause, appended verbatim to <c>SELECT VALUE COUNT(1) From c</c>.
  /// It is concatenated into the query text, so it must never be built from untrusted input.
  /// </param>
  /// <returns>The count reported by the query, or 0 when the query yields no result documents.</returns>
  public static async Task<int> GetCount(this CosmosContainer container, string partitionkey, string queryWhere = "WHERE 1=1")
  {
      int totalCount = 0;
      var options = new QueryRequestOptions() { PartitionKey = new PartitionKey(partitionkey), MaxItemCount = -1 };
      await foreach (var item in container.GetItemQueryStreamIterator(
          queryText: $"SELECT VALUE COUNT(1) From c {queryWhere}",
          requestOptions: options))
      {
          // Dispose every page response so its content stream is released promptly.
          using var page = item;
          using var json = await JsonDocument.ParseAsync(page.ContentStream);
          // "_count" is read as Int32: GetUInt16 throws as soon as a page reports more than 65535 entries.
          if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
          {
              foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
              {
                  // Accumulate instead of overwrite, so a count delivered as several partial
                  // values (one per page) is still totalled correctly.
                  totalCount += obj.GetInt32();
              }
          }
      }
      return totalCount;
  }
  /// <summary>
  /// Deletes the given items one after another through the stream API, all under the same partition key.
  /// </summary>
  /// <param name="container">Container the items are deleted from.</param>
  /// <param name="ids">Ids of the items to delete.</param>
  /// <param name="partitionkey">Partition key value shared by all ids.</param>
  /// <returns>
  /// The responses of the delete calls that did not throw, in input order.
  /// Ids whose delete call throws are skipped silently and have no entry in the result.
  /// </returns>
  public static async Task<List<Response>> DeleteItemsStreamAsync(this CosmosContainer container, List<string> ids, string partitionkey )
  {
      var collected = new List<Response>();
      foreach (string itemId in ids)
      {
          try
          {
              var outcome = await container.DeleteItemStreamAsync(itemId, new PartitionKey(partitionkey));
              collected.Add(outcome);
          }
          catch
          {
              // Intentionally ignored: a failing delete must not stop the remaining ones.
          }
      }
      return collected;
  }
  /// <summary>
  /// Deletes the given items one after another through the typed API, all under the same partition key.
  /// </summary>
  /// <typeparam name="T">Item type passed to <c>DeleteItemAsync</c>.</typeparam>
  /// <param name="container">Container the items are deleted from.</param>
  /// <param name="ids">Ids of the items to delete.</param>
  /// <param name="partitionkey">Partition key value shared by all ids.</param>
  /// <returns>
  /// The responses of the delete calls that did not throw, in input order.
  /// Ids whose delete call throws are skipped silently and have no entry in the result.
  /// </returns>
  public static async Task<List<ItemResponse<T>>> DeleteItemsAsync<T>(this CosmosContainer container, List<string> ids, string partitionkey)
  {
      var collected = new List<ItemResponse<T>>();
      foreach (string itemId in ids)
      {
          try
          {
              var outcome = await container.DeleteItemAsync<T>(itemId, new PartitionKey(partitionkey));
              collected.Add(outcome);
          }
          catch
          {
              // Intentionally ignored: a failing delete must not stop the remaining ones.
          }
      }
      return collected;
  }
  /// <summary>
  /// Runs a parameterized query against a single partition and collects the resulting items.
  /// </summary>
  /// <typeparam name="T">Type each result document is converted to.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="queryDefinition">Query to execute.</param>
  /// <param name="partitionkey">
  /// Partition key value the query is scoped to. When null, empty or whitespace no query is
  /// executed and an empty result carrying the incoming continuation token is returned.
  /// </param>
  /// <param name="continuationToken">Token to resume from, or null to start from the beginning.</param>
  /// <param name="pageSize">
  /// Passed to the SDK as <c>MaxItemCount</c>. When it is non-negative, reading stops after the
  /// first page that brings the collected item count up to at least this value.
  /// </param>
  /// <returns>
  /// The collected items, the summed request charge of all pages read, and the continuation token
  /// of the last page read (null for queries matched by the distinct/order text check).
  /// </returns>
  /// <remarks>Exceptions raised by the SDK or by JSON parsing are not caught here.</remarks>
  public static async Task<CosmosDBResult<T>> GetList<T>(this CosmosContainer container, QueryDefinition queryDefinition, string partitionkey, string? continuationToken = null, int? pageSize = null)
  {
      List<T> list = new List<T>();
      double RU = 0;
      if (string.IsNullOrWhiteSpace(partitionkey))
      {
          return new CosmosDBResult<T> { list = list, ru = RU, continuationToken = continuationToken };
      }

      // The query text does not change between pages, so scan it once instead of once per page.
      // A token is never handed back for queries containing " distinct " or "order "
      // (".order " is excluded so a property path such as c.order does not match).
      // NOTE(review): presumably continuation is unusable for those query shapes here - confirm.
      string queryText = queryDefinition?.QueryText ?? string.Empty;
      bool suppressToken =
          queryText.Contains(" distinct ", StringComparison.OrdinalIgnoreCase) ||
          (queryText.Contains("order ", StringComparison.OrdinalIgnoreCase) &&
           !queryText.Contains(".order ", StringComparison.OrdinalIgnoreCase));

      var options = new QueryRequestOptions { MaxItemCount = pageSize, PartitionKey = new PartitionKey(partitionkey) };
      await foreach (var item in container.GetItemQueryStreamIterator(queryDefinition: queryDefinition, continuationToken: continuationToken,
          requestOptions: options))
      {
          // Dispose every page response so its content stream is released promptly,
          // including when the loop is left early through the break below.
          using var page = item;
          using var json = await JsonDocument.ParseAsync(page.ContentStream);
          // "_count" is read as Int32: GetUInt16 throws for pages holding more than 65535 items.
          if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
          {
              foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
              {
                  list.Add(obj.ToObject<T>());
              }
          }

          continuationToken = suppressToken ? null : page.GetContinuationToken();
          RU += page.RU();

          if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize.Value)
          {
              break;
          }
      }

      // Log when the RU charge exceeds 400 (dev/test) or 1000 (production) - not implemented here.
      return new CosmosDBResult<T> { list = list, ru = RU, continuationToken = continuationToken };
  }
  /// <summary>
  /// Runs a SQL text query and collects the resulting items.
  /// </summary>
  /// <typeparam name="T">Type each result document is converted to.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="sql">Query text to execute.</param>
  /// <param name="partitionkey">
  /// Partition key value to scope the query to. When null, empty or whitespace the query is
  /// issued without a partition key in its request options.
  /// </param>
  /// <param name="continuationToken">Token to resume from, or null to start from the beginning.</param>
  /// <param name="pageSize">
  /// Passed to the SDK as <c>MaxItemCount</c>. When it is non-negative, reading stops after the
  /// first page that brings the collected item count up to at least this value.
  /// </param>
  /// <returns>
  /// The collected items, the summed request charge of all pages read, and the continuation token
  /// of the last page read (null for queries matched by the distinct/order text check).
  /// </returns>
  /// <remarks>
  /// Any exception raised while querying is written to the console and swallowed; the result then
  /// contains whatever had been collected up to that point.
  /// </remarks>
  public static async Task<CosmosDBResult<T>> GetList<T>(this CosmosContainer container, string sql, string partitionkey, string? continuationToken = null, int? pageSize = null)
  {
      List<T> list = new List<T>();
      double RU = 0;
      try
      {
          // The two former branches differed only in whether a partition key was set,
          // so the request options are built once and a single loop is shared.
          var options = new QueryRequestOptions { MaxItemCount = pageSize };
          if (!string.IsNullOrWhiteSpace(partitionkey))
          {
              options.PartitionKey = new PartitionKey(partitionkey);
          }

          // The query text does not change between pages, so scan it once instead of once per page.
          // A token is never handed back for queries containing " distinct " or "order "
          // (".order " is excluded so a property path such as c.order does not match).
          // NOTE(review): presumably continuation is unusable for those query shapes here - confirm.
          bool suppressToken = sql != null &&
              (sql.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
               || (sql.Contains("order ", StringComparison.OrdinalIgnoreCase)
                   && !sql.Contains(".order ", StringComparison.OrdinalIgnoreCase)));

          await foreach (var item in container.GetItemQueryStreamIterator(queryText: sql, continuationToken: continuationToken,
              requestOptions: options))
          {
              // Dispose every page response so its content stream is released promptly,
              // including when the loop is left early through the break below.
              using var page = item;
              using var json = await JsonDocument.ParseAsync(page.ContentStream);
              // "_count" is read as Int32: GetUInt16 throws for pages holding more than 65535 items.
              if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
              {
                  foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
                  {
                      list.Add(obj.ToObject<T>());
                  }
              }

              continuationToken = suppressToken ? null : page.GetContinuationToken();
              RU += page.RU();

              if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize.Value)
              {
                  break;
              }
          }
      }
      catch (Exception ex)
      {
          // Existing best-effort contract: report the failure and return what was gathered so far.
          Console.WriteLine(ex.ToString());
      }

      // Log when the RU charge exceeds 400 (dev/test) or 1000 (production) - not implemented here.
      return new CosmosDBResult<T> { list = list, ru = RU, continuationToken = continuationToken };
  }
  /// <summary>
  /// Result envelope returned by the <c>GetList</c> helpers: the items read, the token to
  /// continue from, and the request charge that was accumulated.
  /// </summary>
  /// <typeparam name="T">Type of the items in <see cref="list"/>.</typeparam>
  public class CosmosDBResult<T>
  {
      /// <summary>Items collected by the query.</summary>
      public List<T>? list { get; set; }

      /// <summary>Continuation token for fetching further results, or null when none is provided.</summary>
      public string? continuationToken { get; set; }

      /// <summary>Sum of the request charges of the pages that were read.</summary>
      public double ru { get; set; }
  }
  234. }
  235. }