flask快速开发框架(手把手教你使用Flask搭建ES搜索引擎预备篇)
flask快速开发框架
手把手教你使用Flask搭建ES搜索引擎预备篇1 前言
Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上。
那么如何实现 Elasticsearch和 Python 的对接成为我们所关心的问题了 (怎么什么都要和 Python 关联啊)。
2 Python 交互所以,Python 也就提供了可以对接 Elasticsearch的依赖库。
- pip install elasticsearch
初始化连接一个 Elasticsearch 操作对象。
- def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):
- # self.es = Elasticsearch([ip], http_auth=('username', 'password'), port=9200)
- self.es = Elasticsearch("localhost:9200")
- self.index_type = index_type
- self.index_name = index_name
默认端口 9200,初始化前请确保本地已搭建好 Elasticsearch的所属环境。
根据 ID 获取文档数据- def get_doc(self, uid):
- return self.es.get(index=self.index_name, id=uid)
- def insert_one(self, doc: dict):
- self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
- def insert_array(self, docs: list):
- for doc in docs:
- self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
- def search(self, query, count: int = 30):
- dsl = {
- "query": {
- "multi_match": {
- "query": query,
- "fields": ["title", "content", "link"]
- }
- },
- "highlight": {
- "fields": {
- "title": {}
- }
- }
- }
- match_data = self.es.search(index=self.index_name, body=dsl, size=count)
- return match_data
- def __search(self, query: dict, count: int = 20): # count: 返回的数据大小
- results = []
- params = {
- 'size': count
- }
- match_data = self.es.search(index=self.index_name, body=query, params=params)
- for hit in match_data['hits']['hits']:
- results.append(hit['_source'])
- return results
- def delete_index(self):
- try:
- self.es.indices.delete(index=self.index_name)
- except:
- pass
好啊,封装 search 类也是为了方便调用,整体贴一下。
- from elasticsearch import Elasticsearch
- class elasticSearch():
- def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):
- # self.es = Elasticsearch([ip], http_auth=('elastic', 'password'), port=9200)
- self.es = Elasticsearch("localhost:9200")
- self.index_type = index_type
- self.index_name = index_name
- def create_index(self):
- if self.es.indices.exists(index=self.index_name) is True:
- self.es.indices.delete(index=self.index_name)
- self.es.indices.create(index=self.index_name, ignore=400)
- def delete_index(self):
- try:
- self.es.indices.delete(index=self.index_name)
- except:
- pass
- def get_doc(self, uid):
- return self.es.get(index=self.index_name, id=uid)
- def insert_one(self, doc: dict):
- self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
- def insert_array(self, docs: list):
- for doc in docs:
- self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
- def search(self, query, count: int = 30):
- dsl = {
- "query": {
- "multi_match": {
- "query": query,
- "fields": ["title", "content", "link"]
- }
- },
- "highlight": {
- "fields": {
- "title": {}
- }
- }
- }
- match_data = self.es.search(index=self.index_name, body=dsl, size=count)
- return match_data
尝试一下把 Mongodb 中的数据插入到 ES 中。
- import json
- from datetime import datetime
- import pymongo
- from app.elasticsearchClass import elasticSearch
- client = pymongo.MongoClient('127.0.0.1', 27017)
- db = client['spider']
- sheet = db.get_collection('Spider').find({}, {'_id': 0, })
- es = elasticSearch(index_type="spider_data",index_name="spider")
- es.create_index()
- for i in sheet:
- data = {
- 'title': i["title"],
- 'content':i["data"],
- 'link': i["link"],
- 'create_time':datetime.now()
- }
- es.insert_one(doc=data)
到 ES 中查看一下,启动 elasticsearch-head 插件。
如果是 npm 安装的那么 cd 到根目录之后直接 npm run start 就跑起来了。
本地访问 http://localhost:9100/
发现新加的 spider 数据文档确实已经进去了。
3 爬虫入库要想实现 ES 搜索,首先要有数据支持,而海量的数据往往来自爬虫。
为了节省时间,编写一个最简单的爬虫,抓取 百度百科。
简单粗暴一点,先 递归获取 很多很多的 url 链接
- import requests
- import re
- import time
- exist_urls = []
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36',
- }
- def get_link(url):
- try:
- response = requests.get(url=url, headers=headers)
- response.encoding = 'UTF-8'
- html = response.text
- link_lists = re.findall('.*?<a target=_blank href="/item/([^:#=<>]*?)".*?</a>', html)
- return link_lists
- except Exception as e:
- pass
- finally:
- exist_urls.append(url)
- # 当爬取深度小于10层时,递归调用主函数,继续爬取第二层的所有链接
- def main(start_url, depth=1):
- link_lists = get_link(start_url)
- if link_lists:
- unique_lists = list(set(link_lists) - set(exist_urls))
- for unique_url in unique_lists:
- unique_url = 'https://baike.baidu.com/item/' + unique_url
- with open('url.txt', 'a+') as f:
- f.write(unique_url + '\n')
- f.close()
- if depth < 10:
- main(unique_url, depth + 1)
- if __name__ == '__main__':
- start_url = 'https://baike.baidu.com/item/%E7%99%BE%E5%BA%A6%E7%99%BE%E7%A7%91'
- main(start_url)
把全部 url 存到 url.txt 文件中之后,然后启动任务。
- # parse.py
- from celery import Celery
- import requests
- from lxml import etree
- import pymongo
- app = Celery('tasks', broker='redis://localhost:6379/2')
- client = pymongo.MongoClient('localhost',27017)
- db = client['baike']
- @app.task
- def get_url(link):
- item = {}
- headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
- res = requests.get(link,headers=headers)
- res.encoding = 'UTF-8'
- doc = etree.HTML(res.text)
- content = doc.xpath("//li[@class='lemma-summary']/li[@class='para']//text()")
- print(res.status_code)
- print(link,'\t','++++++++++++++++++++')
- item['link'] = link
- data = ''.join(content).replace(' ', '').replace('\t', '').replace('\n', '').replace('\r', '')
- item['data'] = data
- if db['Baike'].insert(dict(item)):
- print("is OK ...")
- else:
- print('Fail')
- from parse import get_url
- def main(url):
- result = get_url.delay(url)
- return result
- def run():
- with open('./url.txt', 'r') as f:
- for url in f.readlines():
- main(url.strip('\n'))
- if __name__ == '__main__':
- run()
- celery -A parse worker -l info -P gevent -c 10
哦豁 !! 你居然使用了 Celery 任务队列,gevent 模式,-c 就是10个线程刷刷刷就干起来了,速度杠杠的 !!
啥?分布式? 那就加多几台机器啦,直接把代码拷贝到目标服务器,通过 redis 共享队列协同多机抓取。
这里是先将数据存储到了 MongoDB 上(个人习惯),你也可以直接存到 ES 中,但是单条单条的插入速度堪忧(接下来会讲到优化,哈哈)。
使用前面的例子将 Mongo 中的数据批量导入到 ES 中,OK !!!
到这一个简单的数据抓取就已经完毕了。
好啦,现在 ES 中已经有了数据啦,接下来就应该是 Flask web 的操作啦,当然,Django,FastAPI 也很优秀。嘿嘿,你喜欢 !!
原文链接:https://mp.weixin.qq.com/s/kZBcS-9esICzfS7C_tmovA
- python 取出时间段日志(python 实现提取某个索引中某个时间段的数据方法)
- python爬虫并保存excel实例(Python实现爬取亚马逊数据并打印出Excel文件操作示例)
- Python实现模拟点击(用python实现刷点击率的示例代码)
- python人脸识别库(20行python代码实现人脸识别)
- python统计图参数(Python使用统计函数绘制简单图形实例代码)
- python矩阵怎么生成(python实现矩阵打印)
- python编程ai人工智能(AI领域都在用Python即将被淘汰?网友预测未来的编程语言不会是TA)
- python mongodb 基本操作(Python使用pymongo库操作MongoDB数据库的方法实例)
- python改变字体颜色指令(使用Python自动化破解自定义字体混淆信息的方法实例)
- python验证码处理教程(python简单验证码识别的实现方法)
- python取当前日期(Python实现根据日期获取当天凌晨时间戳的方法示例)
- pythonsocket详细用法(Python中的Socket 与 ScoketServer 通信及遇到问题解决方法)
- python的4种数字变量(Python将字符串常量转化为变量方法总结)
- python做了一个自动翻译的小工具(Python 20行简单实现有道在线翻译的详解)
- python 的常用工具(Python静态类型检查新工具之pyright 使用指南)
- 2021-10-07 00:38:09
- 门外之见 海蛎子味 的表演,能走多远(门外之见海蛎子味)
- 三部冷门谍战剧,第一部2014年拍摄,至今还未播出(三部冷门谍战剧)
- 《金陵秘事》的剧情跌宕起伏 给观众带来的怎样的感官体验(金陵秘事的剧情跌宕起伏)
- 少儿口才表达影响未来一生,50首经典绕口令和孩子玩出聪明大脑(少儿口才表达影响未来一生)
- 玩网游居然让人更友善 很难以让人置信(玩网游居然让人更友善)
- 学好汉语拼音,从娃娃绕口令抓起,平时还是要多练 收藏好(从娃娃绕口令抓起)
热门推荐
- elasticsearch 索引创建过程(使用elasticsearch定时删除索引数据)
- python将一个字符串逆序输出(Python字符串逆序的实现方法一题多解)
- python类定义(浅谈python新式类和旧式类区别)
- python关闭程序强制退出线程(python多线程调用exit无法退出的解决方法)
- mysql带log的版本(聊聊MYSQL中Redo Log是什么?)
- 公有云私有云混合云对比(公有云和私有云的区别 如何搭建云存储)
- pythonai识别算法(Python3调用百度AI识别图片中的文字功能示例测试可用)
- 改变Visual Studio的主题
- laravel事务状态(laravel dingo API返回自定义错误信息的实例)
- css选择器一般写多少(深入理解CSS选择器优先级)
排行榜
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9