AzureCosmosExtensions.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. using System;
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using System.Linq;
  8. using Azure;
  9. using TEAMModelOS.SDK.DI;
  10. using System.IO;
  11. using System.Diagnostics;
  12. using System.Text.Json;
  13. using System.Net;
  14. using System.Linq.Expressions;
  15. using TEAMModelOS.SDK;
  16. using TEAMModelOS.SDK.Extension;
  17. using TEAMModelOS.SDK.Models;
  18. using TEAMModelOS.SDK.Models.Cosmos.OpenEntity;
  19. using Microsoft.Azure.Cosmos;
  20. namespace TEAMModelOS.SDK.DI
  21. {
  22. public static class AzureCosmosExtensions
  23. {
  24. public static double RU(this Response response)
  25. {
  26. try
  27. {
  28. response.Headers.TryGetValue("x-ms-request-charge", out var value);
  29. var ru = Convert.ToDouble(value);
  30. return ru;
  31. }
  32. catch
  33. {
  34. return 0;
  35. }
  36. }
  37. public static string? GetContinuationToken(this Response response)
  38. {
  39. try
  40. {
  41. response.Headers.TryGetValue("x-ms-continuation", out var value);
  42. return value;
  43. }
  44. catch
  45. {
  46. return null;
  47. }
  48. }
  49. public static async Task<T?> GetOne<T>(this Container container, QueryDefinition queryDefinition, string? partitionkey = null)
  50. {
  51. List<T?> list = new List<T?>();
  52. //if (string.IsNullOrWhiteSpace(partitionkey))
  53. //{
  54. // return default;
  55. //}
  56. FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition,
  57. requestOptions: new QueryRequestOptions { MaxItemCount = 2, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null });
  58. while (iterator.HasMoreResults)
  59. {
  60. FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
  61. list.AddRange(currentResultSet);
  62. }
  63. if (list.Count > 1)
  64. {
  65. throw new Exception("当前查询条件返回的结果不唯一");
  66. }
  67. else if (list.Count <= 0)
  68. {
  69. return default;
  70. }
  71. else
  72. {
  73. return list.First();
  74. }
  75. }
  76. public static async Task<T?> GetOne<T>(this Container container, string sql, string? partitionkey = null)
  77. {
  78. List<T?> list = new List<T?>();
  79. //if (string.IsNullOrWhiteSpace(partitionkey))
  80. //{
  81. // return default;
  82. //}
  83. FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryText: sql,
  84. requestOptions: new QueryRequestOptions { MaxItemCount = 2, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null });
  85. while (iterator.HasMoreResults)
  86. {
  87. FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
  88. list.AddRange(currentResultSet);
  89. }
  90. if (list.Count > 1)
  91. {
  92. throw new Exception("当前查询条件返回的结果不唯一");
  93. }
  94. else if (list.Count <=0)
  95. {
  96. return default;
  97. }
  98. else
  99. {
  100. return list.First();
  101. }
  102. }
  103. public static async Task<CosmosDBResult<T>> GetList<T>(this Container container, QueryDefinition queryDefinition, string? partitionkey = null, string? continuationToken = null, int? pageSize = null)
  104. {
  105. List<T> list = new List<T>();
  106. double RU = 0;
  107. FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition, continuationToken: continuationToken, requestOptions: new QueryRequestOptions { MaxItemCount=pageSize, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null });
  108. while (iterator.HasMoreResults)
  109. {
  110. FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
  111. list.AddRange(currentResultSet);
  112. RU += currentResultSet.RequestCharge;
  113. //此处需要优化 ,检查相关的 关键字 用正则
  114. if (queryDefinition.QueryText.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
  115. || (queryDefinition.QueryText.Contains("order ", StringComparison.OrdinalIgnoreCase)
  116. && !queryDefinition.QueryText.Contains(".order ", StringComparison.OrdinalIgnoreCase)))
  117. {
  118. continuationToken = null;
  119. }
  120. else
  121. {
  122. continuationToken = currentResultSet.ContinuationToken;
  123. }
  124. if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize)
  125. {
  126. break;
  127. }
  128. }
  129. return (new CosmosDBResult<T> { list = list, ru = RU, continuationToken = continuationToken });
  130. }
  131. public static async Task<CosmosDBResult<T>> GetList<T>(this Container container, string sql, string? partitionkey = null, string? continuationToken = null, int? pageSize = null)
  132. {
  133. List<T> list = new List<T>();
  134. double RU = 0;
  135. FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryText: sql, continuationToken: continuationToken, requestOptions: new QueryRequestOptions { MaxItemCount=pageSize, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null });
  136. while (iterator.HasMoreResults)
  137. {
  138. FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
  139. list.AddRange(currentResultSet);
  140. RU += currentResultSet.RequestCharge;
  141. //此处需要优化 ,检查相关的 关键字 用正则
  142. if (sql.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
  143. || (sql.Contains("order ", StringComparison.OrdinalIgnoreCase)
  144. && !sql.Contains(".order ", StringComparison.OrdinalIgnoreCase)))
  145. {
  146. continuationToken = null;
  147. }
  148. else
  149. {
  150. continuationToken = currentResultSet.ContinuationToken;
  151. }
  152. if (pageSize.HasValue && pageSize.Value >=0 && list.Count>=pageSize)
  153. {
  154. break;
  155. }
  156. }
  157. //记录日志,RU开销大于400(开发测试),1000(正式)
  158. return (new CosmosDBResult<T> { list = list, ru = RU, continuationToken = continuationToken });
  159. }
  160. /// <summary>
  161. /// 取得当前容器指定分区键的Count数,支持SQL Where条件,不支持排序
  162. /// </summary>
  163. /// <param name="container"></param>
  164. /// <param name="partitionkey"></param>
  165. /// <param name="queryWhere"></param>
  166. /// <returns></returns>
  167. public static async Task<int> GetCount(this Container container, string? partitionkey = null, string queryWhere = "WHERE 1=1")
  168. {
  169. int totalCount = 0;
  170. var items = container.GetItemQueryStreamIterator(
  171. queryText: $"SELECT VALUE COUNT(1) From c {queryWhere}",
  172. requestOptions: new QueryRequestOptions() { PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null, MaxItemCount = -1 });
  173. while (items.HasMoreResults)
  174. {
  175. using var response = await items.ReadNextAsync();
  176. using var json = await JsonDocument.ParseAsync(response.Content);
  177. if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetUInt16() > 0)
  178. {
  179. foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
  180. {
  181. totalCount = obj.GetInt32();
  182. }
  183. }
  184. }
  185. return totalCount;
  186. }
  187. public static async Task<ResponseMessage[]> DeleteItemsStreamAsync(this Container container, List<string> ids, string partitionkey)
  188. {
  189. List<Task<ResponseMessage>> responses = new List<Task<ResponseMessage>>();
  190. if (ids!= null && ids.Count > 0)
  191. {
  192. foreach (var id in ids)
  193. {
  194. try
  195. {
  196. responses.Add(container.DeleteItemStreamAsync(id, new PartitionKey(partitionkey)));
  197. }
  198. catch
  199. {
  200. continue;
  201. }
  202. }
  203. }
  204. var response = await Task.WhenAll(responses);
  205. return response;
  206. }
  207. public static async Task<ItemResponse<T>[]> DeleteItemsAsync<T>(this Container container, List<string> ids, string partitionkey)
  208. {
  209. List<Task<ItemResponse<T>>> responses = new List<Task<ItemResponse<T>>>(); ;
  210. if (ids!= null && ids.Count > 0)
  211. {
  212. foreach (var id in ids)
  213. {
  214. try
  215. {
  216. responses.Add(container.DeleteItemAsync<T>(id, new PartitionKey(partitionkey)));
  217. }
  218. catch
  219. {
  220. continue;
  221. }
  222. }
  223. }
  224. var response = await Task.WhenAll(responses);
  225. return response;
  226. }
  227. public static async IAsyncEnumerable<ResponseMessage> GetItemQueryStreamIteratorSql(this Container container, string queryText = null, QueryRequestOptions requestOptions = null, string continuationToken = null)
  228. {
  229. var items = container.GetItemQueryStreamIterator(
  230. queryText: queryText, continuationToken: continuationToken,
  231. requestOptions: requestOptions);
  232. while (items.HasMoreResults)
  233. {
  234. ResponseMessage response = await items.ReadNextAsync();
  235. if (response.IsSuccessStatusCode)
  236. {
  237. yield return response;
  238. }
  239. else
  240. {
  241. break;
  242. }
  243. }
  244. }
  245. public static async IAsyncEnumerable<ResponseMessage> GetItemQueryStreamIteratorQuery(this Container container, QueryDefinition queryDefinition,QueryRequestOptions requestOptions = null, string continuationToken = null)
  246. {
  247. var items = container.GetItemQueryStreamIterator(
  248. queryDefinition: queryDefinition, continuationToken: continuationToken,
  249. requestOptions: requestOptions);
  250. while (items.HasMoreResults)
  251. {
  252. ResponseMessage response = await items.ReadNextAsync();
  253. if (response.IsSuccessStatusCode)
  254. {
  255. yield return response;
  256. }
  257. else
  258. {
  259. break;
  260. }
  261. }
  262. }
  263. public static async IAsyncEnumerable<T> GetItemQueryIteratorQuery<T>(this Container container, QueryDefinition queryDefinition, QueryRequestOptions requestOptions = null, string continuationToken = null)
  264. {
  265. var items = container.GetItemQueryIterator<T>(
  266. queryDefinition: queryDefinition, continuationToken: continuationToken,
  267. requestOptions: requestOptions);
  268. while (items.HasMoreResults)
  269. {
  270. FeedResponse<T> response = await items.ReadNextAsync();
  271. if (response.Any())
  272. {
  273. foreach (var rs in response)
  274. {
  275. yield return rs;
  276. }
  277. }
  278. else
  279. {
  280. yield break;
  281. }
  282. }
  283. }
  284. public static async IAsyncEnumerable<T> GetItemQueryIteratorSql<T>(this Container container, string queryText, QueryRequestOptions requestOptions = null, string continuationToken = null)
  285. {
  286. var items = container.GetItemQueryIterator<T>(
  287. queryText: queryText, continuationToken: continuationToken,
  288. requestOptions: requestOptions);
  289. while (items.HasMoreResults)
  290. {
  291. FeedResponse<T> response = await items.ReadNextAsync();
  292. if (response.Any())
  293. {
  294. foreach (var rs in response)
  295. {
  296. yield return rs;
  297. }
  298. }
  299. else
  300. {
  301. yield break;
  302. }
  303. }
  304. }
  305. }
  306. public class CosmosDBResult<T>
  307. {
  308. public List<T>? list { get; set; } = new List<T>();
  309. public string? continuationToken { get; set; }
  310. //RU数据库查询开销
  311. public double ru { get; set; }
  312. }
  313. }