pycharm爬虫教程(一种基于pyppeteer的新浪微博登录方式)

pycharm爬虫教程(一种基于pyppeteer的新浪微博登录方式)(1)

在微博搜索采集时,默认情况下只显示当前页数据。如果搜索的关键词是热词,当前页数据的时间范围可能只有三五分钟。所以,如果要把数据采集全,则必须登录。

在大批量采集时,必须使用多个账号构建cookie池,并根据cookie有效期实时更新已过期的cookie。下面给出基于Pyppeteer的微博登录实现,供大家参考。

新浪微博登录DEMO主类:

import asyncio
import time

from com.fy.plugs.browser.pyppeteer.PyppeteerBrowser import PyppeteerBrowser
from com.fy.utils.date.DateUtils import Date_Utils


class WeiBoLogin:
    """Demo that logs in to the mobile Sina Weibo sign-in page via PyppeteerBrowser."""

    def __init__(self):
        self.pb = PyppeteerBrowser()
        self.du = Date_Utils()

    def login(self, username="用户名", password="密码"):
        """Open the mobile login page, type the credentials and click "login".

        Args:
            username: Text typed into the ``#loginName`` input. The default is
                the placeholder used by the original demo.
            password: Text typed into the ``#loginPassword`` input. The default
                is the placeholder used by the original demo.

        Raises:
            RuntimeError: If the ``loginAction`` element cannot be located.
        """
        url = "https://passport.weibo.cn/signin/login?entry=mweibo&res=wel&wm=3349&r=https://m.weibo.cn/"
        # The profile directory gets a suffix from Date_Utils; a Chromium user
        # data directory must not be shared by two browser processes.
        userDataDir = "d://pyppeteer" + str(self.du.getCurrentTimeStr_Year())

        # Every coroutine must run on the same loop because the browser
        # connection created by getbrowser() is bound to it.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.pb.getbrowser(False, userDataDir))
        loop.run_until_complete(self.pb.open(url, 60))
        time.sleep(10)  # presumably gives the login form time to render

        loop.run_until_complete(self.pb.inputKw(None, "#loginName", username))
        time.sleep(1)
        loop.run_until_complete(self.pb.inputKw(None, "#loginPassword", password))
        time.sleep(1)

        eles = loop.run_until_complete(
            self.pb.getElementsByXpaths(None, '//*[@id="loginAction"]'))
        if not eles:
            # Previously this surfaced as an opaque IndexError/TypeError on eles[0].
            raise RuntimeError('login button //*[@id="loginAction"] not found')
        loop.run_until_complete(self.pb.clickByEle(eles[0]))
        # NOTE(review): presumably keeps the browser open long enough to finish
        # any manual verification step after clicking login -- confirm.
        time.sleep(100)


if __name__ == '__main__':
    sbl = WeiBoLogin()
    sbl.login()

Pyppeteer公共类:

import asyncio
import base64
import time
import tkinter
import traceback

from pyppeteer import launch

from com.fy.utils.http.UserAgentUtils import UserAgentUtils
from com.fy.utils.hash.HashUtils import Hash_Utils
from com.fy.utils.file.FileUtils import File_Utils


class PyppeteerBrowser:
    """Thin convenience wrapper around a single pyppeteer browser and page.

    Most methods accept an optional ``page`` argument; passing ``None`` makes
    them operate on ``self.page``, the page created by :meth:`getbrowser`.
    """

    def __init__(self):
        self.hash = Hash_Utils()
        self.url = None
        self.ua = UserAgentUtils()
        # Populated by getbrowser() / open(); defined up front so that helper
        # methods can test for "not initialised yet" instead of crashing.
        self.browser = None
        self.page = None
        self.res = None

    def _page_or_default(self, page):
        """Return ``page``, or ``self.page`` when ``page`` is None."""
        return self.page if page is None else page

    def screen_size(self):
        """Return the screen ``(width, height)`` in pixels, measured with tkinter."""
        tk = tkinter.Tk()
        try:
            return tk.winfo_screenwidth(), tk.winfo_screenheight()
        finally:
            # destroy() releases the hidden root window; quit() only stops a
            # running mainloop and would leave the window alive.
            tk.destroy()

    async def getbrowser(self, headless=False, userDataDir=None):
        """Launch Chromium and create the working page stored in ``self.page``.

        Args:
            headless: False opens a visible browser window; True runs without one.
            userDataDir: Optional Chromium profile directory. The original
                author notes that one directory must not be shared by two
                Chrome processes, otherwise an encoding error is reported.
        """
        args = ["--start-maximized", '--no-sandbox', "--disable-infobars", "--log-level=3"]
        parameters = {
            'headless': headless,
            'args': args,
            # Per the original author: avoids Chromium hanging when many pages are open.
            'dumpio': True,
        }
        if userDataDir is not None:
            parameters["userDataDir"] = userDataDir

        self.browser = await launch(parameters)
        self.page = await self.browser.newPage()
        width, height = self.screen_size()
        # Make the viewport as large as the physical screen.
        await self.page.setViewport({"width": width, "height": height})
        # With JavaScript disabled the page would not render dynamic content.
        await self.page.setJavaScriptEnabled(enabled=True)
        # NOTE(review): assumes UserAgentUtils.getheaders() returns a
        # user-agent string -- confirm against that helper.
        await self.page.setUserAgent(self.ua.getheaders())
        await self.preventCheckWebdriver(self.page)
        print("构造浏览器对象完毕....", self.page)

    async def getPage(self):
        """Return the page currently being operated on."""
        return self.page

    async def getCurUrl(self, page):
        """Return the URL of ``page`` (or of ``self.page`` when None)."""
        page = self._page_or_default(page)
        # Page.url is a plain property, not an awaitable.
        return page.url

    async def getnewpage(self):
        """Open and return a new page in the current browser."""
        return await self.browser.newPage()

    async def reload(self):
        """Reload the current page."""
        await self.page.reload()

    async def goBack(self):
        """Navigate the current page one step back in history."""
        await self.page.goBack()

    async def getPageUrl(self):
        """Return the URL of the current page."""
        return self.page.url

    async def open(self, url, timeout=60):
        """Navigate the current page to ``url``.

        Args:
            url: Address to open; must not be None.
            timeout: Navigation timeout in seconds.

        Returns:
            ``(status, current_url)`` on success, ``(404, None)`` when ``url``
            is None or the navigation fails for any reason.
        """
        if url is None:
            print("当前传入的【url】不能为空,参数错误!!")
            return 404, None
        try:
            self.url = url
            print("打开网页:" + (url))
            self.res = await self.page.goto(url, options={'timeout': int(timeout * 1000)})
            await asyncio.sleep(1)  # short settle time after navigation
            # Response.status and Page.url are plain properties; awaiting them
            # raised TypeError, so this method used to always report 404.
            status = self.res.status
            curUrl = self.page.url
            await self.preventCheckWebdriver(self.page)
            return status, curUrl
        except Exception:
            # Best-effort contract kept from the original, but no longer silent.
            traceback.print_exc()
            return 404, None

    async def preventCheckWebdriver(self, page):
        """Override navigator properties that sites probe to detect automation."""
        page = self._page_or_default(page)
        await page.evaluate('''() =>{
            Object.defineProperties(navigator,{
                webdriver:{ get: () => undefined }
            })
        }''')
        # The original author added the following overrides for checks
        # performed by Taobao's browser-detection script.
        await page.evaluate('''() =>{
            window.navigator.chrome = { runtime: {}, };
        }''')
        await page.evaluate('''() =>{
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US', 'en']
            });
        }''')
        await page.evaluate('''() =>{
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5,6],
            });
        }''')

    async def closeBrowser(self, browser):
        """Close ``browser`` (or ``self.browser`` when None); errors are logged, not raised."""
        if browser is None:
            browser = self.browser
        try:
            await browser.close()
        except Exception:
            traceback.print_exc()

    async def closePage(self, page):
        """Close ``page`` (or ``self.page`` when None)."""
        page = self._page_or_default(page)
        await page.close()

    async def closeNumPage(self, number:"号码从0开始"):
        """Close the page at zero-based index ``number`` and return True."""
        pages = await self.browser.pages()
        await pages[number].close()
        return True

    async def retainLastPage(self):
        """Close every page except the last one and make it the current page."""
        pages = await self.browser.pages()
        if not pages:
            return
        *stale, last = pages
        for page in stale:
            await page.close()
        self.page = last

    async def gerReponseStatus(self):
        """Return the HTTP status of the last response, or 200 if there is none."""
        if self.res is None:
            return 200
        return self.res.status

    async def screenshot(self, page):
        """Save a screenshot to ``./screenshots/<md5 of self.url>.png``."""
        hashCode = self.hash.getMd5Hash(self.url)
        page = self._page_or_default(page)
        await page.screenshot({'path': './screenshots/' + str(hashCode) + '.png'})

    async def getHeader(self):
        """Return the headers of the last response."""
        return self.res.headers

    async def scrollToButtom(self, page):
        """Scroll the page down by the full height of the document body."""
        page = self._page_or_default(page)
        await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')

    async def getCookies(self, page):
        """Return the cookie list of the page."""
        page = self._page_or_default(page)
        return await page.cookies()

    async def getCookieStr(self, page=None):
        """Return the page cookies as one ``name=value;name=value;`` string."""
        page = self._page_or_default(page)
        cookies_list = await page.cookies()
        cookies = ''.join(
            '{0}={1};'.format(cookie.get('name'), cookie.get('value'))
            for cookie in cookies_list)
        try:
            print(cookies)
        except Exception:
            # Printing is only diagnostic (e.g. console encoding problems);
            # it must never prevent the cookies from being returned.
            pass
        return cookies

    async def setCookies(self, page, cookies):
        """Install the given iterable of cookie dicts on the page."""
        page = self._page_or_default(page)
        return await page.setCookie(*cookies)

    async def getHtml(self, page):
        """Return the HTML content of the page."""
        page = self._page_or_default(page)
        return await page.content()

    async def getCurPageTitle(self, page):
        """Return the title of the page."""
        page = self._page_or_default(page)
        return await page.title()

    async def getElementFieldValue(self, page, element, field):
        """Return the value of property ``field`` of a single element handle.

        Returns None (after printing a message) when ``element`` or ``field``
        is None or when ``element`` is a list.
        """
        if element is None:
            print("当前传入的【element】不能为空,参数错误!!")
            return None
        if field is None:
            print("当前传入的【field】不能为空,参数错误!!")
            return None
        if isinstance(element, list):
            print("当前传入的【element】不是单个对象,为list集合,参数错误!!")
            return None
        handle = await element.getProperty(field)
        # jsonValue() is a coroutine; without awaiting it the caller received
        # a coroutine object instead of the value.
        return await handle.jsonValue()

    async def getPageWidthHight(self, page):
        """Return a dict with the client ``width``, ``height`` and ``deviceScaleFactor``."""
        page = self._page_or_default(page)
        return await page.evaluate('''() => {
            return {
                width: document.documentElement.clientWidth,
                height: document.documentElement.clientHeight,
                deviceScaleFactor: window.devicePixelRatio,
            }
        }''')

    async def getCurBrowserAllPages(self):
        """Return all pages of the current browser."""
        return await self.browser.pages()

    async def getElementsByXpaths(self, page, xpath:'如://div[@class="title-box"]/a'):
        """Return the list of elements matching ``xpath``.

        Returns None when ``xpath`` is None and an empty list when the lookup
        raises.
        """
        if xpath is None:
            print("当前传入的【xpath】不能为空,参数错误!!")
            return None
        page = self._page_or_default(page)
        try:
            return await page.xpath(xpath)
        except Exception:
            print("获取xpath路径为【" + str(xpath) + "】的标签对象异常...")
            # Previously the result variable was unbound here and the method
            # crashed with UnboundLocalError.
            return []

    async def getPageText(self, page):
        """Return ``document.body.textContent`` of the page."""
        page = self._page_or_default(page)
        # evaluate() accepts a JavaScript function or expression and guesses
        # which one it was given; force_expr=True makes pyppeteer treat the
        # string as an expression when that guess would be wrong.
        return await page.evaluate('document.body.textContent', force_expr=True)

    async def getElementText(self, page, element):
        """Return the ``textContent`` of a single element handle."""
        if element is None:
            print("当前传入的【element】不能为空,参数错误!!")
            return None
        page = self._page_or_default(page)
        if isinstance(element, list):
            print("当前传入的【element】不是单个对象,为list集合,参数错误!!")
            return None
        return await page.evaluate('(element) => element.textContent', element)

    async def getElementBySelector(self, page , selector):
        """Return the first element matching the CSS ``selector``."""
        if selector is None:
            print("当前传入的【selector】不能为空,参数错误!!")
            return None
        page = self._page_or_default(page)
        return await page.querySelector(selector)

    async def inputKw(self, page, selector:"如:'input#kw.s_ipt':获取input标签中id='kw',class='s_ipt'的对象。不可用xpath路径", kw:'待输入的关键词'):
        """Type ``kw`` into the element matched by the CSS ``selector``.

        Does nothing when ``kw`` or ``selector`` is None. XPath expressions
        are not accepted here.
        """
        if kw is None or selector is None:
            return None
        page = self._page_or_default(page)
        try:
            # NOTE(review): this echoes the typed text, including passwords,
            # to stdout -- consider dropping ``kw`` from the output.
            print(selector, kw)
        except Exception:
            pass  # diagnostic output only
        await page.type(selector, kw)
        return None

    async def clickElement(self, page, selector:"如:'input#kw.s_ipt':获取input标签中id='kw',class='s_ipt'的对象。。不可用xpath路径"):
        """Click the first element matching the CSS ``selector``."""
        if selector is None:
            print("当前传入的【selector】不能为空,参数错误!!")
            return
        page = self._page_or_default(page)
        # When the selector matches several elements the first one is clicked.
        await page.click(selector)

    async def removeInputValue(self, page, idValue):
        """Clear the value of the input whose id is ``idValue``."""
        if idValue is None:
            print("当前传入的【idValue】不能为空,参数错误!!")
            return
        page = self._page_or_default(page)
        await page.evaluate("document.querySelector('#" + str(idValue) + "').value=''")
        print("清空【" + str(idValue) + "】的内容")

    async def clickByEle(self, ele):
        """Click the given element handle; does nothing when it is None."""
        if ele is None:
            return
        return await ele.click()

    async def getLastPage(self):
        """Return the last page of the browser."""
        pages = await self.browser.pages()
        return pages[-1]

    async def getPageTotal(self):
        """Return the number of open pages."""
        pages = await self.browser.pages()
        return len(pages)

    async def getFirstPage(self):
        """Return the first page of the browser."""
        pages = await self.browser.pages()
        return pages[0]

    async def getAllFrames(self, page):
        """Return all frames attached to the page."""
        page = self._page_or_default(page)
        # Page.frames is a plain property, not an awaitable.
        return page.frames

    async def getScreenshotByEle(self, page, ele, screenshotFilePath:"目前测试只有.png图片可正常生成,jpg异常;"):
        """Screenshot the region of ``ele`` and return it base64-encoded.

        The image is also written to ``screenshotFilePath``; the original
        author reports that only ``.png`` output worked in testing.

        Returns:
            The base64-encoded image bytes, or ``''`` when every attempt failed.
        """
        picture = ''
        page = self._page_or_default(page)
        try:
            fu = File_Utils(None)
            fu = File_Utils(fu.getParentDir(screenshotFilePath))
            # Create the target directory when it does not exist yet.
            if not fu.exists(fu.getParentDir(screenshotFilePath)):
                fu.makeDirs()
            # Non-blocking wait; time.sleep() here would stall the event loop.
            await asyncio.sleep(3)
        except Exception:
            traceback.print_exc()
            return picture

        for _ in range(6):
            # The try sits inside the loop so that a failed attempt is
            # actually retried instead of aborting all remaining attempts.
            try:
                clip = await ele.boundingBox()
                picture = base64.b64encode(await page.screenshot({
                    'path': screenshotFilePath,  # where the image is saved
                    'clip': clip,  # position and size of the captured region
                }))
            except Exception:
                traceback.print_exc()
                continue
            if picture:
                break
        return picture

注意事项:

测试过程中发现,基于PC端的登录界面,在Pyppeteer浏览器中,登录按钮无法使用,但是手机端登录界面可以正常登录。

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页