AzureCosmosDBRepository.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using System.Linq;
  5. using TEAMModelOS.SDK.Module.AzureCosmosDB.Configuration;
  6. using TEAMModelOS.SDK.Module.AzureCosmosDB.Interfaces;
  7. using Microsoft.Azure.Documents.Client;
  8. using Microsoft.Azure.Documents;
  9. using Microsoft.Azure.Documents.Linq;
  10. using System.Reflection;
  11. using Microsoft.Azure.CosmosDB.BulkExecutor;
  12. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
  13. using System.Threading;
  14. using Microsoft.Azure.CosmosDB.BulkExecutor.BulkUpdate;
  15. using CosmosDBTest.AzureCosmosDB;
  16. using System.Text.Json;
  17. namespace TEAMModelOS.SDK.Module.AzureCosmosDB.Implements
  18. { /// <summary>
  19. /// SDK documentation and samples: https://github.com/Azure/azure-cosmos-dotnet-v2/tree/master/samples
  20. /// https://github.com/Azure/azure-cosmos-dotnet-v2/blob/530c8d9cf7c99df7300246da05206c57ce654233/samples/code-samples/DatabaseManagement/Program.cs#L72-L121
  21. /// </summary>
  22. public class AzureCosmosDBRepository : IAzureCosmosDBRepository
  23. {
  /// <summary>
  /// Cosmos DB client used for every operation of this repository; assigned once in the constructor.
  /// </summary>
  private DocumentClient CosmosClient { get; }

  /// <summary>
  /// Collection returned by the most recent collection initialization. It is resolved again
  /// whenever an entity type with a different name is requested.
  /// </summary>
  private DocumentCollection CosmosCollection { get; set; }

  /// <summary>
  /// Id of the database that holds the collections; assigned once in the constructor.
  /// </summary>
  private string Database { get; }

  /// <summary>
  /// Throughput requested when a collection has to be created; assigned once in the constructor.
  /// </summary>
  private int CollectionThroughput { get; }
  /// <summary>
  /// Creates a repository bound to the account and database described by <paramref name="options"/>
  /// and starts creating the database if it does not exist yet.
  /// </summary>
  /// <param name="options">Connection string, connection key, database id and collection throughput.</param>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
  /// <exception cref="ArgumentException">The connection string is null or empty.</exception>
  public AzureCosmosDBRepository(AzureCosmosDBOptions options)
  {
      if (options == null)
      {
          throw new ArgumentNullException(nameof(options));
      }

      // Fail fast: without a client every later call would hit a null reference.
      if (string.IsNullOrEmpty(options.ConnectionString))
      {
          throw new ArgumentException("请设置正确的AzureCosmosDB数据库配置信息!", nameof(options));
      }

      CosmosClient = CosmosDBClientSingleton.getInstance(options.ConnectionString, options.ConnectionKey).GetCosmosDBClient();
      Database = options.Database;
      CollectionThroughput = options.CollectionThroughput;

      // A constructor cannot await and blocking here risks deadlocks, so the creation keeps
      // running in the background. The continuation observes a failure so that it is logged
      // instead of being lost as an unobserved task exception.
      CosmosClient.CreateDatabaseIfNotExistsAsync(new Database { Id = Database })
          .ContinueWith(
              failed =>
              {
                  Exception root = failed.Exception.GetBaseException();
                  DocumentClientException de = root as DocumentClientException;
                  if (de != null)
                  {
                      Console.WriteLine("{0} error occurred: {1}, Message: {2}", de.StatusCode, de.Message, de.GetBaseException().Message);
                  }
                  else
                  {
                      Console.WriteLine("Error: {0}, Message: {1}", root.Message, root.GetBaseException().Message);
                  }
              },
              CancellationToken.None,
              TaskContinuationOptions.OnlyOnFaulted,
              TaskScheduler.Default);
  }
  /// <summary>
  /// Returns the collection named after <typeparamref name="T"/>, creating it when it does not exist.
  /// The result is cached and reused while consecutive calls ask for the same type name.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is used as the collection id.</typeparam>
  /// <returns>The collection that stores <typeparamref name="T"/> documents.</returns>
  private async Task<DocumentCollection> InitializeCollection<T>()
  {
      string collectionId = typeof(T).Name;

      // The cached collection already matches the requested type: nothing to resolve.
      if (CosmosCollection != null && CosmosCollection.Id.Equals(collectionId))
      {
          return CosmosCollection;
      }

      var definition = new DocumentCollection { Id = collectionId };
      definition.IndexingPolicy = new IndexingPolicy(new RangeIndex(DataType.String) { Precision = -1 });

      // Only declare a partition key path when the entity type exposes one.
      string partitionKeyName = GetPartitionKey<T>();
      if (!string.IsNullOrEmpty(partitionKeyName))
      {
          definition.PartitionKey.Paths.Add("/" + partitionKeyName);
      }

      CosmosCollection = await this.CosmosClient.CreateDocumentCollectionIfNotExistsAsync(
          UriFactory.CreateDatabaseUri(Database),
          definition,
          new RequestOptions { OfferThroughput = CollectionThroughput });
      return CosmosCollection;
  }
  /// <summary>
  /// Finds the name of the property of <typeparamref name="T"/> that acts as partition key:
  /// either a property literally named <c>PartitionKey</c> or one carrying a
  /// <c>PartitionKeyAttribute</c>.
  /// </summary>
  /// <typeparam name="T">Entity type to inspect.</typeparam>
  /// <returns>The property name, or null when the type declares no partition key.</returns>
  /// <exception cref="Exception">More than one candidate was collected.</exception>
  private string GetPartitionKey<T>()
  {
      var matches = new List<PropertyInfo>();
      foreach (PropertyInfo candidate in typeof(T).GetProperties())
      {
          // A property named "PartitionKey" is accepted as-is and ends the scan.
          if (candidate.Name.Equals("PartitionKey"))
          {
              matches.Add(candidate);
              break;
          }

          // Otherwise the property is recorded once per PartitionKeyAttribute found on it.
          int markerCount = candidate.GetCustomAttributes(true).Count(marker => marker is PartitionKeyAttribute);
          for (int n = 0; n < markerCount; n++)
          {
              matches.Add(candidate);
          }
      }

      switch (matches.Count)
      {
          case 0:
              return null;
          case 1:
              return matches[0].Name;
          default:
              throw new Exception("PartitionKey can only be single!");
      }
  }
  /// <summary>
  /// Creates <paramref name="entity"/> as a new document in the collection named after
  /// <typeparamref name="T"/>, initializing that collection first.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="entity">Object to store.</param>
  /// <returns>The same <paramref name="entity"/> instance that was passed in.</returns>
  /// <exception cref="Exception">
  /// Any failure, rethrown with the original message; the original exception is available
  /// through <see cref="Exception.InnerException"/>.
  /// </exception>
  public async Task<T> Save<T>(T entity) //where T : object, new()
  {
      try
      {
          await InitializeCollection<T>();
          await CosmosClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, typeof(T).Name), entity);
          return entity;
      }
      catch (Exception e)
      {
          // Keep the exception type and message callers already see, but chain the original
          // so its type, status code and stack trace are not lost.
          throw new Exception(e.Message, e);
      }
  }
  /// <summary>
  /// Upserts <paramref name="entity"/> into the collection named after <typeparamref name="T"/>,
  /// initializing that collection first.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="entity">Object to insert or replace.</param>
  /// <returns>The same <paramref name="entity"/> instance that was passed in.</returns>
  public async Task<T> Update<T>(T entity)
  {
      string collectionId = typeof(T).Name;
      await InitializeCollection<T>();
      await CosmosClient.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, collectionId), entity);
      return entity;
  }
  /// <summary>
  /// Replaces the document with id <paramref name="key"/> in the collection named after
  /// <typeparamref name="T"/> with <paramref name="entity"/>.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="entity">New document content.</param>
  /// <param name="key">Id of the document to replace.</param>
  /// <returns>
  /// <paramref name="key"/> on success; null when the replace call fails (the exception is
  /// written to the console and not rethrown).
  /// </returns>
  public async Task<string> ReplaceObject<T>(T entity, string key)
  {
      string collectionId = typeof(T).Name;
      await InitializeCollection<T>();
      try
      {
          await CosmosClient.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(Database, collectionId, key), entity);
      }
      catch (Exception failure)
      {
          // Failure is reported to the caller as a null result, not as an exception.
          Console.WriteLine("{0} Exception caught.", failure);
          return null;
      }
      return key;
  }
  /// <summary>
  /// Replaces the document with id <paramref name="key"/> in the given partition of the
  /// collection named after <typeparamref name="T"/> with <paramref name="entity"/>.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="entity">New document content.</param>
  /// <param name="key">Id of the document to replace.</param>
  /// <param name="partitionKey">Partition key value of the document.</param>
  /// <returns><paramref name="key"/>.</returns>
  /// <exception cref="Exception">
  /// The replace call failed; the message is the original one and the original exception is
  /// available through <see cref="Exception.InnerException"/>.
  /// </exception>
  public async Task<string> ReplaceObject<T>(T entity, string key, string partitionKey)
  {
      string collectionId = typeof(T).Name;
      await InitializeCollection<T>();
      try
      {
          await CosmosClient.ReplaceDocumentAsync(
              UriFactory.CreateDocumentUri(Database, collectionId, key),
              entity,
              new RequestOptions { PartitionKey = new PartitionKey(partitionKey) });
          return key;
      }
      catch (Exception e)
      {
          // Same exception type and message as before, with the cause chained for diagnostics.
          throw new Exception(e.Message, e);
      }
  }
  /// <summary>
  /// Reads every document of the collection named after <typeparamref name="T"/>, page by page,
  /// with cross-partition querying enabled. The collection is not initialized by this method.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <returns>All documents of the collection converted to <typeparamref name="T"/>.</returns>
  public async Task<List<T>> FindAll<T>()
  {
      // MaxItemCount = -1 leaves the page size to the service.
      var feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = true };
      var pager = CosmosClient
          .CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, typeof(T).Name), feedOptions)
          .AsDocumentQuery();

      var results = new List<T>();
      while (pager.HasMoreResults)
      {
          var page = await pager.ExecuteNextAsync();
          foreach (T item in page)
          {
              results.Add(item);
          }
      }
      return results;
  }
  /// <summary>
  /// Placeholder for LINQ-based querying. It initializes the collection and builds a query,
  /// but the query is never filtered or executed, so the result is always an empty list.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="singleOrDefault">Currently ignored.</param>
  /// <returns>An empty list.</returns>
  public async Task<List<T>> FindLinq<T>(Func<IQueryable<object>, object> singleOrDefault)
  {
      await InitializeCollection<T>();

      var feedOptions = new FeedOptions { MaxItemCount = -1 };
      // TODO: apply the caller's expression and execute the query; nothing consumes it yet.
      var query = CosmosClient.CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, typeof(T).Name), feedOptions);

      return new List<T>();
  }
  /// <summary>
  /// Runs <paramref name="sql"/> against the collection named after <typeparamref name="T"/>
  /// with default feed options and returns every result page.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="sql">
  /// Query text, sent as-is. Callers must not build it by concatenating untrusted input.
  /// </param>
  /// <returns>All matching documents as <typeparamref name="T"/>.</returns>
  public async Task<List<T>> FindSQL<T>(string sql)
  {
      await InitializeCollection<T>();

      // Page through the results asynchronously; enumerating the IQueryable directly would
      // block the calling thread on every network round trip.
      var pager = CosmosClient
          .CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, typeof(T).Name), sql)
          .AsDocumentQuery();

      var results = new List<T>();
      while (pager.HasMoreResults)
      {
          results.AddRange(await pager.ExecuteNextAsync<T>());
      }
      return results;
  }
  /// <summary>
  /// Runs <paramref name="sql"/> against the collection named after <typeparamref name="T"/>
  /// and returns every result page.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="sql">
  /// Query text, sent as-is. Callers must not build it by concatenating untrusted input.
  /// </param>
  /// <param name="IsPk">Value passed to <c>FeedOptions.EnableCrossPartitionQuery</c>.</param>
  /// <returns>All matching documents as <typeparamref name="T"/>.</returns>
  public async Task<List<T>> FindSQL<T>(string sql, bool IsPk)
  {
      await InitializeCollection<T>();

      // MaxItemCount = -1 leaves the page size to the service.
      var feedOptions = new FeedOptions { MaxItemCount = -1, EnableCrossPartitionQuery = IsPk };

      // Page through the results asynchronously; enumerating the IQueryable directly would
      // block the calling thread on every network round trip.
      var pager = CosmosClient
          .CreateDocumentQuery<T>(UriFactory.CreateDocumentCollectionUri(Database, typeof(T).Name), sql, feedOptions)
          .AsDocumentQuery();

      var results = new List<T>();
      while (pager.HasMoreResults)
      {
          results.AddRange(await pager.ExecuteNextAsync<T>());
      }
      return results;
  }
  /// <summary>
  /// Deletes the document with the given id from the collection named after
  /// <typeparamref name="T"/>, without specifying a partition key.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="id">Id of the document to delete.</param>
  /// <returns><paramref name="id"/>.</returns>
  public async Task<string> DeleteAsync<T>(string id)
  {
      string collectionId = typeof(T).Name;
      await InitializeCollection<T>();
      await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collectionId, id));
      return id;
  }
  /// <summary>
  /// Deletes the document that corresponds to <paramref name="entity"/>. The document id is read
  /// from the entity's <c>id</c> property, and the partition key value from the partition key
  /// property of <typeparamref name="T"/> when the type declares one.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="entity">Entity to delete.</param>
  /// <returns>The same <paramref name="entity"/> instance that was passed in.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="entity"/> is null.</exception>
  /// <exception cref="InvalidOperationException">
  /// The entity has no readable <c>id</c> or partition key property, or its value is null.
  /// </exception>
  public async Task<T> DeleteAsync<T>(T entity)
  {
      if (entity == null)
      {
          throw new ArgumentNullException(nameof(entity));
      }

      // Validate the entity before touching the service.
      string idValue = ReadRequiredProperty(entity, "id");

      RequestOptions requestOptions = null;
      string partitionKeyName = GetPartitionKey<T>();
      if (!string.IsNullOrEmpty(partitionKeyName))
      {
          // NOTE(review): the value is always sent as a string; confirm that no entity uses a
          // numeric or boolean partition key.
          string partitionKeyValue = ReadRequiredProperty(entity, partitionKeyName);
          requestOptions = new RequestOptions { PartitionKey = new PartitionKey(partitionKeyValue) };
      }

      await InitializeCollection<T>();
      await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, typeof(T).Name, idValue), requestOptions);
      return entity;
  }

  /// <summary>
  /// Reads the named public property from <paramref name="entity"/> and returns its string form,
  /// failing with a descriptive error when the property or its value is missing.
  /// </summary>
  private static string ReadRequiredProperty(object entity, string propertyName)
  {
      PropertyInfo property = entity.GetType().GetProperty(propertyName);
      if (property == null)
      {
          throw new InvalidOperationException(
              string.Format("Type '{0}' has no public property '{1}'.", entity.GetType().Name, propertyName));
      }

      object value = property.GetValue(entity);
      if (value == null)
      {
          throw new InvalidOperationException(
              string.Format("Property '{0}' of type '{1}' is null.", propertyName, entity.GetType().Name));
      }
      return value.ToString();
  }
  /// <summary>
  /// Deletes the document with the given id and partition key value from the collection named
  /// after <typeparamref name="T"/>.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="id">Id of the document to delete.</param>
  /// <param name="partitionKey">Partition key value of the document.</param>
  /// <returns><paramref name="id"/>.</returns>
  public async Task<string> DeleteAsync<T>(string id, string partitionKey)
  {
      string collectionId = typeof(T).Name;
      await InitializeCollection<T>();

      var partitionScope = new RequestOptions { PartitionKey = new PartitionKey(partitionKey) };
      await CosmosClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(Database, collectionId, id), partitionScope);
      return id;
  }
  /// <summary>
  /// Bulk-upserts <paramref name="enyites"/> into the collection named after
  /// <typeparamref name="T"/>. Entities are serialized to JSON and imported in batches of 100.
  /// </summary>
  /// <typeparam name="T">Entity type; its simple name is the collection id.</typeparam>
  /// <param name="enyites">Entities to import. Automatic id generation is disabled for the import.</param>
  /// <returns>The same list instance that was passed in.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="enyites"/> is null.</exception>
  /// <exception cref="InvalidOperationException">
  /// A batch was still not fully imported after the maximum number of attempts.
  /// </exception>
  public async Task<List<T>> SaveAll<T>(List<T> enyites)
  {
      if (enyites == null)
      {
          throw new ArgumentNullException(nameof(enyites));
      }

      const int pageSize = 100;
      const int maxAttemptsPerBatch = 5;
      const int clientRetryWaitSeconds = 30;
      const int clientRetryAttempts = 9;

      DocumentCollection dataCollection = await InitializeCollection<T>();

      // Set retry options high for initialization (default values).
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = clientRetryWaitSeconds;
      CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = clientRetryAttempts;
      IBulkExecutor bulkExecutor = new BulkExecutor(CosmosClient, dataCollection);
      await bulkExecutor.InitializeAsync();

      try
      {
          // Set retries to 0 to pass control to bulk executor.
          CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
          CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

          for (int offset = 0; offset < enyites.Count; offset += pageSize)
          {
              int batchSize = Math.Min(pageSize, enyites.Count - offset);
              List<string> batch = enyites
                  .GetRange(offset, batchSize)
                  .Select(item => JsonSerializer.Serialize(item))
                  .ToList();

              // Upserts are repeatable, so an incomplete batch is simply sent again, but only
              // a bounded number of times: a document that can never be imported must not
              // keep this loop running forever.
              bool imported = false;
              for (int attempt = 1; attempt <= maxAttemptsPerBatch && !imported; attempt++)
              {
                  BulkImportResponse response = await bulkExecutor.BulkImportAsync(
                      documents: batch,
                      enableUpsert: true,
                      disableAutomaticIdGeneration: true,
                      maxConcurrencyPerPartitionKeyRange: null,
                      maxInMemorySortingBatchSize: null,
                      cancellationToken: CancellationToken.None);
                  imported = response.NumberOfDocumentsImported >= batch.Count;
              }

              if (!imported)
              {
                  throw new InvalidOperationException(string.Format(
                      "Bulk import of the batch starting at index {0} was incomplete after {1} attempts.",
                      offset,
                      maxAttemptsPerBatch));
              }
          }
      }
      finally
      {
          // Give throttling retries back to the client so that operations issued through it
          // after the import are not left without any retry.
          CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = clientRetryWaitSeconds;
          CosmosClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = clientRetryAttempts;
      }

      return enyites;
  }
  /// <summary>
  /// Builds a set-operation for field <paramref name="key"/> whose generic argument matches the
  /// runtime type code of <paramref name="obj"/>.
  /// </summary>
  /// <param name="key">Name of the field to set.</param>
  /// <param name="obj">New value; must not be null.</param>
  /// <returns>
  /// The set-operation, or null when the value's type code is not one of String, Int32, Int64,
  /// Double, Byte, Boolean or DateTime.
  /// </returns>
  private static UpdateOperation SwitchType(string key, object obj)
  {
      UpdateOperation operation;
      switch (Type.GetTypeCode(obj.GetType()))
      {
          case TypeCode.String:
              operation = new SetUpdateOperation<string>(key, obj.ToString());
              break;
          case TypeCode.Int32:
              operation = new SetUpdateOperation<int>(key, (int)obj);
              break;
          case TypeCode.Int64:
              operation = new SetUpdateOperation<long>(key, (long)obj);
              break;
          case TypeCode.Double:
              operation = new SetUpdateOperation<double>(key, (double)obj);
              break;
          case TypeCode.Byte:
              operation = new SetUpdateOperation<byte>(key, (byte)obj);
              break;
          case TypeCode.Boolean:
              operation = new SetUpdateOperation<bool>(key, (bool)obj);
              break;
          case TypeCode.DateTime:
              operation = new SetUpdateOperation<DateTime>(key, (DateTime)obj);
              break;
          default:
              // Any other type code is not supported.
              operation = null;
              break;
      }
      return operation;
  }
  350. }
  351. }