YOLO813

Django操作elasticsearch的测试

    学习elasticsearch的原因在于关系型数据库mysql的数据量超过了百万级别,再对其进行模糊搜索实在是太慢了,尽管很吃服务器资源,所以便想通过elasticsearch来解决。

    Linux云服务器,由于不能使用root用户来启动es,创建一个es用户专门来管理elasticsearch(如果中途不小心使用了root用户启动了,一定要记得再次使用chown重新赋权,可以查看下日志logs下面的文件,部分权限又变成了root)

# can not run elasticsearch as root
useradd es
chown -R es:es elasticsearch
su es
./bin/elasticsearch # 启动

    如何查看是否正确安装呢?返回如下结果,则说明运行成功,如果想要在后台运行,在运行时加入-d参数即可

curl -X GET "localhost:9200?pretty"
{
  "name" : "node-1",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "sdasdasdgdsf",
  "version" : {
    "number" : "7.10.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "1232132132135c6e8f3a7997850f96",
    "build_date" : "2020-11-09T21:30:33.964949Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}


    开始增加索引

curl -XPUT "http://localhost:9200/cba" -H 'Content-Type: application/json' -d'

 

    python生成elasticsearch所需文件的备注,因为是分先后写的,所以比较啰嗦,其实很多内容可以整合到一个脚本当中,后期再来进行。
    首先是写了一个获取数据库数据的程序,用于将表内数据提取出来,去掉多余的自增编号。

# getDataFromMysql.py
sql = "SELECT * FROM `" + table + "`"
cursor.execute(sql)
#去掉多余的自增编号
for value in cursor.fetchall():
    content = value[1:]
    with open(ouput_name, "a", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(content)


    第二步,利用linux的split命令将上面输出的ouput_name文件进行分割,这里要讲下原因,因为在我的实测过程当中,一份70M的文件使用_bulk方法导入到elasticsearch中时,很有问题,在1核2G服务器上会直接让elasticsearch宕机,而在4核8G的windows电脑上,也无法导入成功,而我的最终文件比这个大得多,所以将其按照25000行一份文件进行了切割,最终生成了123份文件(如果是从windows系统传上来的文件,最好做下格式转换,否则有可能文件切割没有报错,但是切割行数却出了问题)

# 建议在使用linux处理文件时,最好做下格式转换
dos2unix inputName.csv
split -l 25000 inputName.csv outName


    第三步,为每份文件增加标题栏字段,方便生成json格式的文件,文件这么多,当然不可能每一份为它去手动添加,否则后续如果有什么问题,再来一遍,那得崩溃了

# insertTitle.py
from pathlib import Path
import os
BASE_DIR = str(Path(__file__).resolve(strict=True).parent) + "/folder"
files = os.listdir(BASE_DIR)
for filename in files:
    with open("./folder/" + filename, 'r+', encoding='utf-8') as f:
        content = f.read()
        f.seek(0, 0)
        f.write(strs + "\n" + content)

    在这一步,我将获取folder文件夹下面所有切割好的文件(请注意,是否每个文件都没有标题栏!),并利用seek将文件指针指向0,就可以在文件头部增加我想要的标题栏,为什么需要增加f.read()的内容呢?在实际测试当中,假如仅仅是f.write(strs),会发现我们的标题确实插入到了头部,但是第二行会丢失很多内容,具体的原因暂时没去了解。


    第四步,将所有添加好标题栏的文件进行转化,生成目标json格式文件

# csv2json.py
for filename in files:
    with open("./folder/" + filename, "r",encoding="utf-8") as f:
        reader = csv.reader(f)
        for i,line in enumerate(reader):
            dict_content= {}
            if i == 0:
                title = line
            else:
                dict_id = {'index': {'_index':index_name,'_id':line[3]}}
                dict_id_json = json.dumps(dict_id)
                for i in range(len(title)):
                    if not line[5]:
                        line[5] = None
                    dic = {title[i]: line[i]}
                    dict_content.update(dic)
                dict_content_json = json.dumps(dict_content, ensure_ascii=False)
                with open("./outJson/" + filename + ".json", "a", encoding="utf-8") as fp:
                    fp.write(dict_id_json+ "\n" +dict_content_json + "\n")

首先,我们需要知道,elasticsearch的批量导入方法如下:

curl -X POST "localhost:9200/_bulk?pretty" -H "Content-Type: application/json" --data-binary @output.json

关于curl(客户端client的 URL 工具)命令行工具的用法就不写了,只要知道可以在--data-binary跟上要上传至ES中的文件就可以了,output.json中的文件格式应该如下所示,当然,些许区别是没有关系的,例如不增加_type,毕竟elasticsearch7.x中_type只有一种,又或者删除_index也是可行的,直接利用localhost:9200/nba/_bulk来进行导入

{"index":{"_index":"nba","_type":"_doc","_id":"1"}}
{"countryEn":"United States","teamName":"老鹰","birthDay":831182400000,"country":"美国","teamCityEn":"Atlanta","code":"jaylen_adams","displayAffiliation":"United States","displayName":"杰伦 亚当斯","schoolType":"College","teamConference":"东部","teamConferenceEn":"Eastern","weight":"86.2 公斤","teamCity":"亚特兰大","playYear":1,"jerseyNo":"10","teamNameEn":"Hawks","draft":2018,"displayNameEn":"Jaylen Adams","heightValue":1.88,"birthDayStr":"1996-05-04","position":"后卫","age":23,"playerId":"1629121"}

    上面的程序中,我利用dumps函数将文件每一行都生成了两行json数据,并配置ensure_ascii=False取消输出中文ascii 字符码;并将不存在的日期输出为None,这个None在文件中会显示成null,这个主要是因为ES当中日期格式除非有值,否则只接受null。


    第五,写一个linux脚本批量导入数据bulkData2ES.sh。

#!/bin/bash
#20210406
#批量导入json数据至ES
folder=/root/MyScripts/outJson
files=$(ls $folder)
cd /root/MyScripts/outJson
for file in $files
do
        echo $file
        curl -X POST "localhost:9200/_bulk?pretty" -H "Content-Type: application/json" --data-binary @$file
done
echo "that's ok!"

 

    数据都导入进去后,如何在django中应用呢?我现在应用的方法比较粗糙,应该还有更好的方法,后续再来完善。
    第一步,在django的虚拟环境中安装相关库以操作ES:

pip install elasticsearch
pip install elasticsearch_dsl

    第二步,建立一个名为ES的python包,新建es.py文件,开始进行模糊双条件查询,让其返回20条结果,代码如下:

#es.py
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search,Q
client = Elasticsearch()
my_client = Search(using=client, index="nba")
def SearchOnES(params):
    q = Q("multi_match", query=params, fields=['name', 'chinese'])
    my_search= my_client[0:20].query(q)
    # resp = my_search.execute(ignore_cache=True)
    return my_search

    第三步,在需要查询的app下面的views中导入相关函数

#views.py
from ES import es
# 我用的是listview,将默认的get_queryset函数可以精简下
def get_queryset(self):
  return super().get_queryset().filter(pk__lte=1)
#在get_context_data函数中调用es查询,不走数据库
def get_context_data(self, **kwargs):
    context = super().get_context_data(**kwargs)
    res = self.getPaginationInfo(context['paginator'], context['page_obj'])
    context.update(res)
    if self.request.GET:
        context['nba'] = es.SearchOnES(self.request.GET.get('q'))
    return context


    第四步,假如需要更换索引下面的某个字段分词器,或者修改字段类型怎么办呢?ES不支持直接修改,只能更换索引(利用别名)!步骤如下:

先建立原索引的别名,将上面第二步中(django)的index更换成索引别名

#建立别名
POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "nba",
        "alias": "nbaLatest"
      }
    }
  ]
}

再次新建一个索引,可以使用_reindex将旧索引数据复制到新索引

#我是采取数据重新导入的方法
nba20210407
#可以通过如下代码查看文档数量
curl -X GET "http://localhost:9200/_cat/count/nba20210407?v&pretty"

利用ES的两个索引可以使用一个别名的特性来无缝衔接索引的切换:

POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "nba20210407",
        "alias": "nbaLatest"
      }
    },
    {
      "remove": {
        "index": "nba",
        "alias": "nbaLatest"
      }
    }
  ]
}

删除旧索引

DELETE /nba


    第五步,如何更新数据?一成不变的数据是没有任何意义的,那么如何更新文档的部分内容呢?

#通过update方法,注意键值为doc!
POST nba/_update/737
{
  "doc": {
      "name":"hello world"
  }
}

最好不要通过put方法去更新,以下的方法会将该条文档所有字段都抹除,只剩下name和xy两个字段内容,所以如果用以下方法来更新的话,切记必须全量更新文档字段

PUT nba/_doc/737
{
  "name":"hello 1111",
  "xy":"test"
}

 


参考:

#elasticsearch-dsl
https://elasticsearch-dsl.readthedocs.io/en/latest/