using System; using System.Collections; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Linq; using Azure; using TEAMModelOS.SDK.DI; using System.IO; using System.Diagnostics; using System.Text.Json; using System.Net; using System.Linq.Expressions; using TEAMModelOS.SDK; using TEAMModelOS.SDK.Extension; using TEAMModelOS.SDK.Models; using TEAMModelOS.SDK.Models.Cosmos.OpenEntity; using Microsoft.Azure.Cosmos; namespace TEAMModelOS.SDK.DI { public static class AzureCosmosExtensions { public static double RU(this Response response) { try { response.Headers.TryGetValue("x-ms-request-charge", out var value); var ru = Convert.ToDouble(value); return ru; } catch { return 0; } } public static string? GetContinuationToken(this Response response) { try { response.Headers.TryGetValue("x-ms-continuation", out var value); return value; } catch { return null; } } public static async Task GetOne(this Container container, QueryDefinition queryDefinition, string? partitionkey = null) { List list = new List(); //if (string.IsNullOrWhiteSpace(partitionkey)) //{ // return default; //} FeedIterator iterator = container.GetItemQueryIterator(queryDefinition: queryDefinition, requestOptions: new QueryRequestOptions { MaxItemCount = 2, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null }); while (iterator.HasMoreResults) { FeedResponse currentResultSet = await iterator.ReadNextAsync(); list.AddRange(currentResultSet); } if (list.Count > 1) { throw new Exception("当前查询条件返回的结果不唯一"); } else if (list.Count <= 0) { return default; } else { return list.First(); } } public static async Task GetOne(this Container container, string sql, string? partitionkey = null) { List list = new List(); //if (string.IsNullOrWhiteSpace(partitionkey)) //{ // return default; //} FeedIterator iterator = container.GetItemQueryIterator(queryText: sql, requestOptions: new QueryRequestOptions { MaxItemCount = 2, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null }); while (iterator.HasMoreResults) { FeedResponse currentResultSet = await iterator.ReadNextAsync(); list.AddRange(currentResultSet); } if (list.Count > 1) { throw new Exception("当前查询条件返回的结果不唯一"); } else if (list.Count <=0) { return default; } else { return list.First(); } } public static async Task> GetList(this Container container, QueryDefinition queryDefinition, string? partitionkey = null, string? continuationToken = null, int? pageSize = null) { List list = new List(); double RU = 0; FeedIterator iterator = container.GetItemQueryIterator(queryDefinition: queryDefinition, continuationToken: continuationToken, requestOptions: new QueryRequestOptions { MaxItemCount=pageSize, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null }); while (iterator.HasMoreResults) { FeedResponse currentResultSet = await iterator.ReadNextAsync(); list.AddRange(currentResultSet); RU += currentResultSet.RequestCharge; //此处需要优化 ,检查相关的 关键字 用正则 if (queryDefinition.QueryText.Contains(" distinct ", StringComparison.OrdinalIgnoreCase) || (queryDefinition.QueryText.Contains("order ", StringComparison.OrdinalIgnoreCase) && !queryDefinition.QueryText.Contains(".order ", StringComparison.OrdinalIgnoreCase))) { continuationToken = null; } else { continuationToken = currentResultSet.ContinuationToken; } if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize) { break; } } return (new CosmosDBResult { list = list, ru = RU, continuationToken = continuationToken }); } public static async Task> GetList(this Container container, string sql, string? partitionkey = null, string? continuationToken = null, int? pageSize = null) { List list = new List(); double RU = 0; FeedIterator iterator = container.GetItemQueryIterator(queryText: sql, continuationToken: continuationToken, requestOptions: new QueryRequestOptions { MaxItemCount=pageSize, PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null }); while (iterator.HasMoreResults) { FeedResponse currentResultSet = await iterator.ReadNextAsync(); list.AddRange(currentResultSet); RU += currentResultSet.RequestCharge; //此处需要优化 ,检查相关的 关键字 用正则 if (sql.Contains(" distinct ", StringComparison.OrdinalIgnoreCase) || (sql.Contains("order ", StringComparison.OrdinalIgnoreCase) && !sql.Contains(".order ", StringComparison.OrdinalIgnoreCase))) { continuationToken = null; } else { continuationToken = currentResultSet.ContinuationToken; } if (pageSize.HasValue && pageSize.Value >=0 && list.Count>=pageSize) { break; } } //记录日志,RU开销大于400(开发测试),1000(正式) return (new CosmosDBResult { list = list, ru = RU, continuationToken = continuationToken }); } /// /// 取得当前容器指定分区键的Count数,支持SQL Where条件,不支持排序 /// /// /// /// /// public static async Task GetCount(this Container container, string? partitionkey = null, string queryWhere = "WHERE 1=1") { int totalCount = 0; var items = container.GetItemQueryStreamIterator( queryText: $"SELECT VALUE COUNT(1) From c {queryWhere}", requestOptions: new QueryRequestOptions() { PartitionKey =!string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null, MaxItemCount = -1 }); while (items.HasMoreResults) { using var response = await items.ReadNextAsync(); using var json = await JsonDocument.ParseAsync(response.Content); if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetUInt16() > 0) { foreach (var obj in json.RootElement.GetProperty("Documents").EnumerateArray()) { totalCount = obj.GetInt32(); } } } return totalCount; } public static async Task DeleteItemsStreamAsync(this Container container, List ids, string partitionkey) { List> responses = new List>(); if (ids!= null && ids.Count > 0) { foreach (var id in ids) { try { responses.Add(container.DeleteItemStreamAsync(id, new PartitionKey(partitionkey))); } catch { continue; } } } var response = await Task.WhenAll(responses); return response; } public static async Task[]> DeleteItemsAsync(this Container container, List ids, string partitionkey) { List>> responses = new List>>(); ; if (ids!= null && ids.Count > 0) { foreach (var id in ids) { try { responses.Add(container.DeleteItemAsync(id, new PartitionKey(partitionkey))); } catch { continue; } } } var response = await Task.WhenAll(responses); return response; } public static async IAsyncEnumerable GetItemQueryStreamIteratorSql(this Container container, string queryText = null, QueryRequestOptions requestOptions = null, string continuationToken = null) { var items = container.GetItemQueryStreamIterator( queryText: queryText, continuationToken: continuationToken, requestOptions: requestOptions); while (items.HasMoreResults) { ResponseMessage response = await items.ReadNextAsync(); if (response.IsSuccessStatusCode) { yield return response; } else { break; } } } public static async IAsyncEnumerable GetItemQueryStreamIteratorQuery(this Container container, QueryDefinition queryDefinition,QueryRequestOptions requestOptions = null, string continuationToken = null) { var items = container.GetItemQueryStreamIterator( queryDefinition: queryDefinition, continuationToken: continuationToken, requestOptions: requestOptions); while (items.HasMoreResults) { ResponseMessage response = await items.ReadNextAsync(); if (response.IsSuccessStatusCode) { yield return response; } else { break; } } } public static async IAsyncEnumerable GetItemQueryIteratorQuery(this Container container, QueryDefinition queryDefinition, QueryRequestOptions requestOptions = null, string continuationToken = null) { var items = container.GetItemQueryIterator( queryDefinition: queryDefinition, continuationToken: continuationToken, requestOptions: requestOptions); while (items.HasMoreResults) { FeedResponse response = await items.ReadNextAsync(); if (response.Any()) { foreach (var rs in response) { yield return rs; } } else { yield break; } } } public static async IAsyncEnumerable GetItemQueryIteratorSql(this Container container, string queryText, QueryRequestOptions requestOptions = null, string continuationToken = null) { var items = container.GetItemQueryIterator( queryText: queryText, continuationToken: continuationToken, requestOptions: requestOptions); while (items.HasMoreResults) { FeedResponse response = await items.ReadNextAsync(); if (response.Any()) { foreach (var rs in response) { yield return rs; } } else { yield break; } } } } public class CosmosDBResult { public List? list { get; set; } = new List(); public string? continuationToken { get; set; } //RU数据库查询开销 public double ru { get; set; } } }