|
@@ -0,0 +1,384 @@
|
|
|
+using Azure;
|
|
|
+using Microsoft.Azure.Cosmos;
|
|
|
+using System.Text.Json;
|
|
|
+
|
|
|
+namespace HTEXCosmosDB
|
|
|
+{
|
|
|
+ public static class AzureCosmosExtensions
|
|
|
+ {
|
|
|
+ public static double RU(this Response response)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ response.Headers.TryGetValue("x-ms-request-charge", out var value);
|
|
|
+ var ru = Convert.ToDouble(value);
|
|
|
+ return ru;
|
|
|
+ }
|
|
|
+ catch
|
|
|
+ {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static string? GetContinuationToken(this Response response)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ response.Headers.TryGetValue("x-ms-continuation", out var value);
|
|
|
+ return value;
|
|
|
+ }
|
|
|
+ catch
|
|
|
+ {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //public static async IAsyncEnumerable<(List<T> list, double RU, string? continuationToken)> GetItemQueryIterator<T>(this Container container, QueryDefinition queryDefinition, QueryRequestOptions requestOptions,string? continuationToken = null, int? pageSize = null)
|
|
|
+ //{
|
|
|
+ // List<T> list = new List<T>();
|
|
|
+ // double RU = 0;
|
|
|
+ // //if (string.IsNullOrWhiteSpace(partitionkey))
|
|
|
+ // //{
|
|
|
+ // // return (list, RU, continuationToken);
|
|
|
+ // //}
|
|
|
+ // FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition, continuationToken: continuationToken, requestOptions: requestOptions);
|
|
|
+ // while (iterator.HasMoreResults)
|
|
|
+ // {
|
|
|
+ // FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
|
|
|
+ // list.AddRange(currentResultSet);
|
|
|
+ // RU += currentResultSet.RequestCharge;
|
|
|
+ // //此处需要优化 ,检查相关的 关键字 用正则
|
|
|
+ // if (queryDefinition.QueryText.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
|
|
|
+ // || (queryDefinition.QueryText.Contains("order ", StringComparison.OrdinalIgnoreCase)
|
|
|
+ // && !queryDefinition.QueryText.Contains(".order ", StringComparison.OrdinalIgnoreCase)))
|
|
|
+ // {
|
|
|
+ // continuationToken = null;
|
|
|
+ // }
|
|
|
+ // else
|
|
|
+ // {
|
|
|
+ // continuationToken = currentResultSet.ContinuationToken;
|
|
|
+ // }
|
|
|
+ // if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize)
|
|
|
+ // {
|
|
|
+ // break;
|
|
|
+ // }
|
|
|
+ // yield return (list, RU, continuationToken);
|
|
|
+ // }
|
|
|
+
|
|
|
+ //}
|
|
|
+
|
|
|
+ //public static async Task<(List<T> list, double RU, string? continuationToken)> GetList<T>(this Container container, QueryDefinition queryDefinition, string? partitionkey = null , string? continuationToken = null, int? pageSize = null)
|
|
|
+ //{
|
|
|
+ // List<T> list = new List<T>();
|
|
|
+ // double RU = 0;
|
|
|
+ // //if (string.IsNullOrWhiteSpace(partitionkey))
|
|
|
+ // //{
|
|
|
+ // // return (list, RU, continuationToken);
|
|
|
+ // //}
|
|
|
+ // FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition, continuationToken: continuationToken, requestOptions: new QueryRequestOptions { MaxItemCount = pageSize, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null });
|
|
|
+ // while (iterator.HasMoreResults)
|
|
|
+ // {
|
|
|
+ // FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
|
|
|
+ // list.AddRange(currentResultSet);
|
|
|
+ // RU += currentResultSet.RequestCharge;
|
|
|
+ // //此处需要优化 ,检查相关的 关键字 用正则
|
|
|
+ // if (queryDefinition.QueryText.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
|
|
|
+ // || (queryDefinition.QueryText.Contains("order ", StringComparison.OrdinalIgnoreCase)
|
|
|
+ // && !queryDefinition.QueryText.Contains(".order ", StringComparison.OrdinalIgnoreCase)))
|
|
|
+ // {
|
|
|
+ // continuationToken = null;
|
|
|
+ // }
|
|
|
+ // else
|
|
|
+ // {
|
|
|
+ // continuationToken = currentResultSet.ContinuationToken;
|
|
|
+ // }
|
|
|
+ // if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize)
|
|
|
+ // {
|
|
|
+ // break;
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // return (list, RU, continuationToken);
|
|
|
+ //}
|
|
|
+ public static async Task<T?> GetOne<T>(this Container container, QueryDefinition queryDefinition, string? partitionkey = null)
|
|
|
+ {
|
|
|
+ List<T?> list = new List<T?>();
|
|
|
+ //if (string.IsNullOrWhiteSpace(partitionkey))
|
|
|
+ //{
|
|
|
+ // return default;
|
|
|
+ //}
|
|
|
+ FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition,
|
|
|
+ requestOptions: new QueryRequestOptions { MaxItemCount = 2, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null });
|
|
|
+ while (iterator.HasMoreResults)
|
|
|
+ {
|
|
|
+ FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
|
|
|
+ list.AddRange(currentResultSet);
|
|
|
+ }
|
|
|
+ if (list.Count > 1)
|
|
|
+ {
|
|
|
+ throw new Exception("当前查询条件返回的结果不唯一");
|
|
|
+ }
|
|
|
+ else if (list.Count <= 0)
|
|
|
+ {
|
|
|
+ return default;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return list.First();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static async Task<T?> GetOne<T>(this Container container, string sql, string? partitionkey = null)
|
|
|
+ {
|
|
|
+ List<T?> list = new List<T?>();
|
|
|
+ //if (string.IsNullOrWhiteSpace(partitionkey))
|
|
|
+ //{
|
|
|
+ // return default;
|
|
|
+ //}
|
|
|
+ FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryText: sql,
|
|
|
+ requestOptions: new QueryRequestOptions { MaxItemCount = 2, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null });
|
|
|
+ while (iterator.HasMoreResults)
|
|
|
+ {
|
|
|
+ FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
|
|
|
+ list.AddRange(currentResultSet);
|
|
|
+ }
|
|
|
+ if (list.Count > 1)
|
|
|
+ {
|
|
|
+ throw new Exception("当前查询条件返回的结果不唯一");
|
|
|
+ }
|
|
|
+ else if (list.Count <=0)
|
|
|
+ {
|
|
|
+ return default;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return list.First();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async IAsyncEnumerable<CosmosDBResult<T>> GetItemQueryIteratorList<T>(this Container container, string queryText, QueryRequestOptions? requestOptions= null, string? continuationToken = null, int? pageSize = null)
|
|
|
+ {
|
|
|
+
|
|
|
+ //if (string.IsNullOrWhiteSpace(partitionkey))
|
|
|
+ //{
|
|
|
+ // return (new CosmosDBResult<T> { list = list, ru=RU, continuationToken=continuationToken });
|
|
|
+ //}
|
|
|
+ FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryText: queryText, continuationToken: continuationToken, requestOptions: requestOptions);
|
|
|
+ while (iterator.HasMoreResults)
|
|
|
+ {
|
|
|
+ List<T> list = new List<T>();
|
|
|
+ double RU = 0;
|
|
|
+ FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
|
|
|
+ list.AddRange(currentResultSet);
|
|
|
+ RU += currentResultSet.RequestCharge;
|
|
|
+ //此处需要优化 ,检查相关的 关键字 用正则
|
|
|
+ if (queryText.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
|
|
|
+ || (queryText.Contains("order ", StringComparison.OrdinalIgnoreCase)
|
|
|
+ && !queryText.Contains(".order ", StringComparison.OrdinalIgnoreCase)))
|
|
|
+ {
|
|
|
+ continuationToken = null;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ continuationToken = currentResultSet.ContinuationToken;
|
|
|
+ }
|
|
|
+ if (pageSize.HasValue && pageSize.Value >=0 && list.Count>=pageSize)
|
|
|
+ {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ //记录日志,RU开销大于400(开发测试),1000(正式)
|
|
|
+ yield return (new CosmosDBResult<T> { list = list, ru = RU, continuationToken = continuationToken });
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ public static async Task<CosmosDBResult<T>> GetList<T>(this Container container, string sql, string? partitionkey = null, string? continuationToken = null, int? pageSize = null)
|
|
|
+ {
|
|
|
+ List<T> list = new List<T>();
|
|
|
+ double RU = 0;
|
|
|
+ //if (string.IsNullOrWhiteSpace(partitionkey))
|
|
|
+ //{
|
|
|
+ // return (new CosmosDBResult<T> { list = list, ru=RU, continuationToken=continuationToken });
|
|
|
+ //}
|
|
|
+ FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryText: sql, continuationToken: continuationToken, requestOptions: new QueryRequestOptions { MaxItemCount=pageSize, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null });
|
|
|
+ while (iterator.HasMoreResults)
|
|
|
+ {
|
|
|
+ FeedResponse<T> currentResultSet = await iterator.ReadNextAsync();
|
|
|
+ list.AddRange(currentResultSet);
|
|
|
+ RU += currentResultSet.RequestCharge;
|
|
|
+ //此处需要优化 ,检查相关的 关键字 用正则
|
|
|
+ if (sql.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
|
|
|
+ || (sql.Contains("order ", StringComparison.OrdinalIgnoreCase)
|
|
|
+ && !sql.Contains(".order ", StringComparison.OrdinalIgnoreCase)))
|
|
|
+ {
|
|
|
+ continuationToken = null;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ continuationToken = currentResultSet.ContinuationToken;
|
|
|
+ }
|
|
|
+ if (pageSize.HasValue && pageSize.Value >=0 && list.Count>=pageSize)
|
|
|
+ {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //记录日志,RU开销大于400(开发测试),1000(正式)
|
|
|
+ return (new CosmosDBResult<T> { list = list, ru = RU, continuationToken = continuationToken });
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 取得当前容器指定分区键的Count数,支持SQL Where条件,不支持排序
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="container"></param>
|
|
|
+ /// <param name="partitionkey"></param>
|
|
|
+ /// <param name="queryWhere"></param>
|
|
|
+ /// <returns></returns>
|
|
|
+ public static async Task<int> GetCount(this Container container, string queryWhere = "WHERE 1=1", string? partitionkey = null)
|
|
|
+ {
|
|
|
+ int totalCount = 0;
|
|
|
+ var items = container.GetItemQueryStreamIterator(
|
|
|
+ queryText: $"SELECT VALUE COUNT(1) From c {queryWhere}",
|
|
|
+ requestOptions: new QueryRequestOptions() { PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null, MaxItemCount = -1 });
|
|
|
+ while (items.HasMoreResults)
|
|
|
+ {
|
|
|
+ using var response = await items.ReadNextAsync();
|
|
|
+ using var json = await JsonDocument.ParseAsync(response.Content);
|
|
|
+ if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetUInt16() > 0)
|
|
|
+ {
|
|
|
+ foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray())
|
|
|
+ {
|
|
|
+ totalCount = obj.GetInt32();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return totalCount;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static async Task<ResponseMessage[]> DeleteItemsStreamAsync(this Container container, List<string> ids, string partitionkey)
|
|
|
+ {
|
|
|
+ List<Task<ResponseMessage>> responses = new List<Task<ResponseMessage>>();
|
|
|
+ if (ids!= null && ids.Count > 0)
|
|
|
+ {
|
|
|
+ foreach (var id in ids)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ responses.Add(container.DeleteItemStreamAsync(id, new PartitionKey(partitionkey)));
|
|
|
+ }
|
|
|
+ catch
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ var response = await Task.WhenAll(responses);
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static async Task<ItemResponse<T>[]> DeleteItemsAsync<T>(this Container container, List<string> ids, string partitionkey)
|
|
|
+ {
|
|
|
+ List<Task<ItemResponse<T>>> responses = new List<Task<ItemResponse<T>>>(); ;
|
|
|
+ if (ids!= null && ids.Count > 0)
|
|
|
+ {
|
|
|
+ foreach (var id in ids)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ responses.Add(container.DeleteItemAsync<T>(id, new PartitionKey(partitionkey)));
|
|
|
+ }
|
|
|
+ catch
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ var response = await Task.WhenAll(responses);
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static async IAsyncEnumerable<ResponseMessage> GetItemQueryStreamIteratorSql(this Microsoft.Azure.Cosmos.Container container, string queryText = null, string continuationToken = null, Microsoft.Azure.Cosmos.QueryRequestOptions requestOptions = null)
|
|
|
+ {
|
|
|
+ var items = container.GetItemQueryStreamIterator(
|
|
|
+ queryText: queryText, continuationToken: continuationToken,
|
|
|
+ requestOptions: requestOptions);
|
|
|
+ while (items.HasMoreResults)
|
|
|
+ {
|
|
|
+ ResponseMessage response = await items.ReadNextAsync();
|
|
|
+ if (response.IsSuccessStatusCode)
|
|
|
+ {
|
|
|
+ yield return response;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async IAsyncEnumerable<ResponseMessage> GetItemQueryStreamIteratorQuery(this Microsoft.Azure.Cosmos.Container container, Microsoft.Azure.Cosmos.QueryDefinition queryDefinition, string continuationToken = null, Microsoft.Azure.Cosmos.QueryRequestOptions requestOptions = null)
|
|
|
+ {
|
|
|
+ var items = container.GetItemQueryStreamIterator(
|
|
|
+ queryDefinition: queryDefinition, continuationToken: continuationToken,
|
|
|
+ requestOptions: requestOptions);
|
|
|
+ while (items.HasMoreResults)
|
|
|
+ {
|
|
|
+ ResponseMessage response = await items.ReadNextAsync();
|
|
|
+ if (response.IsSuccessStatusCode)
|
|
|
+ {
|
|
|
+ yield return response;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static async IAsyncEnumerable<T> GetItemQueryIteratorQuery<T>(this Microsoft.Azure.Cosmos.Container container, Microsoft.Azure.Cosmos.QueryDefinition queryDefinition, string continuationToken = null, Microsoft.Azure.Cosmos.QueryRequestOptions requestOptions = null)
|
|
|
+ {
|
|
|
+ var items = container.GetItemQueryIterator<T>(
|
|
|
+ queryDefinition: queryDefinition, continuationToken: continuationToken,
|
|
|
+ requestOptions: requestOptions);
|
|
|
+ while (items.HasMoreResults)
|
|
|
+ {
|
|
|
+ FeedResponse<T> response = await items.ReadNextAsync();
|
|
|
+ if (response.Any())
|
|
|
+ {
|
|
|
+ foreach (var rs in response)
|
|
|
+ {
|
|
|
+ yield return rs;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ yield break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public static async IAsyncEnumerable<T> GetItemQueryIteratorSql<T>(this Microsoft.Azure.Cosmos.Container container, string queryText, string continuationToken = null, Microsoft.Azure.Cosmos.QueryRequestOptions requestOptions = null)
|
|
|
+ {
|
|
|
+ var items = container.GetItemQueryIterator<T>(
|
|
|
+ queryText: queryText, continuationToken: continuationToken,
|
|
|
+ requestOptions: requestOptions);
|
|
|
+
|
|
|
+ while (items.HasMoreResults)
|
|
|
+ {
|
|
|
+ FeedResponse<T> response = await items.ReadNextAsync();
|
|
|
+ if (response.Any())
|
|
|
+ {
|
|
|
+ foreach (var rs in response) {
|
|
|
+ yield return rs;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ yield break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public class CosmosDBResult<T>
|
|
|
+ {
|
|
|
+ public List<T>? list { get; set; } = new List<T>();
|
|
|
+ public string? continuationToken { get; set; }
|
|
|
+ //RU数据库查询开销
|
|
|
+ public double ru { get; set; }
|
|
|
+ }
|
|
|
+}
|