elastic 搜索
大约 9 分钟
安装
go get github.com/elastic/go-elasticsearch/v8@latest
初始化客户端: pkg/search
package search
import (
"context"
"github.com/elastic/go-elasticsearch/v8"
"github.com/orangbus/cmd/pkg/config"
"log"
"net/http"
"strings"
"time"
)
var (
Es *elasticsearch.TypedClient
)
// NewSearch builds the typed Elasticsearch client and stores it in the
// package-level variable Es.
//
// Connection settings are read from the configuration keys "es.host",
// "es.username" and "es.password". "es.host" may contain one address or
// several addresses separated by commas; whitespace around each address is
// trimmed and empty entries are dropped.
//
// The function panics when the client cannot be constructed or when the
// initial Info request fails. On success it logs the server version.
func NewSearch() {
	hosts := config.GetString("es.host")
	username := config.GetString("es.username")
	password := config.GetString("es.password")

	// strings.Split yields the whole string as a single element when there is
	// no comma, so the single-host and multi-host cases share one path.
	// NOTE(review): when no address remains the client library is expected to
	// fall back to its own default address — confirm this is acceptable.
	var addrs []string
	for _, host := range strings.Split(hosts, ",") {
		if host = strings.TrimSpace(host); host != "" {
			addrs = append(addrs, host)
		}
	}

	cfg := elasticsearch.Config{
		Addresses: addrs,
		Username:  username,
		Password:  password,
		Transport: &http.Transport{
			MaxIdleConns:        100,             // maximum number of idle connections overall
			MaxIdleConnsPerHost: 10,              // maximum number of idle connections per host
			IdleConnTimeout:     time.Second * 3, // how long an idle connection is kept before closing
		},
	}

	client, err := elasticsearch.NewTypedClient(cfg)
	if err != nil {
		// Keep the original message but include the cause so the failure can be diagnosed.
		panic("elasticsearch连接失败: " + err.Error())
	}
	Es = client

	// Issue an Info request to verify that the cluster is actually reachable.
	response, err := client.Info().Do(context.Background())
	if err != nil {
		// The panic already reports the error; logging it as well would duplicate it.
		panic(err.Error())
	}
	log.Printf("elasticsearch version:%s\n", response.Version.Int)
}
映射
查询
GET /movies/_mapping
// GetMapping sends a get-mapping request for indexName and reports only
// whether the request succeeded; the returned mapping is discarded.
func GetMapping(indexName string) error {
	request := search.Es.Indices.GetMapping().Index(indexName)
	_, err := request.Do(ctx)
	return err
}
创建
PUT /movies
{
"mappings": {
"properties": {
"id": {
"type": "long"
},
"vod_name": {
"type": "text", // 可以进行分词查询
"fields": {
"keyword": {
"type": "keyword" // 可以作为一个整体查询
}
}
},
"api_id": {
"type": "integer"
},
"soure": {
"type": "float"
},
"created_at": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"updated_at": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
}
// Example index definition: a "mappings" section with a single "id"
// property of type long.
mapping := map[string]interface{}{
"mappings": map[string]interface{}{
"properties": map[string]interface{}{
"id": map[string]interface{}{
"type": "long",
},
},
},
}
// Pass the definition to CreateMapping for the index named "demo".
err := elastic_service.CreateMapping("demo", mapping)
// CreateMapping JSON-encodes mapping and sends it as the raw body of a
// create-index request for indexName.
//
// It returns the encoding or request error if either occurs, and an error
// naming the index when the response is not acknowledged.
func CreateMapping(indexName string, mapping map[string]interface{}) error {
	payload, err := json.Marshal(mapping)
	if err != nil {
		return err
	}

	request := search.Es.Indices.Create(indexName).Raw(bytes.NewReader(payload))
	result, err := request.Do(ctx)
	switch {
	case err != nil:
		return err
	case !result.Acknowledged:
		return errors.New(fmt.Sprintf("%s 索引创建失败", indexName))
	}
	return nil
}
修改
PUT /movies/_mapping
{
"properties": {
"id": {
"type": "long"
},
"created_at": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"updated_at": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
索引
movies
索引名称
查询
GET /movies
// CreateIndex sends a create-index request for indexName without a body.
//
// It returns the request error, or an error naming the index when the
// response is not acknowledged.
func CreateIndex(indexName string) error {
	result, err := search.Es.Indices.Create(indexName).Do(ctx)
	switch {
	case err != nil:
		return err
	case !result.Acknowledged:
		return errors.New(fmt.Sprintf("[%s]索引创建失败", indexName))
	}
	return nil
}
删除
DELETE /movies
// DeleteIndex sends a delete-index request for indexName.
//
// It returns the request error, or an error naming the index when the
// response is not acknowledged.
func DeleteIndex(indexName string) error {
	result, err := search.Es.Indices.Delete(indexName).Do(ctx)
	switch {
	case err != nil:
		return err
	case !result.Acknowledged:
		return errors.New(fmt.Sprintf("[%s]索引删除失败", indexName))
	}
	return nil
}
文档
添加
PUT /movies/_doc/1
{
"name": "orangbus",
"age": 18,
"created_at": "2024-06-12 10:12:00"
}
// Create sends doc to the Index API of indexName under the given id and
// returns the request error, if any. The response itself is discarded.
func Create(indexName string, id string, doc interface{}) error {
	request := search.Es.Index(indexName).Id(id).Document(doc)
	_, err := request.Do(ctx)
	return err
}
批量添加
POST /movies/_bulk
{"index": {"_id": 2}}
{"name": "李四", "age": 18, "created_at": "2024-06-12 10:12:00"}
{"index": {"_id": 3}}
{"name": "王五", "age": 20, "created_at": "2024-06-12 10:12:00"}
查询
GET /demo/_doc/1
// 将查询后的结果转化为需要的结构体即可
// FindOne fetches the document with the given id from indexName and returns
// its source as raw JSON, leaving decoding into a concrete struct to the
// caller.
//
// It returns the request error, an error when the response reports the
// document as not found, or the error from serializing the source.
func FindOne(indexName string, id string) ([]byte, error) {
	result, err := search.Es.Get(indexName, id).Do(ctx)
	if err != nil {
		return nil, err
	}
	if result.Found {
		raw, marshalErr := result.Source_.MarshalJSON()
		if marshalErr != nil {
			return nil, marshalErr
		}
		return raw, nil
	}
	return nil, errors.New("记录不存在")
}
更新
PUT
全量修改
POST
个别字段修改
POST /movies/_doc/1
{
"name": "张三",
"age": 20,
"created_at": "2024-06-12 10:12:00"
}
// Update sends doc to the Index API of indexName under the given id and
// returns the request error, if any. It issues the same request as Create.
// NOTE(review): this presumably replaces the whole stored document rather
// than patching individual fields — confirm against the Index API.
func Update(indexName string, id string, doc interface{}) error {
	request := search.Es.Index(indexName).Id(id).Document(doc)
	_, err := request.Do(ctx)
	return err
}
删除
DELETE /movies/_doc/1
// Delete sends a delete request for the document id in indexName and
// returns the request error, if any. The response itself is discarded.
func Delete(indexName string, id string) error {
	request := search.Es.Delete(indexName, id)
	_, err := request.Do(ctx)
	return err
}
批量操作
添加
POST /video/_bulk
{"create": {"_index": "video", "_id": "1"}}
{"name": "orangbus"}
{"index": {"_index": "video", "_id": "2"}}
{"name": "orangbus"}
添加、更新
POST /video/_bulk
{"index": {"_index": "video", "_id": "1"}}
{"name": "orangbus2"}
{"index": {"_index": "video", "_id": "2"}}
{"name": "orangbus2"}
删除
POST /video/_bulk
{"delete": {"_index": "video", "_id": "1"}}
{"delete": {"_index": "video", "_id": "2"}}
// Bulk encodes every element of data as an "index" action for the
// models.TableNameMovies index (one metadata line followed by one document
// line, each newline-terminated) and submits the resulting body through
// elastic_service.Bulk.
//
// Elements that cannot be JSON-encoded are logged and skipped. Nothing is
// sent when no element could be encoded. The outcome is reported via log.
func Bulk(data []interface{}) {
	var buf bytes.Buffer
	synced := 0
	for _, doc := range data {
		// NOTE(review): doc.ID only compiles when the element type exposes an
		// ID field; with []interface{} the caller's concrete model type must
		// be substituted here.
		meta := []byte(fmt.Sprintf(`{"index":{ "_index" : "%s", "_id" : "%d" }}%s`, models.TableNameMovies, doc.ID, "\n"))
		body, err := json.Marshal(doc)
		if err != nil {
			log.Printf("【elastic-bulk】序列化失败:%s", err.Error())
			continue
		}
		body = append(body, "\n"...)

		// Reserve room for exactly the bytes about to be written.
		buf.Grow(len(meta) + len(body))
		buf.Write(meta)
		buf.Write(body)
		synced++
	}

	// An empty bulk body is not a valid request, so there is nothing to send.
	if synced == 0 {
		return
	}

	if err := elastic_service.Bulk(models.TableNameMovies, buf.Bytes()); err != nil {
		log.Printf("【elastic-bulk】添加失败:%s", err.Error())
		return
	}
	log.Printf("【elastic-bulk】同步成功:%d 条", synced)
}
// Bulk submits data, an already-encoded newline-delimited bulk body, to the
// Bulk API with indexName as the default index.
//
// It returns the request error, or an error when the response's Errors flag
// reports that at least one item in the batch failed.
func Bulk(indexName string, data []byte) error {
	response, err := search.Es.Bulk().Index(indexName).Raw(bytes.NewReader(data)).Do(ctx)
	if err != nil {
		return err
	}
	// The request as a whole can succeed while individual items fail; the
	// Errors flag is the only signal for that case.
	if response.Errors {
		return fmt.Errorf("[%s]批量操作存在失败的条目", indexName)
	}
	return nil
}
搜索
multi_match 多字段查询
GET /movies/_search
{
"from": 0,
"size": 10,
"query": {
"multi_match": {
"query": "美女喜欢帅哥",
"fields": [
"vod_name", "vod_tag", "type_name"
]
}
}
}
分页查询
GET /movies/_search
{
"from": 0,
"size": 100,
"query": {
"match": {
"vod_name": "动漫"
}
}
}
// TestSearch runs a paged match query on the "vod_name" field of the
// "movies" index using the typed request API and logs every decoded hit.
func TestSearch(t *testing.T) {
	keywords := "美女"
	page := 0
	limit := 10
	req := &search.Request{
		Query: &types.Query{
			Match: map[string]types.MatchQuery{
				"vod_name": {Query: keywords},
			},
		},
		From: &page,
		Size: &limit,
	}

	res, err := es.Search().Index("movies").Request(req).Do(context.Background())
	if err != nil {
		// Stop here: res must not be dereferenced after a failed request.
		t.Fatalf("search movies: %v", err)
	}

	for _, item := range res.Hits.Hits {
		var data interface{}
		if err := json.Unmarshal(item.Source_, &data); err != nil {
			t.Errorf("decode hit source: %v", err)
			continue
		}
		t.Log(data)
	}
}
json查询-通用
// TestSearch runs the same paged match query on "vod_name" against the
// "movies" index, but builds the request body as a generic JSON map (sorted
// by "id" descending) and sends it through Raw. Every decoded hit is logged.
func TestSearch(t *testing.T) {
	keywords := "美女"
	page := 0
	limit := 10
	where := map[string]interface{}{
		"query": map[string]interface{}{
			"match": map[string]interface{}{
				"vod_name": keywords,
			},
		},
		"from": page,
		"size": limit,
		"sort": map[string]interface{}{
			"id": "desc",
		},
	}

	body, err := json.Marshal(where)
	if err != nil {
		t.Fatalf("encode query: %v", err)
	}
	t.Log(string(body))

	res, err := es.Search().Index("movies").Raw(bytes.NewReader(body)).Do(context.Background())
	if err != nil {
		// Stop here: res must not be dereferenced after a failed request.
		t.Fatalf("search movies: %v", err)
	}

	for _, item := range res.Hits.Hits {
		var data interface{}
		if err := json.Unmarshal(item.Source_, &data); err != nil {
			t.Errorf("decode hit source: %v", err)
			continue
		}
		t.Log(data)
	}
}
高亮查询
GET /movies/_search
{
"from": 0,
"size": 10,
"query": {
"match": {
"vod_name": "美女"
}
},
"highlight": {
"fields": {
"vod_name": {
"pre_tags": [
"<span style='color:red'>"
],
"post_tags": [
"</span>"
]
},
"VodContent": {
"pre_tags": [
"<span style='color:red'>"
],
"post_tags": [
"</span>"
]
}
}
}
}
条件查询
GET /movies/_search
{
"query": {
"bool": {
"must": [
{"term": {"api_id": 2} },
{"match": {
"vod_name": "美女"
}}
]
}
}
}
范围查询
api_id
api_id 大于等于0,小于等于20
GET /movies/_search
{
"from": 0,
"size": 30,
"query": {
"bool": {
"must": [
{
"match": {
"vod_name": "美女"
}
}
],
"filter": [
{
"range": {
"api_id": {
"gte": 0,
"lte": 20
}
}
}
]
}
}
}
范围关键字查询
GET /movies/_search
{
"size": 100,
"query": {
"bool": {
"must": [
{
"match": {
"vod_name": "美女"
}
},
{
"terms": {
"api_id": [1,2,3]
}
}
]
}
},
"_source":[
"api_id",
"vod_name"
]
}
喜好推荐
根据一组标签推荐相似的数据
GET /movies/_search
{
"from": 0,
"size": 20,
"query": {
"more_like_this": {
"fields": ["vod_name"],
"like": ["美女","帅哥"],
"min_term_freq": 1,
"max_query_terms": 12
}
}
}
// more_like_this query on the "vod_name" field; tags supplies the "like"
// values and is defined outside this snippet.
query := map[string]interface{}{
"query": map[string]interface{}{
"more_like_this": map[string]interface{}{
"fields": []string{
"vod_name",
},
"like": tags,
"min_term_freq": 1,
"max_query_terms": 12,
},
},
}
// Request page 1 with 20 results from the "movies" index; the total count
// and the error returned by Search are discarded here.
list, _, _ := elastic_service.Search("movies", query, 1, 20)
// Search runs query against indexName with paging and returns the decoded
// hit sources together with the total hit count.
//
// page is 1-based; values below 1 are treated as 1. limit falls back to 20
// when it is not in the range 1..100. The "from" and "size" keys are set on
// a copy of query, so the caller's map is left untouched and a nil query is
// accepted.
//
// It returns the encoding or request error if either occurs. Hits whose
// source cannot be decoded are skipped.
func Search(indexName string, query map[string]interface{}, page, limit int) ([]interface{}, int64, error) {
	if page < 1 {
		page = 1
	}
	if limit <= 0 || limit > 100 {
		limit = 20
	}

	// Shallow copy: only the top-level paging keys are added.
	body := make(map[string]interface{}, len(query)+2)
	for key, value := range query {
		body[key] = value
	}
	body["from"] = (page - 1) * limit
	body["size"] = limit

	payload, err := json.Marshal(body)
	if err != nil {
		return nil, 0, err
	}

	res, err := search.Es.Search().Index(indexName).Raw(bytes.NewReader(payload)).Do(ctx)
	if err != nil {
		// Return the request error itself, not the earlier (nil) marshal error.
		return nil, 0, err
	}

	var list []interface{}
	for _, item := range res.Hits.Hits {
		var data interface{}
		if err := json.Unmarshal(item.Source_, &data); err != nil {
			// Best effort: one undecodable hit does not fail the whole page.
			continue
		}
		list = append(list, data)
	}

	// Total is optional in the response, so guard against it being absent.
	var total int64
	if res.Hits.Total != nil {
		total = res.Hits.Total.Value
	}
	return list, total, nil
}
统计查询
根据时间查询过去14天每天文章的数量
GET /movies/_search
{
"size": 0,
"query": {
"range": {
"updated_at": {
"gte": "now-14d/d",
"lte": "now/d",
"format": "epoch_millis"
}
}
},
"aggs": {
"created_at_count": {
"date_histogram": {
"field": "created_at",
"calendar_interval": "1d", // 每天一个桶
"min_doc_count": 0, // 即使没有数据的天数也要显示
"extended_bounds": {
"min": "now-14d/d",
"max": "now/d"
}
}
}
}
}
wherein 查询
GET /movies/_search
{
"size": 100,
"query": {
"bool": {
"must": [
{
"match": {
"vod_name": "美女 帅哥"
}
},
{
"terms": {
"api_id": [1,2,3]
}
}
]
}
},
"_source":[
"api_id",
"vod_name"
]
}
时间范围查询
GET /movies/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"vod_name": "美女"
}
}
],
"filter": [
{
"range": {
// 方式1
# "created_at": {
# "gte": "now-3d/d", // 过去3天
# "lte": "now/d", // 当前时间
# "format": "yyyy-MM-dd",
# "time_zone": "Asia/Shanghai"
# },
// 方式2
"created_at": {
"gte": "2024-06-14",
"lte": "2024-06-14",
"format": "yyyy-MM-dd",
"time_zone": "Asia/Shanghai"
}
}
}
]
}
}
}
聚合
文章分类统计文章数
GET /movies/_search
{
"size": 0,
"aggs": {
"type_name_count": { // 自定义分组名称
"terms": { // 相等
"field": "type_name.keyword"
}
},
"api_count": { // 自定义分组名称
"terms": { // 相等
"field": "api_id"
}
}
}
}
其他查询
分词查询
POST /movies/_analyze
{
"analyzer": "ik_max_word",
"text": ["orangbus使用golang的姿势太帅了","世界和平"]
}
// Analyzer runs text through the Analyze API of the "movies" index and
// returns the resulting tokens.
//
// The analyzer defaults to "ik_max_word"; passing a value in analyzer
// overrides it with the first element (for example "ik_smart"). Only tokens
// longer than 3 bytes are kept; per the original note, 3 bytes correspond to
// one Chinese character. The request error is returned unchanged.
func Analyzer(text string, analyzer ...string) ([]string, error) {
	name := "ik_max_word" // alternative: ik_smart
	if len(analyzer) != 0 {
		name = analyzer[0]
	}

	result, err := search.Es.Indices.Analyze().Index("movies").Analyzer(name).Text(text).Do(ctx)
	if err != nil {
		return nil, err
	}

	var tokens []string
	for i := range result.Tokens {
		token := result.Tokens[i].Token
		if len(token) <= 3 { // byte length: 3 bytes is one Chinese character
			continue
		}
		tokens = append(tokens, token)
	}
	return tokens, nil
}
标签查询
es中保存的格式: tags
类型是:nested
[
{
"name": "狂飙"
},
{
"name": "人生"
}
]
查询语句:将标签中包含「狂飙」的数据查询出来
GET /movies/_search
{
"query": {
"nested": {
"path": "tags",
"query": {
"bool": {
"must": [
{
"match": {
"tags.name": {
"query": "狂飙"
}
}
}
]
}
}
}
}
}
golang实现
mysql的tags:
Tags []byte `gorm:"column:tags;null;type:json;comment:分词" json:"tags"`
es需要的格式
// Tag is a single tag entry; Name is serialized under the JSON key "name".
type Tag struct {
Name string `json:"name"`
}
Tags []Tag `json:"tags"`
需要手动转化一下
// MovieToEsData converts a Movies record into the ElasticSearchMqData
// document shape.
//
// ID is copied as is and the two timestamps are rendered with their String
// methods. v.Tags is decoded as a JSON array of strings and each string is
// wrapped in a Tag; when decoding fails, Tags is left at its zero value.
func MovieToEsData(v Movies) ElasticSearchMqData {
	var out ElasticSearchMqData
	out.ID = v.ID
	out.CreatedAt = v.CreatedAt.String()
	out.UpdatedAt = v.UpdatedAt.String()

	var names []string
	if err := json.Unmarshal(v.Tags, &names); err != nil {
		return out
	}

	var tags []Tag
	for i := range names {
		tags = append(tags, Tag{Name: names[i]})
	}
	out.Tags = tags
	return out
}
说明
put: 全量修改
post:个别字段修改
分词器:
- english 英语单词分词
- standard 空格分词
- ik_max_word
- ik_smart
keyword : 不会被分词
elastic_service
package elastic_service
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/orangbus/cmd/pkg/search"
)
var ctx = context.Background()
// GetMapping sends a get-mapping request for indexName and reports only
// whether the request succeeded; the returned mapping is discarded.
func GetMapping(indexName string) error {
	request := search.Es.Indices.GetMapping().Index(indexName)
	_, err := request.Do(ctx)
	return err
}
// CreateMapping JSON-encodes mapping and sends it as the raw body of a
// create-index request for indexName.
//
// It returns the encoding or request error if either occurs, and an error
// naming the index when the response is not acknowledged.
func CreateMapping(indexName string, mapping map[string]interface{}) error {
	payload, err := json.Marshal(mapping)
	if err != nil {
		return err
	}

	request := search.Es.Indices.Create(indexName).Raw(bytes.NewReader(payload))
	result, err := request.Do(ctx)
	switch {
	case err != nil:
		return err
	case !result.Acknowledged:
		return errors.New(fmt.Sprintf("%s 索引创建失败", indexName))
	}
	return nil
}
// CreateIndex sends a create-index request for indexName without a body.
//
// It returns the request error, or an error naming the index when the
// response is not acknowledged.
func CreateIndex(indexName string) error {
	result, err := search.Es.Indices.Create(indexName).Do(ctx)
	switch {
	case err != nil:
		return err
	case !result.Acknowledged:
		return errors.New(fmt.Sprintf("[%s]索引创建失败", indexName))
	}
	return nil
}
// DeleteIndex sends a delete-index request for indexName.
//
// It returns the request error, or an error naming the index when the
// response is not acknowledged.
func DeleteIndex(indexName string) error {
	result, err := search.Es.Indices.Delete(indexName).Do(ctx)
	switch {
	case err != nil:
		return err
	case !result.Acknowledged:
		return errors.New(fmt.Sprintf("[%s]索引删除失败", indexName))
	}
	return nil
}
// Create sends doc to the Index API of indexName under the given id and
// returns the request error, if any. The response itself is discarded.
func Create(indexName string, id string, doc interface{}) error {
	request := search.Es.Index(indexName).Id(id).Document(doc)
	_, err := request.Do(ctx)
	return err
}
// Update sends doc to the Index API of indexName under the given id and
// returns the request error, if any. It issues the same request as Create.
// NOTE(review): this presumably replaces the whole stored document rather
// than patching individual fields — confirm against the Index API.
func Update(indexName string, id string, doc interface{}) error {
	request := search.Es.Index(indexName).Id(id).Document(doc)
	_, err := request.Do(ctx)
	return err
}
// Delete sends a delete request for the document id in indexName and
// returns the request error, if any. The response itself is discarded.
func Delete(indexName string, id string) error {
	request := search.Es.Delete(indexName, id)
	_, err := request.Do(ctx)
	return err
}
// Search runs query against indexName with paging and returns the decoded
// hit sources together with the total hit count.
//
// page is 1-based; values below 1 are treated as 1. limit falls back to 20
// when it is not in the range 1..100. The "from" and "size" keys are set on
// a copy of query, so the caller's map is left untouched and a nil query is
// accepted.
//
// It returns the encoding or request error if either occurs. Hits whose
// source cannot be decoded are skipped.
func Search(indexName string, query map[string]interface{}, page, limit int) ([]interface{}, int64, error) {
	if page < 1 {
		page = 1
	}
	if limit <= 0 || limit > 100 {
		limit = 20
	}

	// Shallow copy: only the top-level paging keys are added.
	body := make(map[string]interface{}, len(query)+2)
	for key, value := range query {
		body[key] = value
	}
	body["from"] = (page - 1) * limit
	body["size"] = limit

	payload, err := json.Marshal(body)
	if err != nil {
		return nil, 0, err
	}

	res, err := search.Es.Search().Index(indexName).Raw(bytes.NewReader(payload)).Do(ctx)
	if err != nil {
		// Return the request error itself, not the earlier (nil) marshal error.
		return nil, 0, err
	}

	var list []interface{}
	for _, item := range res.Hits.Hits {
		var data interface{}
		if err := json.Unmarshal(item.Source_, &data); err != nil {
			// Best effort: one undecodable hit does not fail the whole page.
			continue
		}
		list = append(list, data)
	}

	// Total is optional in the response, so guard against it being absent.
	var total int64
	if res.Hits.Total != nil {
		total = res.Hits.Total.Value
	}
	return list, total, nil
}
// DeleteByApiId issues a delete-by-query request on the "movies" index with
// a match query on "api_id" equal to apiId.
//
// It returns the encoding or request error if either occurs; the response
// itself is discarded.
func DeleteByApiId(apiId int64) error {
	payload, err := json.Marshal(map[string]interface{}{
		"query": map[string]interface{}{
			"match": map[string]interface{}{
				"api_id": apiId,
			},
		},
	})
	if err != nil {
		return err
	}

	_, err = search.Es.DeleteByQuery("movies").Raw(bytes.NewReader(payload)).Do(ctx)
	return err
}
// Analyzer runs text through the Analyze API of the "movies" index and
// returns the resulting tokens.
//
// The analyzer defaults to "ik_max_word"; passing a value in analyzer
// overrides it with the first element (for example "ik_smart"). Only tokens
// longer than 3 bytes are kept; per the original note, 3 bytes correspond to
// one Chinese character. The request error is returned unchanged.
func Analyzer(text string, analyzer ...string) ([]string, error) {
	name := "ik_max_word" // alternative: ik_smart
	if len(analyzer) != 0 {
		name = analyzer[0]
	}

	result, err := search.Es.Indices.Analyze().Index("movies").Analyzer(name).Text(text).Do(ctx)
	if err != nil {
		return nil, err
	}

	var tokens []string
	for i := range result.Tokens {
		token := result.Tokens[i].Token
		if len(token) <= 3 { // byte length: 3 bytes is one Chinese character
			continue
		}
		tokens = append(tokens, token)
	}
	return tokens, nil
}
es常见的字段类型
- text:用于全文搜索。text 类型字段会被分析(analyzed),这意味着它们会被分词器处理,适合用于搜索操作。
- keyword:用于不进行全文搜索的字段,例如标签或状态代码。keyword 类型字段不会被分析,它们存储为一个精确的值,适合用于聚合或过滤。
- object:用于存储JSON对象。object 类型允许你将JSON对象映射到Elasticsearch的内部结构,并且可以包含嵌套字段。
- nested:用于存储JSON数组或对象数组。nested 类型允许你索引JSON数组,并且可以在数组中的每个对象上执行查询。
- join:用于父子关系的数据模型。join 类型可以用于实现父子文档的关联查询。
- scaled_float:用于需要按比例缩放的浮点数,例如货币或温度。
- date:用于日期和时间。
- boolean:用于布尔值。
- integer、long、short、byte、double、float:用于不同的数值类型。
- binary:用于二进制数据。
- ip:用于IPv4或IPv6地址。
- completion:用于自动完成功能。
- rank_feature:用于排名特征。
- search_as_you_type:用于搜索时的自动完成和纠错。
- constant_keyword:用于不进行分析的常量关键词。
- histogram:用于存储数值范围,用于直方图聚合。
- date_nanos:用于存储日期和时间,提供纳秒精度。
- integer_range、float_range、long_range、double_range、ip_range:用于存储数值范围。
- alias:用于创建字段别名。
- flattened:用于将嵌套的JSON对象扁平化为单个字段。
- dense_vector 和 sparse_vector:用于存储向量,用于机器学习。
- percolator:用于存储查询,用于反向索引。
- point_in_time:用于保存时间点的快照。
- search:用于存储搜索查询。
- text_search:用于存储搜索查询。

注:point_in_time、search、text_search 不在 Elasticsearch 官方字段类型列表中,使用前请以官方文档为准。