123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using System.Linq;
- using Azure;
- using TEAMModelOS.SDK.DI;
- using System.IO;
- using System.Diagnostics;
- using System.Text.Json;
- using System.Net;
- using System.Linq.Expressions;
- using TEAMModelOS.SDK;
- using TEAMModelOS.SDK.Extension;
- using TEAMModelOS.SDK.Models;
- using TEAMModelOS.SDK.Models.Cosmos.OpenEntity;
- using Microsoft.Azure.Cosmos;
- namespace TEAMModelOS.SDK.DI
- {
- public static class AzureCosmosExtensions
- {
/// <summary>
/// Reads the value of the <c>x-ms-request-charge</c> header from a response.
/// </summary>
/// <param name="response">The response whose headers are inspected.</param>
/// <returns>
/// The header value parsed as a number, or <c>0</c> when the header is absent,
/// is not a valid number, or reading it throws.
/// </returns>
public static double RU(this Response response)
{
    try
    {
        // The header is machine-readable text, so it is parsed with the invariant culture.
        // A current-culture parse (what Convert.ToDouble(string) does) misreads or rejects
        // values such as "2.83" on hosts whose culture uses ',' as the decimal separator.
        if (response.Headers.TryGetValue("x-ms-request-charge", out var raw)
            && double.TryParse(
                raw,
                System.Globalization.NumberStyles.Float | System.Globalization.NumberStyles.AllowThousands,
                System.Globalization.CultureInfo.InvariantCulture,
                out double charge))
        {
            return charge;
        }

        return 0;
    }
    catch
    {
        // Deliberate best effort: any failure while reading the header (for example a
        // null response) is reported as a charge of 0 rather than surfaced to the caller.
        return 0;
    }
}
/// <summary>
/// Reads the value of the <c>x-ms-continuation</c> header from a response.
/// </summary>
/// <param name="response">The response whose headers are inspected.</param>
/// <returns>
/// The header value, or <c>null</c> when the header is absent or reading it throws.
/// </returns>
public static string? GetContinuationToken(this Response response)
{
    string? token;
    try
    {
        // On a miss TryGetValue leaves the out value null, which is what gets returned.
        response.Headers.TryGetValue("x-ms-continuation", out token);
    }
    catch
    {
        // Best effort: every failure is reported as "no token".
        token = null;
    }

    return token;
}
/// <summary>
/// Runs a query that is expected to match at most one item and returns that item.
/// </summary>
/// <typeparam name="T">Type the matching item is deserialized to.</typeparam>
/// <param name="container">Container to query.</param>
/// <param name="queryDefinition">Query to execute.</param>
/// <param name="partitionkey">
/// Partition key value to scope the query to; when null or whitespace no partition key is set.
/// </param>
/// <returns>The single matching item, or <c>default</c> when nothing matches.</returns>
/// <exception cref="InvalidOperationException">The query matched more than one item.</exception>
public static async Task<T?> GetOne<T>(this Container container, QueryDefinition queryDefinition, string? partitionkey = null)
{
    var options = new QueryRequestOptions
    {
        // Two items are enough to tell "unique" from "ambiguous".
        MaxItemCount = 2,
        PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null
    };

    var matches = new List<T>(2);

    // The iterator is disposable; release it as soon as the method completes.
    using FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition, requestOptions: options);

    // Stop paging as soon as a second match is seen: the outcome (an exception) is already
    // decided, so draining the remaining pages would only cost extra requests.
    while (iterator.HasMoreResults && matches.Count <= 1)
    {
        FeedResponse<T> page = await iterator.ReadNextAsync();
        matches.AddRange(page);
    }

    if (matches.Count > 1)
    {
        // Message text: "the result returned for the current query condition is not unique".
        throw new InvalidOperationException("当前查询条件返回的结果不唯一");
    }

    return matches.Count == 0 ? default : matches[0];
}
/// <summary>
/// Runs a SQL query that is expected to match at most one item and returns that item.
/// </summary>
/// <typeparam name="T">Type the matching item is deserialized to.</typeparam>
/// <param name="container">Container to query.</param>
/// <param name="sql">Query text to execute.</param>
/// <param name="partitionkey">
/// Partition key value to scope the query to; when null or whitespace no partition key is set.
/// </param>
/// <returns>The single matching item, or <c>default</c> when nothing matches.</returns>
/// <exception cref="InvalidOperationException">The query matched more than one item.</exception>
public static async Task<T?> GetOne<T>(this Container container, string sql, string? partitionkey = null)
{
    var options = new QueryRequestOptions
    {
        // Two items are enough to tell "unique" from "ambiguous".
        MaxItemCount = 2,
        PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null
    };

    var matches = new List<T>(2);

    // The iterator is disposable; release it as soon as the method completes.
    using FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryText: sql, requestOptions: options);

    // Stop paging as soon as a second match is seen: the outcome (an exception) is already
    // decided, so draining the remaining pages would only cost extra requests.
    while (iterator.HasMoreResults && matches.Count <= 1)
    {
        FeedResponse<T> page = await iterator.ReadNextAsync();
        matches.AddRange(page);
    }

    if (matches.Count > 1)
    {
        // Message text: "the result returned for the current query condition is not unique".
        throw new InvalidOperationException("当前查询条件返回的结果不唯一");
    }

    return matches.Count == 0 ? default : matches[0];
}
/// <summary>
/// Executes a query and collects the results, the accumulated request charge and a
/// continuation token into a <see cref="CosmosDBResult{T}"/>.
/// </summary>
/// <typeparam name="T">Type each item is deserialized to.</typeparam>
/// <param name="container">Container to query.</param>
/// <param name="queryDefinition">Query to execute.</param>
/// <param name="partitionkey">
/// Partition key value to scope the query to; when null or whitespace no partition key is set.
/// </param>
/// <param name="continuationToken">Token to resume a previous query from, or null to start over.</param>
/// <param name="pageSize">
/// Passed to the SDK as <c>MaxItemCount</c>. When it is non-negative, reading stops as soon
/// as at least that many items have been collected; otherwise every page is read.
/// </param>
/// <returns>
/// The collected items, the sum of the request charges of the pages read, and the continuation
/// token of the last page read. The token is always null when the query text looks like it
/// contains DISTINCT or ORDER (see the keyword check in the body).
/// </returns>
public static async Task<CosmosDBResult<T>> GetList<T>(this Container container, QueryDefinition queryDefinition, string? partitionkey = null, string? continuationToken = null, int? pageSize = null)
{
    // The keyword check depends only on the query text, so it is evaluated once instead of
    // once per page. A null definition is treated as "no keywords" rather than throwing a
    // NullReferenceException after the first page has already been fetched.
    // Original TODO: this check should be tightened, e.g. with a regular expression.
    // NOTE(review): the substring test also fires on text such as "border " - confirm
    // whether that matters for the queries issued by callers.
    string? queryText = queryDefinition?.QueryText;
    bool suppressToken = queryText != null
        && (queryText.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
            || (queryText.Contains("order ", StringComparison.OrdinalIgnoreCase)
                && !queryText.Contains(".order ", StringComparison.OrdinalIgnoreCase)));

    var options = new QueryRequestOptions
    {
        MaxItemCount = pageSize,
        PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null
    };

    List<T> list = new List<T>();
    double RU = 0;

    // The iterator is disposable; release it as soon as the method completes.
    using FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition, continuationToken: continuationToken, requestOptions: options);
    while (iterator.HasMoreResults)
    {
        FeedResponse<T> page = await iterator.ReadNextAsync();
        list.AddRange(page);
        RU += page.RequestCharge;
        continuationToken = suppressToken ? null : page.ContinuationToken;

        // Enough items for the requested page: stop and hand the token back to the caller.
        if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize.Value)
        {
            break;
        }
    }

    return new CosmosDBResult<T> { list = list, ru = RU, continuationToken = continuationToken };
}
/// <summary>
/// Executes a SQL query and collects the results, the accumulated request charge and a
/// continuation token into a <see cref="CosmosDBResult{T}"/>.
/// </summary>
/// <typeparam name="T">Type each item is deserialized to.</typeparam>
/// <param name="container">Container to query.</param>
/// <param name="sql">Query text to execute.</param>
/// <param name="partitionkey">
/// Partition key value to scope the query to; when null or whitespace no partition key is set.
/// </param>
/// <param name="continuationToken">Token to resume a previous query from, or null to start over.</param>
/// <param name="pageSize">
/// Passed to the SDK as <c>MaxItemCount</c>. When it is non-negative, reading stops as soon
/// as at least that many items have been collected; otherwise every page is read.
/// </param>
/// <returns>
/// The collected items, the sum of the request charges of the pages read, and the continuation
/// token of the last page read. The token is always null when the query text looks like it
/// contains DISTINCT or ORDER (see the keyword check in the body).
/// </returns>
public static async Task<CosmosDBResult<T>> GetList<T>(this Container container, string sql, string? partitionkey = null, string? continuationToken = null, int? pageSize = null)
{
    // The keyword check depends only on the query text, so it is evaluated once instead of
    // once per page. Null text is treated as "no keywords" rather than throwing a
    // NullReferenceException after the first page has already been fetched.
    // Original TODO: this check should be tightened, e.g. with a regular expression.
    // NOTE(review): the substring test also fires on text such as "border " - confirm
    // whether that matters for the queries issued by callers.
    bool suppressToken = sql != null
        && (sql.Contains(" distinct ", StringComparison.OrdinalIgnoreCase)
            || (sql.Contains("order ", StringComparison.OrdinalIgnoreCase)
                && !sql.Contains(".order ", StringComparison.OrdinalIgnoreCase)));

    var options = new QueryRequestOptions
    {
        MaxItemCount = pageSize,
        PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null
    };

    List<T> list = new List<T>();
    double RU = 0;

    // The iterator is disposable; release it as soon as the method completes.
    using FeedIterator<T> iterator = container.GetItemQueryIterator<T>(queryText: sql, continuationToken: continuationToken, requestOptions: options);
    while (iterator.HasMoreResults)
    {
        FeedResponse<T> page = await iterator.ReadNextAsync();
        list.AddRange(page);
        RU += page.RequestCharge;
        continuationToken = suppressToken ? null : page.ContinuationToken;

        // Enough items for the requested page: stop and hand the token back to the caller.
        if (pageSize.HasValue && pageSize.Value >= 0 && list.Count >= pageSize.Value)
        {
            break;
        }
    }

    // Original TODO: log when the RU cost exceeds 400 (dev/test) or 1000 (production).
    return new CosmosDBResult<T> { list = list, ru = RU, continuationToken = continuationToken };
}
/// <summary>
/// Gets the number of items in the container, optionally restricted to one partition key,
/// by running <c>SELECT VALUE COUNT(1) From c {queryWhere}</c>.
/// A SQL WHERE condition is supported; sorting is not.
/// </summary>
/// <param name="container">Container to query.</param>
/// <param name="partitionkey">
/// Partition key value to scope the query to; when null or whitespace no partition key is set.
/// </param>
/// <param name="queryWhere">
/// Text appended verbatim after <c>From c</c>; defaults to <c>WHERE 1=1</c>.
/// </param>
/// <returns>The count reported by the query, or 0 when no count document is returned.</returns>
public static async Task<int> GetCount(this Container container, string? partitionkey = null, string queryWhere = "WHERE 1=1")
{
    int totalCount = 0;
    var options = new QueryRequestOptions
    {
        PartitionKey = !string.IsNullOrWhiteSpace(partitionkey) ? new PartitionKey(partitionkey) : null,
        MaxItemCount = -1
    };

    // NOTE(review): queryWhere is spliced into the query text as-is. Callers must never pass
    // untrusted input here; a parameterized QueryDefinition overload would be the safe route.
    using FeedIterator items = container.GetItemQueryStreamIterator(
        queryText: $"SELECT VALUE COUNT(1) From c {queryWhere}",
        requestOptions: options);

    while (items.HasMoreResults)
    {
        using ResponseMessage response = await items.ReadNextAsync();

        // Surface the service error itself instead of failing later while parsing the
        // body of an unsuccessful response.
        response.EnsureSuccessStatusCode();

        using JsonDocument json = await JsonDocument.ParseAsync(response.Content);

        // "_count" is read as Int32: the previous UInt16 read throws on values above 65535.
        if (json.RootElement.TryGetProperty("_count", out JsonElement count) && count.GetInt32() > 0)
        {
            foreach (JsonElement obj in json.RootElement.GetProperty("Documents").EnumerateArray())
            {
                // Accumulate rather than overwrite, so that a count delivered as several
                // partial values is not reduced to the last one seen.
                // NOTE(review): presumably the SDK delivers one aggregated value, in which
                // case this equals plain assignment - confirm against the SDK version used.
                totalCount += obj.GetInt32();
            }
        }
    }

    return totalCount;
}
/// <summary>
/// Starts a stream delete for every id in <paramref name="ids"/> under one partition key
/// and waits for all of them to finish.
/// </summary>
/// <param name="container">Container the items are deleted from.</param>
/// <param name="ids">Ids of the items to delete; null or empty yields an empty result.</param>
/// <param name="partitionkey">Partition key value shared by all the items.</param>
/// <returns>The responses of the delete operations that were started.</returns>
public static async Task<ResponseMessage[]> DeleteItemsStreamAsync(this Container container, List<string> ids, string partitionkey)
{
    var pending = new List<Task<ResponseMessage>>();

    foreach (string id in ids ?? Enumerable.Empty<string>())
    {
        try
        {
            Task<ResponseMessage> deletion = container.DeleteItemStreamAsync(id, new PartitionKey(partitionkey));
            pending.Add(deletion);
        }
        catch
        {
            // A delete that throws while being started is skipped; the rest still run.
        }
    }

    return await Task.WhenAll(pending);
}
/// <summary>
/// Starts a typed delete for every id in <paramref name="ids"/> under one partition key
/// and waits for all of them to finish.
/// </summary>
/// <typeparam name="T">Item type used for the typed responses.</typeparam>
/// <param name="container">Container the items are deleted from.</param>
/// <param name="ids">Ids of the items to delete; null or empty yields an empty result.</param>
/// <param name="partitionkey">Partition key value shared by all the items.</param>
/// <returns>The responses of the delete operations that were started.</returns>
public static async Task<ItemResponse<T>[]> DeleteItemsAsync<T>(this Container container, List<string> ids, string partitionkey)
{
    var pending = new List<Task<ItemResponse<T>>>();

    foreach (string id in ids ?? Enumerable.Empty<string>())
    {
        try
        {
            Task<ItemResponse<T>> deletion = container.DeleteItemAsync<T>(id, new PartitionKey(partitionkey));
            pending.Add(deletion);
        }
        catch
        {
            // A delete that throws while being started is skipped; the rest still run.
        }
    }

    return await Task.WhenAll(pending);
}
/// <summary>
/// Streams the raw response pages of a SQL query as an async sequence.
/// </summary>
/// <param name="container">Container to query.</param>
/// <param name="queryText">Query text to execute.</param>
/// <param name="requestOptions">Options forwarded to the SDK query call.</param>
/// <param name="continuationToken">Token to resume a previous query from, or null to start over.</param>
/// <returns>
/// Each successful page in the order read. The sequence ends at the first unsuccessful page,
/// which is not yielded. Yielded responses are owned by the consumer, who should dispose them.
/// </returns>
public static async IAsyncEnumerable<ResponseMessage> GetItemQueryStreamIteratorSql(this Container container, string queryText = null, QueryRequestOptions requestOptions = null, string continuationToken = null)
{
    // The iterator is disposable; it is released when enumeration completes or is abandoned.
    using FeedIterator items = container.GetItemQueryStreamIterator(
        queryText: queryText, continuationToken: continuationToken,
        requestOptions: requestOptions);

    while (items.HasMoreResults)
    {
        ResponseMessage response = await items.ReadNextAsync();
        if (!response.IsSuccessStatusCode)
        {
            // The failed page never reaches the consumer, so it has to be disposed here;
            // otherwise nothing would ever release it.
            response.Dispose();
            yield break;
        }

        yield return response;
    }
}
/// <summary>
/// Streams the raw response pages of a query definition as an async sequence.
/// </summary>
/// <param name="container">Container to query.</param>
/// <param name="queryDefinition">Query to execute.</param>
/// <param name="requestOptions">Options forwarded to the SDK query call.</param>
/// <param name="continuationToken">Token to resume a previous query from, or null to start over.</param>
/// <returns>
/// Each successful page in the order read. The sequence ends at the first unsuccessful page,
/// which is not yielded. Yielded responses are owned by the consumer, who should dispose them.
/// </returns>
public static async IAsyncEnumerable<ResponseMessage> GetItemQueryStreamIteratorQuery(this Container container, QueryDefinition queryDefinition, QueryRequestOptions requestOptions = null, string continuationToken = null)
{
    // The iterator is disposable; it is released when enumeration completes or is abandoned.
    using FeedIterator items = container.GetItemQueryStreamIterator(
        queryDefinition: queryDefinition, continuationToken: continuationToken,
        requestOptions: requestOptions);

    while (items.HasMoreResults)
    {
        ResponseMessage response = await items.ReadNextAsync();
        if (!response.IsSuccessStatusCode)
        {
            // The failed page never reaches the consumer, so it has to be disposed here;
            // otherwise nothing would ever release it.
            response.Dispose();
            yield break;
        }

        yield return response;
    }
}
/// <summary>
/// Streams the items matched by a query definition as an async sequence, one item at a time.
/// </summary>
/// <typeparam name="T">Type each item is deserialized to.</typeparam>
/// <param name="container">Container to query.</param>
/// <param name="queryDefinition">Query to execute.</param>
/// <param name="requestOptions">Options forwarded to the SDK query call.</param>
/// <param name="continuationToken">Token to resume a previous query from, or null to start over.</param>
/// <returns>Every item of every page, in the order the pages are read.</returns>
public static async IAsyncEnumerable<T> GetItemQueryIteratorQuery<T>(this Container container, QueryDefinition queryDefinition, QueryRequestOptions requestOptions = null, string continuationToken = null)
{
    // The iterator is disposable; it is released when enumeration completes or is abandoned.
    using FeedIterator<T> items = container.GetItemQueryIterator<T>(
        queryDefinition: queryDefinition, continuationToken: continuationToken,
        requestOptions: requestOptions);

    // HasMoreResults is the only end-of-results signal. A page may be empty while more
    // results are still pending, so an empty page must not end the sequence (the previous
    // "yield break" on an empty page could silently truncate the results).
    while (items.HasMoreResults)
    {
        FeedResponse<T> page = await items.ReadNextAsync();
        foreach (T item in page)
        {
            yield return item;
        }
    }
}
/// <summary>
/// Streams the items matched by a SQL query as an async sequence, one item at a time.
/// </summary>
/// <typeparam name="T">Type each item is deserialized to.</typeparam>
/// <param name="container">Container to query.</param>
/// <param name="queryText">Query text to execute.</param>
/// <param name="requestOptions">Options forwarded to the SDK query call.</param>
/// <param name="continuationToken">Token to resume a previous query from, or null to start over.</param>
/// <returns>Every item of every page, in the order the pages are read.</returns>
public static async IAsyncEnumerable<T> GetItemQueryIteratorSql<T>(this Container container, string queryText, QueryRequestOptions requestOptions = null, string continuationToken = null)
{
    // The iterator is disposable; it is released when enumeration completes or is abandoned.
    using FeedIterator<T> items = container.GetItemQueryIterator<T>(
        queryText: queryText, continuationToken: continuationToken,
        requestOptions: requestOptions);

    // HasMoreResults is the only end-of-results signal. A page may be empty while more
    // results are still pending, so an empty page must not end the sequence (the previous
    // "yield break" on an empty page could silently truncate the results).
    while (items.HasMoreResults)
    {
        FeedResponse<T> page = await items.ReadNextAsync();
        foreach (T item in page)
        {
            yield return item;
        }
    }
}
- }
/// <summary>
/// Carries the outcome of a list query: the items, a continuation token and the RU cost.
/// </summary>
/// <typeparam name="T">Type of the items in <see cref="list"/>.</typeparam>
public class CosmosDBResult<T>
{
    /// <summary>Items returned by the query; starts out as an empty list.</summary>
    public List<T>? list { get; set; } = new();

    /// <summary>Continuation token associated with the query, or null when there is none.</summary>
    public string? continuationToken { get; set; }

    /// <summary>RU (request unit) cost of the database query.</summary>
    public double ru { get; set; }
}
- }
|