.NET Core使用ElasticSearch 三:常用帮助类封装
电脑版发表于:2021/7/14 17:18
把.NET CORE操作ElasticSearch的常用方法封装成帮助类,包含查询的封装,操作索引相关的封装,删除,更新等等等。基于ES7.12.0,以前早一点版本的写法有些用法都不一样了,重新封装一下。
写了我好久
namespace ElasticsearchLearn.ElasticSearch { public static class ElasticsearchExtands { /// <summary> /// 封装后的linq的查询方式 /// </summary> /// <typeparam name="T">实体泛型</typeparam> /// <param name="indexName">index的名称</param> /// <param name="selector">linq内容</param> /// <returns></returns> public static async Task<List<T>> SearchByLinqAsync<T>(this IElasticSearchServer elasticSearchServer, string indexName, Func<QueryContainerDescriptor<T>, QueryContainer> selector = null) where T : class { var list = await elasticSearchServer.ElasticLinqClient.SearchAsync<T>(option => option.Index(indexName.ToLower()).Query(selector)); return list.Documents.ToList(); } /// <summary> /// 封装后的Json的查询方式 /// </summary> /// <param name="indexName">index的名称</param> /// <param name="jsonString">json字符串</param> /// <returns>返回Jobject的内容</returns> public static async Task<JToken> SearchByJsonAsync(this IElasticSearchServer elasticSearchServer, string indexName, string jsonString) { var stringRespones = await elasticSearchServer.ElasticJsonClient.SearchAsync<StringResponse>(indexName.ToLower(), jsonString); var jobject = JObject.Parse(stringRespones.Body); var total = Convert.ToInt32(jobject["hits"]["total"]["value"].ToString()); if (total > 0) { string json = string.Empty; var sourceArg = jobject["hits"]["hits"]; foreach (var source in sourceArg) { string sourceJson = source["_source"].ToString().Substring(1, source["_source"].ToString().Length - 1); sourceJson = "{ \"_id\":\"" + source["_id"] + "\"," + sourceJson; if (json.Length <= 0) json += sourceJson; else json += "," + sourceJson; } return JToken.Parse("[" + json + "]"); } return null; } /// <summary> /// 通过索引与id检查文档是否已经存在 /// </summary> /// <param name="index">索引</param> /// <param name="id">id</param> /// <returns></returns> public static async Task<bool> SourceExistsAsync(this IElasticSearchServer elasticSearchServer, string index, string id) { bool flag = false; StringResponse resStr = null; try { //elasticSearchServer.ElasticJsonClient.Indices.Exists() resStr = await elasticSearchServer.ElasticJsonClient.SourceExistsAsync<StringResponse>(index, id); if (resStr.HttpStatusCode == 200) { flag = true; } } catch (Exception ex) { } return flag; } /// <summary> /// 检测索引是否已经存在 /// </summary> /// <param name="index"></param> /// <returns></returns> public static async Task<bool> IsIndexExsit(this IElasticSearchServer elasticSearchServer, string index) { bool flag = false; StringResponse resStr = null; try { resStr = await elasticSearchServer.ElasticJsonClient.Indices.ExistsAsync<StringResponse>(index); if (resStr.HttpStatusCode == 200) { flag = true; } } catch (Exception ex) { } return flag; } /// <summary> /// 创建index /// </summary> /// <param name="indexName"></param> /// <param name="shards">分片数量,即数据块最小单元</param> /// <returns></returns> public static async Task<bool> CreateIndexAsync(this IElasticSearchServer elasticSearchServer, string indexName, int shards = 5) { var isHaveIndex = await IsIndexExsit(elasticSearchServer, indexName.ToLower()); if (!isHaveIndex) { var stringResponse = await elasticSearchServer.ElasticJsonClient.Indices.CreateAsync<StringResponse>(indexName.ToLower(), PostData.String($"{{\"settings\" : {{\"index\" : {{\"number_of_replicas\" : 0, \"number_of_shards\":\"{shards}\",\"refresh_interval\":\"-1\"}}}}}}")); var resObj = JObject.Parse(stringResponse.Body); if ((bool)resObj["acknowledged"]) { return true; } } else { return true; } return false; } /// <summary> /// 删除index /// </summary> /// <param name="indexName"></param> /// <returns></returns> public static async Task<bool> DeleteIndexAsync(this IElasticSearchServer elasticSearchServer, string indexName) { var stringRespones = await elasticSearchServer.ElasticJsonClient.Indices.DeleteAsync<StringResponse>(indexName.ToLower()); var resObj = JObject.Parse(stringRespones.Body); if ((bool)resObj["acknowledged"]) { return true; } return false; } /// <summary> /// 插入单个文档 /// </summary> /// <param name="indexName">索引名称</param> /// <param name="objectDocment">文档内容</param> /// <param name="_id">自定义_id</param> /// <returns></returns> public static async Task<bool> InsertDocumentAsync(this IElasticSearchServer elasticSearchServer, string indexName, object objectDocment, string _id = "") { var stringRespones = new StringResponse(); if (_id.Length > 0) stringRespones = await elasticSearchServer.ElasticJsonClient.IndexAsync<StringResponse>(indexName.ToLower(), _id, PostData.String(JsonConvert.SerializeObject(objectDocment))); else stringRespones = await elasticSearchServer.ElasticJsonClient.IndexAsync<StringResponse>(indexName.ToLower(), PostData.String(JsonConvert.SerializeObject(objectDocment))); var resObj = JObject.Parse(stringRespones.Body); if ((int)resObj["_shards"]["successful"] > 0) { return true; } return false; } /// <summary> /// 删除单个文档 /// </summary> /// <param name="indexName">索引名称</param> /// <param name="_id">要删除的id</param> /// <returns></returns> public static async Task<bool> DeleteDocumentAsync(this IElasticSearchServer elasticSearchServer, string indexName, string _id) { bool flag = false; StringResponse resStr = null; try { resStr = await elasticSearchServer.ElasticJsonClient.DeleteAsync<StringResponse>(indexName.ToLower(), _id); var resObj = JObject.Parse(resStr.Body); if ((int)resObj["_shards"]["total"] == 0 || (int)resObj["_shards"]["successful"] > 0) { flag = true; } } catch (Exception ex) { } return flag; } /// <summary> /// 更新文档 /// </summary> /// <param name="indexName">索引名称</param> /// <param name="_id">文档id</param> /// <param name="objectDocment">文档内容</param> /// <returns></returns> public static async Task<bool> UpdateDocumentAsync(this IElasticSearchServer elasticSearchServer, string indexName, string id, object objectDocment) { bool flag = false; try { string json = JsonConvert.SerializeObject(objectDocment); var updateToJson = "{\"doc\":" + json + "}"; var stringRespones = await elasticSearchServer.ElasticJsonClient.UpdateAsync<StringResponse>(indexName, id, PostData.String(updateToJson)); var resObj = JObject.Parse(stringRespones.Body); if ((int)resObj["_shards"]["successful"] > 0) { return true; } } catch { } return flag; } /// <summary> /// 通过Bulk更新文档 /// </summary> /// <param name="indexName">索引名称</param> /// <param name="_id">文档id</param> /// <param name="objectDocment">文档内容</param> /// <returns></returns> public static async Task<bool> UpdateDocumentByBulkAsync(this IElasticSearchServer elasticSearchServer, string indexName, string _id, object objectDocment) { bool flag = false; try { string json = JsonConvert.SerializeObject(objectDocment); if (json.IndexOf("[") == 0) { var objectDocmentOne = JToken.Parse(json); json = JsonConvert.SerializeObject(objectDocmentOne[0]); } int idInt = json.IndexOf("\"_id"); if (idInt > 0) { string idJson = json.Substring(idInt, json.IndexOf(_id) + _id.Length + 1); json = json.Replace(idJson, ""); } List<string> list = new List<string>(); list.Add("{\"update\":{\"_id\":\"" + _id + "\"}}"); list.Add("{\"doc\":" + json + "}"); var stringRespones = await elasticSearchServer.ElasticJsonClient.BulkAsync<StringResponse>(indexName.ToLower(), PostData.MultiJson(list)); var resObj = JObject.Parse(stringRespones.Body); if (!(bool)resObj["errors"]) { return true; } } catch { } return flag; } } }
在控制器中测试的代码如下:
namespace ElasticsearchLearn.Controllers { public class HomeController : Controller { private readonly ILogger<HomeController> _logger; private readonly IElasticSearchServer _elasticSearchServer; public HomeController(ILogger<HomeController> logger, IElasticSearchServer elasticSearchServer) { _logger = logger; _elasticSearchServer = elasticSearchServer; } public IActionResult Index() { return View(); } /// <summary> /// 基于Linq的查询 /// </summary> public async Task<List<Persons>> SearchByLinq(string user = "批量") { var list = await _elasticSearchServer.SearchByLinqAsync<Persons>("users", op => op.Match( ss => ss.Field( qq => qq.user == user))); return list; } /// <summary> /// 基于Json的查询 /// </summary> public async Task<string> SearchByJson(string user = "批量") { var jsonobject = new { query = new { match = new { user = "批量" } } }; string json = JsonConvert.SerializeObject(jsonobject); var jToken = await _elasticSearchServer.SearchByJsonAsync("users", json); return jToken.ToString(); } /// <summary> /// 插入单个文档 /// </summary> /// <param name="indexName">索引名称</param> /// <param name="objectDocment">文档内容</param> /// <param name="_id">自定义_id</param> /// <returns></returns> public async Task<bool> InsertDocument() { var content = new { user = "嘻嘻", post_date = "2021-10-11T15:00:12", message = "...." }; bool result = await _elasticSearchServer.InsertDocumentAsync("users", content, "1"); return result; } /// <summary> /// 删除单个文档 /// </summary> /// <param name="index"></param> /// <param name="id"></param> /// <returns></returns> public async Task<bool> DeleteDocument(string index = "users", string id = "2") { bool result = await _elasticSearchServer.DeleteDocumentAsync(index, id); return result; } /// <summary> /// 通过Bulk更新文档 /// </summary> /// <returns></returns> public async Task<bool> UpdateDocumentByBulk() { var content = new { user = "嘻嘻2", post_date = "2021-10-11T15:00:12", message = "更新一下" }; bool result = await _elasticSearchServer.UpdateDocumentByBulkAsync("users", "1", content); return result; } /// <summary> /// 更新文档 /// </summary> /// <returns></returns> public async Task<bool> UpdateDocument() { var content = new { user = "嘻嘻3", post_date = "2021-10-11T15:00:12", message = "更新一下333" }; bool result = await _elasticSearchServer.UpdateDocumentAsync("users", "1", content); return result; } public async Task<bool> SourceExistsAsync(string index = "users", string id = "1") { bool result = await _elasticSearchServer.SourceExistsAsync(index, id); return result; } /// <summary> /// 检测索引是否已经存在 /// </summary> public async Task<bool> IsIndexExsit(string index = "users") { bool result = await _elasticSearchServer.IsIndexExsit(index); return result; } /// <summary> /// 创建index /// </summary> /// <param name="index"></param> /// <returns></returns> public async Task<bool> CreateIndex(string index) { if (string.IsNullOrWhiteSpace(index)) return false; bool result = await _elasticSearchServer.CreateIndexAsync(index); return result; } /// <summary> /// 删除index /// </summary> /// <param name="index"></param> /// <returns></returns> public async Task<bool> DeleteIndex(string index) { if (string.IsNullOrWhiteSpace(index)) return false; bool result = await _elasticSearchServer.DeleteIndexAsync(index); return result; } }
部分效果如下:
其实调用方法的时候我们可以看到执行的原生语句是什么:
贴一下测试用的原生语句:
##添加内容 PUT users/_create/2 { "user" : "xxx", "post_date" : "2021-10-11T14:40:12", "message" : "trying out Elasticsearch" } ##根据index和id查询文档 GET users/_doc/1 ##批量查询 GET /_mget { "docs" : [ { "_index" : "users", "_id" : "1" }, { "_index" : "users", "_id" : "2" }, { "_index" : "users", "_id" : "3" } ] } ##更新文档 POST users/_update/1 { "doc" : { "user" : "嘻嘻1" } } ##检查index是否存在 HEAD users
在贴一个代码下载地址:https://download.tnblog.net/resource/index/4407008a3846478c8eb0e33e48eff418