AzureCosmosExtensions.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. using System;
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using System.Linq;
  8. using Azure;
  9. using TEAMModelOS.SDK.DI;
  10. using System.IO;
  11. using System.Diagnostics;
  12. using System.Text.Json;
  13. using System.Net;
  14. using System.Linq.Expressions;
  15. using TEAMModelOS.SDK;
  16. using TEAMModelOS.SDK.Extension;
  17. using TEAMModelOS.SDK.Models;
  18. using TEAMModelOS.SDK.Models.Cosmos.OpenEntity;
  19. using Microsoft.Azure.Cosmos;
  20. using TEAMModelOS.Models.Dto;
  21. namespace TEAMModelOS.SDK.DI
  22. {
  23. public static class AzureCosmosExtensions
  24. {
  /// <summary>
  /// Reads the request charge reported in the <c>x-ms-request-charge</c> response header.
  /// </summary>
  /// <param name="response">Response whose headers are inspected.</param>
  /// <returns>
  /// The parsed header value, or 0 when the header is missing, is not a valid number,
  /// or reading the headers throws.
  /// </returns>
  public static double RU(this Response response)
  {
      try
      {
          // The header is machine-readable text, so it is parsed with the invariant culture;
          // a culture-sensitive parse would misread "2.83" on hosts that use ',' as the decimal separator.
          if (response.Headers.TryGetValue("x-ms-request-charge", out var value)
              && double.TryParse(
                  value,
                  System.Globalization.NumberStyles.Float,
                  System.Globalization.CultureInfo.InvariantCulture,
                  out var charge))
          {
              return charge;
          }

          return 0;
      }
      catch
      {
          // Best-effort accessor: any failure while reading the headers (including a null response) yields 0.
          return 0;
      }
  }
  /// <summary>
  /// Reads the value of the <c>x-ms-continuation</c> response header.
  /// </summary>
  /// <param name="response">Response whose headers are inspected.</param>
  /// <returns>The header value as produced by the header lookup, or <c>null</c> when reading the headers throws.</returns>
  public static string? GetContinuationToken(this Response response)
  {
      string? token;
      try
      {
          // The boolean result of the lookup is intentionally ignored; whatever was written to the out value is returned.
          response.Headers.TryGetValue("x-ms-continuation", out token);
      }
      catch (Exception)
      {
          token = null;
      }

      return token;
  }
  /// <summary>
  /// Runs a query that is expected to match at most one item and returns that item.
  /// </summary>
  /// <typeparam name="T">Type the matching item is deserialized to.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="queryDefinition">Query to execute.</param>
  /// <param name="partitionkey">
  /// Partition key to scope the query to; when null or whitespace, no partition key is set on the request options.
  /// </param>
  /// <returns>The single matching item, or <c>default</c> when nothing matches.</returns>
  /// <exception cref="InvalidOperationException">The query matched more than one item.</exception>
  public static async Task<T?> GetOne<T>(this Container container, QueryDefinition queryDefinition, string? partitionkey = null)
  {
      var options = new QueryRequestOptions
      {
          // Two items are enough to tell "unique" from "not unique".
          MaxItemCount = 2,
          PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null
      };

      List<T?> matches = new List<T?>();

      // The iterator is disposable; release it as soon as the method leaves.
      using FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition, requestOptions: options);

      // Stop reading pages once a second match is seen: the outcome is already decided,
      // and draining the remaining pages would only consume request units.
      while (iterator.HasMoreResults && matches.Count <= 1)
      {
          FeedResponse<T> page = await iterator.ReadNextAsync();
          matches.AddRange(page);
      }

      if (matches.Count > 1)
      {
          throw new InvalidOperationException("当前查询条件返回的结果不唯一");
      }

      if (matches.Count == 0)
      {
          return default;
      }

      return matches[0];
  }
  /// <summary>
  /// Runs a SQL query that is expected to match at most one item and returns that item.
  /// </summary>
  /// <typeparam name="T">Type the matching item is deserialized to.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="sql">Query text to execute.</param>
  /// <param name="partitionkey">
  /// Partition key to scope the query to; when null or whitespace, no partition key is set on the request options.
  /// </param>
  /// <returns>The single matching item, or <c>default</c> when nothing matches.</returns>
  /// <exception cref="InvalidOperationException">The query matched more than one item.</exception>
  public static async Task<T?> GetOne<T>(this Container container, string sql, string? partitionkey = null)
  {
      var options = new QueryRequestOptions
      {
          // Two items are enough to tell "unique" from "not unique".
          MaxItemCount = 2,
          PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null
      };

      List<T?> matches = new List<T?>();

      // The iterator is disposable; release it as soon as the method leaves.
      using FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryText: sql, requestOptions: options);

      // Stop reading pages once a second match is seen: the outcome is already decided,
      // and draining the remaining pages would only consume request units.
      while (iterator.HasMoreResults && matches.Count <= 1)
      {
          FeedResponse<T> page = await iterator.ReadNextAsync();
          matches.AddRange(page);
      }

      if (matches.Count > 1)
      {
          throw new InvalidOperationException("当前查询条件返回的结果不唯一");
      }

      if (matches.Count == 0)
      {
          return default;
      }

      return matches[0];
  }
  /// <summary>
  /// Executes a query and collects the results, the accumulated request charge and a continuation token.
  /// </summary>
  /// <typeparam name="T">Type the items are deserialized to.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="queryDefinition">Query to execute.</param>
  /// <param name="partitionkey">
  /// Partition key to scope the query to; when null or whitespace, no partition key is set on the request options.
  /// </param>
  /// <param name="continuationToken">Token from which to resume a previous query, or null to start from the beginning.</param>
  /// <param name="pageSize">
  /// Passed as <c>MaxItemCount</c>. When it has a non-negative value, reading stops as soon as at least that many
  /// items have been collected; otherwise every page is read.
  /// </param>
  /// <returns>
  /// The collected items, the sum of the request charges of the pages read, and the continuation token of the last
  /// page read (always null when the query text matches the DISTINCT / ORDER keyword check below).
  /// </returns>
  public static async Task<CosmosDBResult<T>> GetList<T>(this Container container, QueryDefinition queryDefinition, string? partitionkey = null, string? continuationToken = null, int? pageSize = null)
  {
      List<T> list = new List<T>();
      double totalCharge = 0;

      // The query text does not change between pages, so the keyword check is done once up front.
      // It is null-safe so that a null query definition no longer fails with a NullReferenceException
      // after the first page has already been fetched.
      // TODO: this keyword detection is crude and should be tightened, e.g. with a regular expression.
      string? queryText = queryDefinition?.QueryText;
      bool suppressToken = queryText != null
          && (queryText.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
              || (queryText.Contains("order ", StringComparison.OrdinalIgnoreCase)
                  && !queryText.Contains(".order ", StringComparison.OrdinalIgnoreCase)));

      var options = new QueryRequestOptions
      {
          MaxItemCount = pageSize,
          PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null
      };

      // The iterator is disposable; release it as soon as the method leaves.
      using FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition, continuationToken: continuationToken, requestOptions: options);
      while (iterator.HasMoreResults)
      {
          FeedResponse<T> page = await iterator.ReadNextAsync();
          list.AddRange(page);
          totalCharge += page.RequestCharge;

          // Queries flagged by the keyword check never hand a token back to the caller.
          continuationToken = suppressToken ? null : page.ContinuationToken;

          if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize.Value)
          {
              break;
          }
      }

      return new CosmosDBResult<T> { list = list, ru = totalCharge, continuationToken = continuationToken };
  }
  /// <summary>
  /// Executes a SQL query and collects the results, the accumulated request charge and a continuation token.
  /// </summary>
  /// <typeparam name="T">Type the items are deserialized to.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="sql">Query text to execute.</param>
  /// <param name="partitionkey">
  /// Partition key to scope the query to; when null or whitespace, no partition key is set on the request options.
  /// </param>
  /// <param name="continuationToken">Token from which to resume a previous query, or null to start from the beginning.</param>
  /// <param name="pageSize">
  /// Passed as <c>MaxItemCount</c>. When it has a non-negative value, reading stops as soon as at least that many
  /// items have been collected; otherwise every page is read.
  /// </param>
  /// <returns>
  /// The collected items, the sum of the request charges of the pages read, and the continuation token of the last
  /// page read (always null when the query text matches the DISTINCT / ORDER keyword check below).
  /// </returns>
  public static async Task<CosmosDBResult<T>> GetList<T>(this Container container, string sql, string? partitionkey = null, string? continuationToken = null, int? pageSize = null)
  {
      List<T> list = new List<T>();
      double totalCharge = 0;

      // The query text does not change between pages, so the keyword check is done once up front.
      // It is null-safe so that a null query text no longer fails with a NullReferenceException
      // after the first page has already been fetched.
      // TODO: this keyword detection is crude and should be tightened, e.g. with a regular expression.
      bool suppressToken = sql != null
          && (sql.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
              || (sql.Contains("order ", StringComparison.OrdinalIgnoreCase)
                  && !sql.Contains(".order ", StringComparison.OrdinalIgnoreCase)));

      var options = new QueryRequestOptions
      {
          MaxItemCount = pageSize,
          PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null
      };

      // The iterator is disposable; release it as soon as the method leaves.
      using FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryText: sql, continuationToken: continuationToken, requestOptions: options);
      while (iterator.HasMoreResults)
      {
          FeedResponse<T> page = await iterator.ReadNextAsync();
          list.AddRange(page);
          totalCharge += page.RequestCharge;

          // Queries flagged by the keyword check never hand a token back to the caller.
          continuationToken = suppressToken ? null : page.ContinuationToken;

          if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize.Value)
          {
              break;
          }
      }

      // TODO: log when the RU charge exceeds 400 (development/test) or 1000 (production).
      return new CosmosDBResult<T> { list = list, ru = totalCharge, continuationToken = continuationToken };
  }
  /// <summary>
  /// Gets the item count of the container for the given partition key. A SQL WHERE clause is supported;
  /// ordering is not.
  /// </summary>
  /// <param name="container">Container to query.</param>
  /// <param name="partitionkey">
  /// Partition key to scope the count to; when null or whitespace, no partition key is set on the request options.
  /// </param>
  /// <param name="queryWhere">
  /// Complete WHERE clause appended to <c>SELECT VALUE COUNT(1) From c</c>.
  /// WARNING: the clause is interpolated into the query text verbatim, so it must never contain untrusted input.
  /// </param>
  /// <returns>The count reported by the query, or 0 when no page carried a result.</returns>
  /// <exception cref="CosmosException">A page of the query came back with a non-success status.</exception>
  public static async Task<int> GetCount(this Container container, string? partitionkey = null, string queryWhere = "WHERE 1=1")
  {
      int totalCount = 0;

      var options = new QueryRequestOptions
      {
          PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null,
          MaxItemCount = -1
      };

      // The iterator is disposable; release it as soon as the method leaves.
      using FeedIterator items = container.GetItemQueryStreamIterator(
          queryText: $"SELECT VALUE COUNT(1) From c {queryWhere}",
          requestOptions: options);

      while (items.HasMoreResults)
      {
          using ResponseMessage response = await items.ReadNextAsync();

          // Stream responses do not throw on failure by themselves; surface the error here instead of
          // failing later while parsing the body or silently reporting a count of 0.
          response.EnsureSuccessStatusCode();

          using var json = await JsonDocument.ParseAsync(response.Content);

          // "_count" is read as Int32 so that an unusually large page cannot overflow a 16-bit read.
          if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
          {
              foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
              {
                  // NOTE(review): the last value seen wins (values are not summed) — this assumes the
                  // aggregate arrives as a single final value; confirm against the SDK in use.
                  totalCount = obj.GetInt32();
              }
          }
      }

      return totalCount;
  }
  /// <summary>
  /// Starts a stream delete for every id in <paramref name="ids"/> under one partition key and awaits them together.
  /// </summary>
  /// <param name="container">Container the items are deleted from.</param>
  /// <param name="ids">Ids of the items to delete; a null or empty list produces an empty result.</param>
  /// <param name="partitionkey">Partition key value shared by all the items.</param>
  /// <returns>The responses of the delete operations that were started.</returns>
  public static async Task<ResponseMessage[]> DeleteItemsStreamAsync(this Container container, List<string> ids, string partitionkey)
  {
      var pending = new List<Task<ResponseMessage>>();

      if (ids?.Count > 0)
      {
          foreach (string id in ids)
          {
              try
              {
                  Task<ResponseMessage> deletion = container.DeleteItemStreamAsync(id, new PartitionKey(partitionkey));
                  pending.Add(deletion);
              }
              catch (Exception)
              {
                  // Only an exception thrown synchronously while starting the delete lands here; that id is skipped.
                  // A task that faults later is observed by Task.WhenAll below.
              }
          }
      }

      return await Task.WhenAll(pending);
  }
  /// <summary>
  /// Starts a typed delete for every id in <paramref name="ids"/> under one partition key and awaits them together.
  /// </summary>
  /// <typeparam name="T">Item type used for the typed delete responses.</typeparam>
  /// <param name="container">Container the items are deleted from.</param>
  /// <param name="ids">Ids of the items to delete; a null or empty list produces an empty result.</param>
  /// <param name="partitionkey">Partition key value shared by all the items.</param>
  /// <returns>The responses of the delete operations that were started.</returns>
  public static async Task<ItemResponse<T>[]> DeleteItemsAsync<T>(this Container container, List<string> ids, string partitionkey)
  {
      var pending = new List<Task<ItemResponse<T>>>();

      if (ids?.Count > 0)
      {
          foreach (string id in ids)
          {
              try
              {
                  Task<ItemResponse<T>> deletion = container.DeleteItemAsync<T>(id, new PartitionKey(partitionkey));
                  pending.Add(deletion);
              }
              catch (Exception)
              {
                  // Only an exception thrown synchronously while starting the delete lands here; that id is skipped.
                  // A task that faults later is observed by Task.WhenAll below and propagates to the caller.
              }
          }
      }

      return await Task.WhenAll(pending);
  }
  /// <summary>
  /// Streams the raw response pages of a SQL query, skipping pages that carry no content.
  /// </summary>
  /// <param name="container">Container to query.</param>
  /// <param name="queryText">Query text to execute.</param>
  /// <param name="requestOptions">Optional request options forwarded to the query.</param>
  /// <param name="continuationToken">Token from which to resume a previous query, or null to start from the beginning.</param>
  /// <returns>
  /// The response of every page whose <c>Content</c> is not null. Each yielded response is disposable and
  /// is handed to the consumer undisposed.
  /// </returns>
  public static async IAsyncEnumerable<ResponseMessage> GetItemQueryStreamIteratorSql(this Container container, string? queryText = null, QueryRequestOptions? requestOptions = null, string? continuationToken = null)
  {
      // The iterator is disposable; it is released when the enumeration completes or is abandoned.
      using FeedIterator items = container.GetItemQueryStreamIterator(
          queryText: queryText,
          continuationToken: continuationToken,
          requestOptions: requestOptions);

      while (items.HasMoreResults)
      {
          ResponseMessage response = await items.ReadNextAsync();
          if (response.Content != null)
          {
              yield return response;
          }
          else
          {
              // This page is never handed to the consumer, so nobody else can dispose it.
              response.Dispose();
          }
      }
  }
  /// <summary>
  /// Streams the raw response pages of a query definition, skipping pages that carry no content.
  /// </summary>
  /// <param name="container">Container to query.</param>
  /// <param name="queryDefinition">Query to execute.</param>
  /// <param name="requestOptions">Optional request options forwarded to the query.</param>
  /// <param name="continuationToken">Token from which to resume a previous query, or null to start from the beginning.</param>
  /// <returns>
  /// The response of every page whose <c>Content</c> is not null. Each yielded response is disposable and
  /// is handed to the consumer undisposed.
  /// </returns>
  public static async IAsyncEnumerable<ResponseMessage> GetItemQueryStreamIteratorQuery(this Container container, QueryDefinition queryDefinition, QueryRequestOptions? requestOptions = null, string? continuationToken = null)
  {
      // The iterator is disposable; it is released when the enumeration completes or is abandoned.
      using FeedIterator items = container.GetItemQueryStreamIterator(
          queryDefinition: queryDefinition,
          continuationToken: continuationToken,
          requestOptions: requestOptions);

      while (items.HasMoreResults)
      {
          ResponseMessage response = await items.ReadNextAsync();
          if (response.Content != null)
          {
              yield return response;
          }
          else
          {
              // This page is never handed to the consumer, so nobody else can dispose it.
              response.Dispose();
          }
      }
  }
  /// <summary>
  /// Executes a query through the stream iterator and yields each element of every page's
  /// <c>Documents</c> array converted to <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Type each document is converted to via <c>ToObject&lt;T&gt;</c>.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="queryDefinition">Query to execute.</param>
  /// <param name="requestOptions">Optional request options forwarded to the query.</param>
  /// <param name="continuationToken">Token from which to resume a previous query, or null to start from the beginning.</param>
  /// <returns>The converted documents, page by page. Pages without content or with a <c>_count</c> of 0 are skipped.</returns>
  public static async IAsyncEnumerable<T> GetItemQueryIteratorQuery<T>(this Container container, QueryDefinition queryDefinition, QueryRequestOptions? requestOptions = null, string? continuationToken = null)
  {
      // The iterator is disposable; it is released when the enumeration completes or is abandoned.
      using FeedIterator items = container.GetItemQueryStreamIterator(
          queryDefinition: queryDefinition,
          continuationToken: continuationToken,
          requestOptions: requestOptions);

      while (items.HasMoreResults)
      {
          // Each page is fully consumed inside this iteration, so its response is disposed here.
          using ResponseMessage response = await items.ReadNextAsync();
          if (response.Content == null)
          {
              continue;
          }

          using var json = await JsonDocument.ParseAsync(response.Content);

          // "_count" is read as Int32 so that an unusually large page cannot overflow a 16-bit read.
          if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
          {
              foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
              {
                  yield return obj.ToObject<T>();
              }
          }
      }
  }
  /// <summary>
  /// Executes a SQL query through the stream iterator and yields each element of every page's
  /// <c>Documents</c> array converted to <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Type each document is converted to via <c>ToObject&lt;T&gt;</c>.</typeparam>
  /// <param name="container">Container to query.</param>
  /// <param name="queryText">Query text to execute.</param>
  /// <param name="requestOptions">Optional request options forwarded to the query.</param>
  /// <param name="continuationToken">Token from which to resume a previous query, or null to start from the beginning.</param>
  /// <returns>The converted documents, page by page. Pages without content or with a <c>_count</c> of 0 are skipped.</returns>
  public static async IAsyncEnumerable<T> GetItemQueryIteratorSql<T>(this Container container, string queryText, QueryRequestOptions? requestOptions = null, string? continuationToken = null)
  {
      // The iterator is disposable; it is released when the enumeration completes or is abandoned.
      using FeedIterator items = container.GetItemQueryStreamIterator(
          queryText: queryText,
          continuationToken: continuationToken,
          requestOptions: requestOptions);

      while (items.HasMoreResults)
      {
          // Each page is fully consumed inside this iteration, so its response is disposed here.
          using ResponseMessage response = await items.ReadNextAsync();
          if (response.Content == null)
          {
              continue;
          }

          using var json = await JsonDocument.ParseAsync(response.Content);

          // "_count" is read as Int32 so that an unusually large page cannot overflow a 16-bit read.
          if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
          {
              foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
              {
                  yield return obj.ToObject<T>();
              }
          }
      }
  }
  340. }
  /// <summary>
  /// Carries the outcome of a list query: the items, a continuation token and the request charge.
  /// </summary>
  /// <typeparam name="T">Type of the items in <see cref="list"/>.</typeparam>
  public class CosmosDBResult<T>
  {
      /// <summary>Items returned by the query; starts out as an empty list.</summary>
      public List<T> list { get; set; } = new();

      /// <summary>Continuation token associated with the result, or null when there is none.</summary>
      public string? continuationToken { get; set; }

      /// <summary>Request-unit (RU) cost of the database query.</summary>
      public double ru { get; set; }
  }
  348. }