学习elasticsearch的原因在于关系型数据库mysql的数据量超过了百万级别,再对其进行模糊搜索实在是太慢了,尽管很吃服务器资源,所以便想通过elasticsearch来解决。
Linux云服务器,由于不能使用root用户来启动es,创建一个es用户专门来管理elasticsearch(如果中途不小心使用了root用户启动了,一定要记得再次使用chown重新赋权,可以查看下日志logs下面的文件,部分权限又变成了root)
# can not run elasticsearch as root
useradd es
chown -R es:es elasticsearch
su es
./bin/elasticsearch # 启动
如何查看是否正确安装呢?返回如下结果,则说明运行成功,如果想要在后台运行,在运行时加入-d参数即可
curl -X GET "localhost:9200?pretty"
{
"name" : "node-1",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "sdasdasdgdsf",
"version" : {
"number" : "7.10.0",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "1232132132135c6e8f3a7997850f96",
"build_date" : "2020-11-09T21:30:33.964949Z",
"build_snapshot" : false,
"lucene_version" : "8.7.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
开始增加索引
curl -XPUT "http://localhost:9200/cba" -H 'Content-Type: application/json' -d'
python生成elasticsearch所需文件的备注,因为是分先后写的,所以比较啰嗦,其实很多内容可以整合到一个脚本当中,后期再来进行。
首先是写了一个获取数据库数据的程序,用于将表内数据提取出来,去掉多余的自增编号。
# getDataFromMysql.py
sql = "SELECT * FROM `" + table + "`"
cursor.execute(sql)
#去掉多余的自增编号
for value in cursor.fetchall():
content = value[1:]
with open(ouput_name, "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(content)
第二步,利用linux的split命令将上面输出的ouput_name文件进行分割,这里要讲下原因,因为在我的实测过程当中,一份70M的文件使用_bulk方法导入到elasticsearch中时,很有问题,在1核2G服务器上会直接让elasticsearch宕机,而在4核8G的windows电脑上,也无法导入成功,而我的最终文件比这个大得多,所以将其按照25000行一份文件进行了切割,最终生成了123份文件(如果是从windows系统传上来的文件,最好做下格式转换,否则有可能文件切割没有报错,但是切割行数却出了问题)
# 建议在使用linux处理文件时,最好做下格式转换
dos2unix inputName.csv
split -l 25000 inputName.csv outName
第三步,为每份文件增加标题栏字段,方便生成json格式的文件,文件这么多,当然不可能每一份为它去手动添加,否则后续如果有什么问题,再来一遍,那得崩溃了
# insertTitle.py
from pathlib import Path
import os
BASE_DIR = str(Path(__file__).resolve(strict=True).parent) + "/folder"
files = os.listdir(BASE_DIR)
for filename in files:
with open("./folder/" + filename, 'r+', encoding='utf-8') as f:
content = f.read()
f.seek(0, 0)
f.write(strs + "\n" + content)
在这一步,我将获取folder文件夹下面所有切割好的文件(请注意,是否每个文件都没有标题栏!),并利用seek将文件指针指向0,就可以在文件头部增加我想要的标题栏,为什么需要增加f.read()的内容呢?在实际测试当中,假如仅仅是f.write(strs),会发现我们的标题确实插入到了头部,但是第二行会丢失很多内容,具体的原因暂时没去了解。
第四步,将所有添加好标题栏的文件进行转化,生成目标json格式文件
# csv2json.py
for filename in files:
with open("./folder/" + filename, "r",encoding="utf-8") as f:
reader = csv.reader(f)
for i,line in enumerate(reader):
dict_content= {}
if i == 0:
title = line
else:
dict_id = {'index': {'_index':index_name,'_id':line[3]}}
dict_id_json = json.dumps(dict_id)
for i in range(len(title)):
if not line[5]:
line[5] = None
dic = {title[i]: line[i]}
dict_content.update(dic)
dict_content_json = json.dumps(dict_content, ensure_ascii=False)
with open("./outJson/" + filename + ".json", "a", encoding="utf-8") as fp:
fp.write(dict_id_json+ "\n" +dict_content_json + "\n")
首先,我们需要知道,elasticsearch的批量导入方法如下:
curl -X POST "localhost:9200/_bulk?pretty" -H "Content-Type: application/json" --data-binary @output.json
关于curl(客户端client的 URL 工具)命令行工具的用法就不写了,只要知道可以在--data-binary跟上要上传至ES中的文件就可以了,output.json中的文件格式应该如下所示,当然,些许区别是没有关系的,例如不增加_type,毕竟elasticsearch7.x中_type只有一种,又或者删除_index也是可行的,直接利用localhost:9200/nba/_bulk来进行导入
{"index":{"_index":"nba","_type":"_doc","_id":"1"}}
{"countryEn":"United States","teamName":"老鹰","birthDay":831182400000,"country":"美国","teamCityEn":"Atlanta","code":"jaylen_adams","displayAffiliation":"United States","displayName":"杰伦 亚当斯","schoolType":"College","teamConference":"东部","teamConferenceEn":"Eastern","weight":"86.2 公斤","teamCity":"亚特兰大","playYear":1,"jerseyNo":"10","teamNameEn":"Hawks","draft":2018,"displayNameEn":"Jaylen Adams","heightValue":1.88,"birthDayStr":"1996-05-04","position":"后卫","age":23,"playerId":"1629121"}
上面的程序中,我利用dumps函数将文件每一行都生成了两行json数据,并配置ensure_ascii=False取消输出中文ascii 字符码;并将不存在的日期输出为None,这个None在文件中会显示成null,这个主要是因为ES当中日期格式除非有值,否则只接受null。
第五,写一个linux脚本批量导入数据bulkData2ES.sh。
#!/bin/bash
#20210406
#批量导入json数据至ES
folder=/root/MyScripts/outJson
files=$(ls $folder)
cd /root/MyScripts/outJson
for file in $files
do
echo $file
curl -X POST "localhost:9200/_bulk?pretty" -H "Content-Type: application/json" --data-binary @$file
done
echo "that's ok!"
数据都导入进去后,如何在django中应用呢?我现在应用的方法比较粗糙,应该还有更好的方法,后续再来完善。
第一步,在django的虚拟环境中安装相关库以操作ES:
pip install elasticsearch
pip install elasticsearch_dsl
第二步,建立一个名为ES的python包,新建es.py文件,开始进行模糊双条件查询,让其返回20条结果,代码如下:
#es.py
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search,Q
client = Elasticsearch()
my_client = Search(using=client, index="nba")
def SearchOnES(params):
q = Q("multi_match", query=params, fields=['name', 'chinese'])
my_search= my_client[0:20].query(q)
# resp = my_search.execute(ignore_cache=True)
return my_search
第三步,在需要查询的app下面的views中导入相关函数
#views.py
from ES import es
# 我用的是listview,将默认的get_queryset函数可以精简下
def get_queryset(self):
return super().get_queryset().filter(pk__lte=1)
#在get_context_data函数中调用es查询,不走数据库
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
res = self.getPaginationInfo(context['paginator'], context['page_obj'])
context.update(res)
if self.request.GET:
context['nba'] = es.SearchOnES(self.request.GET.get('q'))
return context
第四步,假如需要更换索引下面的某个字段分词器,或者修改字段类型怎么办呢?ES不支持直接修改,只能更换索引(利用别名)!步骤如下:
先建立原索引的别名,将上面第二步中(django)的index更换成索引别名
#建立别名
POST _aliases
{
"actions": [
{
"add": {
"index": "nba",
"alias": "nbaLatest"
}
}
]
}
再次新建一个索引,可以使用_reindex将旧索引数据复制到新索引
#我是采取数据重新导入的方法
nba20210407
#可以通过如下代码查看文档数量
curl -X GET "http://localhost:9200/_cat/count/nba20210407?v&pretty"
利用ES的两个索引可以使用一个别名的特性来无缝衔接索引的切换:
POST /_aliases
{
"actions": [
{
"add": {
"index": "nba20210407",
"alias": "nbaLatest"
}
},
{
"remove": {
"index": "nba",
"alias": "nbaLatest"
}
}
]
}
删除旧索引
DELETE /nba
第五步,如何更新数据?一成不变的数据是没有任何意义的,那么如何更新文档的部分内容呢?
#通过update方法,注意键值为doc!
POST nba/_update/737
{
"doc": {
"name":"hello world"
}
}
最好不要通过put方法去更新,以下的方法会将该条文档所有字段都抹除,只剩下name和xy两个字段内容,所以如果用以下方法来更新的话,切记必须全量更新文档字段
PUT nba/_doc/737
{
"name":"hello 1111",
"xy":"test"
}
参考:#elasticsearch-dsl
https://elasticsearch-dsl.readthedocs.io/en/latest/