Contents
Basic use of requests
Using urllib
Crawling images
Fetching dynamic data
Handling sessions and cookies
Parsing with XPath
Parsing with regular expressions
Using BeautifulSoup
Selenium automated crawling
Other automation operations
Running headless
Automated handling of iframe tags
12306 user login based on Selenium
Using proxies
Captcha recognition
Using coroutines
Synchronous crawler
Multithreaded and asynchronous crawlers
Thread pool
Asynchronous coroutines
Async task coroutines with aiohttp
Distributed crawler
Simple practice projects
Cracking the KFC store query
Crawling résumé templates
Crawling with Baidu AI
I wrote this full set of non-framework Python crawler notes a while ago and never got around to organizing them; today I finally had time to tidy them up so they are easier to look up later.
Basic use of requests
Example 1
# -*- coding: utf-8 -*-
import requests

if __name__ == "__main__":
    # Step 1: specify the URL
    url = 'https://www.sogou.com/'
    # Step 2: send the request
    response = requests.get(url=url)
    # Step 3: get the response data; .text returns the response body as a string
    page_text = response.text
    print(page_text)
    # Step 4: persist the data
    with open('./sogou.html', 'w', encoding='utf-8') as fp:
        fp.write(page_text)
    print('爬取数据结束!!!')
Example 2
# -*- coding: utf-8 -*-
import requests
import json

if __name__ == "__main__":
    url = 'https://movie.douban.com/j/search_subjects'
    param = {
        'type': 'movie',
        'tag': "喜剧",
        'sort': 'recommend',
        'page_limit': 20,  # number of items returned per request
        'page_start': 20,  # index in the library to start fetching from
    }
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.75'
    }
    response = requests.get(url=url, params=param, headers=headers)

    list_data = response.json()

    fp = open('./douban.json', 'w', encoding='utf-8')
    json.dump(list_data, fp=fp, ensure_ascii=False)

    print('over!!!')
Example 3
# -*- coding: utf-8 -*-
import requests
import json

if __name__ == "__main__":
    # 1. Specify the URL
    post_url = 'https://fanyi.baidu.com/sug'
    # 2. UA spoofing
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.75'
    }
    # 3. Build the request parameters (handled the same way as for a GET request)
    word = input('enter a word:')
    data = {
        'kw': word
    }
    # 4. Send the request
    response = requests.post(url=post_url, data=data, headers=headers)
    # 5. Get the response data: json() returns a Python object
    #    (only call json() if the response is confirmed to be JSON)
    dic_obj = response.json()

    # Persist the result
    fileName = word + '.json'
    fp = open(fileName, 'w', encoding='utf-8')
    json.dump(dic_obj, fp=fp, ensure_ascii=False)

    print('over!!!')
Example 4
# -*- coding: utf-8 -*-
# Every crawl needs UA spoofing: disguise the request as a real browser
# User-Agent: identity of the request carrier
import requests

if __name__ == "__main__":
    # UA spoofing: wrap the corresponding User-Agent in a dict
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.75'
    }
    url = 'https://www.sogou.com/web'
    # Put the query parameters carried by the URL into a dict
    kw = input('enter a word:')
    param = {
        'query': kw
    }
    # Send the request to the URL; the parameters are attached and processed during the request
    response = requests.get(url=url, params=param, headers=headers)

    page_text = response.text
    fileName = kw + '.html'
    with open(fileName, 'w', encoding='utf-8') as fp:
        fp.write(page_text)
    print(fileName, '保存成功!!!')
Using urllib
import requests
import re
import os
import urllib.request  # urllib.request must be imported explicitly for urlretrieve

dirName = "imgLab"
if not os.path.exists(dirName):
    os.mkdir(dirName)
url = "https://www.baidu.com/s?wd=%E7%8B%97&tn=98012088_5_dg&ch=11"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.37',
}
response = requests.get(url=url, headers=headers)
page_text = response.text
ex = '<div class="op-img-address-divide-high">.*?<img src="(.*?)" class=.*?</div>'
img_src_list = re.findall(ex, page_text, re.S)
for src in img_src_list:
    imgPath = dirName + "/" + src.split('/')[-1]
    src = src + '&fm=26'
    urllib.request.urlretrieve(src, imgPath)
    print(imgPath, '下载成功!')
Crawling images
Example 1
from lxml import etree
import requests
import os
import urllib.request

fileName = "图片"
if not os.path.exists(fileName):
    os.mkdir(fileName)

url = "https://pic.netbian.com/"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36'
}
response = requests.get(url=url, headers=headers)
page_text = response.text
tree = etree.HTML(page_text)
li_list = tree.xpath('//*[@id="main"]/div[3]/ul/li')
arr = []
for li in li_list:
    href = 'https://pic.netbian.com' + li.xpath('./a/span/img/@src')[0]
    arr.append(href)

for ar in arr:
    filePath = fileName + '/' + ar.split('/')[-1]
    urllib.request.urlretrieve(ar, filePath)

print("爬取结束!!!")
Example 2
# -*- coding: utf-8 -*-
import requests

if __name__ == "__main__":
    # How to crawl image data
    url = 'https://th.bing.com/th/id/R6706ad2e7a68edabddbc1b5644707c4f?rik=u8uR%2bWe5bxIosA&riu=http%3a%2f%2fpic.lvmama.com%2fuploads%2fpc%2fplace2%2f2016-09-14%2f9aab9bb7-2593-4ca6-8c5a-31355443aebc.jpg&ehk=HpOwqU6w6%2fssF4CJQMbTOshMh4lIXJONXU%2btYNsAKSI%3d&risl=1&pid=ImgRaw'
    # content returns the image data in binary form
    # text -> string, content -> bytes, json() -> object
    img_data = requests.get(url=url).content

    with open('./qiutu.jpg', 'wb') as fp:
        fp.write(img_data)
Fetching dynamic data
# -*- coding: utf-8 -*-
import requests
import json

if __name__ == "__main__":
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.75'
    }
    # Fetch the id values of different companies in batches
    url = ''
    # Build the request parameters
    id_list = []        # stores the company ids
    all_data_list = []  # stores the detail data of every company
    for page in range(1, 6):
        page = str(page)
        data = {
        }
        json_ids = requests.post(url=url, headers=headers, data=data).json()
        for dic in json_ids['list']:
            id_list.append(dic['ID'])

    # Fetch the detail data of every company
    post_url = ''
    for id in id_list:
        data = {
            'id': id
        }
        detail_json = requests.post(url=post_url, headers=headers, data=data).json()
        all_data_list.append(detail_json)
    # Persist all_data_list
    fp = open('./allData.json', 'w', encoding='utf-8')
    json.dump(all_data_list, fp=fp, ensure_ascii=False)
    print('over!!!')
Handling sessions and cookies
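The original notes left this section without an example; below is a minimal sketch, assuming a site whose login form posts to /login (the URL and form fields are made-up placeholders), of how a requests.Session keeps cookies across requests so a later request is already logged in.
import requests

session = requests.Session()
headers = {'User-Agent': 'Mozilla/5.0'}
# hypothetical login endpoint and form fields
login_data = {'username': 'user', 'password': 'pass'}
session.post('https://example.com/login', data=login_data, headers=headers)
# this request automatically carries the cookies set by the login response
page_text = session.get('https://example.com/profile', headers=headers).text
print(page_text)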
Parsing with XPath
Example 1
# -*- coding: utf-8 -*-
from lxml import etree

if __name__ == '__main__':
    # Instantiate an etree object and load the source code to be parsed into it
    tree = etree.parse('r.html')
    # r = tree.xpath('/html/body/div')
    # r = tree.xpath('/html//div')
    # r = tree.xpath('//div')
    # r = tree.xpath('//div[@class="song"]')
    # r = tree.xpath('//div[@class="tang"]//li[5]/a/text()')[0]
    # r = tree.xpath('//li[7]//text()')
    # r = tree.xpath('//div[@class="tang"]//text()')
    r = tree.xpath('//div[@class="song"]/img/@src')

    print(r)
Example 2
# -*- coding: utf-8 -*-
# Goal: crawl the listing titles from the 58.com second-hand housing page
import requests
from lxml import etree

if __name__ == "__main__":
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/53'
    }
    # Fetch the page source
    url = 'https://www.58.com/ershoufang/'
    page_text = requests.get(url=url, headers=headers).text

    # Parse the data
    tree = etree.HTML(page_text)
    # the XPath returns tag objects
    td_list = tree.xpath('//td[@class="t"]')
    fp = open('58.txt', 'w', encoding='utf-8')
    for td in td_list:
        title = td.xpath('./a/text()')[0]
        print(title)
        fp.write(title + '\n')
    fp.close()
Example 3
# -*- coding: utf-8 -*-
# Goal: parse and download image data
import requests
from lxml import etree
import os

if __name__ == "__main__":
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/53'
    }
    # Fetch the page source
    url = 'https://pic.netbian.com/4kyouxi/'
    response = requests.get(url=url, headers=headers)
    # Manually set the response encoding if needed
    # response.encoding = 'utf-8'
    page_text = response.text

    # Parse the data
    tree = etree.HTML(page_text)
    li_list = tree.xpath('//div[@class="slist"]/ul/li')

    # Create a folder
    if not os.path.exists('./picLibs'):
        os.mkdir('./picLibs')
    for li in li_list:
        img_src = 'http://pic.netbian.com' + li.xpath('./a/img/@src')[0]
        img_name = li.xpath('./a/img/@alt')[0] + '.jpg'
        # General fix for garbled Chinese file names
        img_name = img_name.encode('iso-8859-1').decode('gbk')

        # print(img_name, img_src)
        # Request the image and persist it
        img_data = requests.get(url=img_src, headers=headers).content
        img_path = 'picLibs/' + img_name
        with open(img_path, 'wb') as fp:
            fp.write(img_data)
        print(img_name, '下载成功!!!')
Example 4
# -*- coding: utf-8 -*-
# Goal: parse out the names of all cities
import requests
from lxml import etree

if __name__ == "__main__":
    '''headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/53'
    }
    # Fetch the page source
    url = 'https://www.aqistudy.cn/historydata/'
    response = requests.get(url=url, headers=headers)
    # Manually set the response encoding if needed
    # response.encoding = 'utf-8'
    page_text = response.text

    # Parse the data
    tree = etree.HTML(page_text)
    host_li_list = tree.xpath('//div[@class="bottom"]/ul/li')
    all_city_names = []
    # Parse the names of the hot cities
    for li in host_li_list:
        hot_city_name = li.xpath('./a/text()')[0]
        all_city_names.append(hot_city_name)

    # Parse the names of all the other cities
    city_names_list = tree.xpath('div[@class="bottom"]/ul/div[2]/li')
    for li in city_names_list:
        city_name = li.xpath('./a/text()')[0]
        all_city_names.append(city_name)

    print(all_city_names, len(all_city_names))'''
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/53'
    }
    # Fetch the page source
    url = 'https://www.aqistudy.cn/historydata/'
    response = requests.get(url=url, headers=headers)
    # Manually set the response encoding if needed
    # response.encoding = 'utf-8'
    page_text = response.text

    # Parse the data: one XPath expression matches both the hot cities and the full city list
    tree = etree.HTML(page_text)
    a_list = tree.xpath('//div[@class="bottom"]/ul/li/a | //div[@class="bottom"]/div[2]/li/a')
    all_city_names = []
    for a in a_list:
        city_name = a.xpath('./text()')[0]
        all_city_names.append(city_name)
    print(all_city_names, len(all_city_names))
Parsing with regular expressions
Example 1
# -*- coding: utf-8 -*-
import requests
import re
import os

# Goal: crawl all images from the image board of Qiushibaike
if __name__ == '__main__':
    # Create a folder to hold all the images
    if not os.path.exists('./qiutuLibs'):
        os.mkdir('./qiutuLibs')

    url = 'https://www.qiushibaike.com/imgrank/'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.75'
    }
    # Use a general crawl to fetch the whole page behind the url
    page_text = requests.get(url=url, headers=headers).text

    # Use a focused crawl to extract every image on the page
    ex = '<div class="thumb">.*?<img src="(.*?)" alt.*?</div>'
    img_src_list = re.findall(ex, page_text, re.S)
    # print(img_src_list)
    for src in img_src_list:
        # Build a complete image URL
        src = 'https:' + src
        # Request the binary image data
        img_data = requests.get(url=src, headers=headers).content
        # Build the image file name
        img_name = src.split('/')[-1]
        # Final storage path of the image
        imgPath = './qiutuLibs/' + img_name
        with open(imgPath, 'wb') as fp:
            fp.write(img_data)
        print(img_name, '下载成功!!!')
Example 2
# -*- coding: utf-8 -*-
import requests
import re
import os

# Goal: crawl all images from the image board of Qiushibaike, page by page
if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.75'
    }
    # Create a folder to hold all the images
    if not os.path.exists('./qiutuLibs'):
        os.mkdir('./qiutuLibs')
    # Set up a generic URL template
    url = 'https://www.qiushibaike.com/imgrank/page/%d/'

    for pageNum in range(1, 3):
        # URL of the current page
        new_url = format(url % pageNum)

        page_text = requests.get(url=new_url, headers=headers).text

        # Use a focused crawl to extract every image on the page
        ex = '<div class="thumb">.*?<img src="(.*?)" alt.*?</div>'
        img_src_list = re.findall(ex, page_text, re.S)
        # print(img_src_list)
        for src in img_src_list:
            # Build a complete image URL
            src = 'https:' + src
            # Request the binary image data
            img_data = requests.get(url=src, headers=headers).content
            # Build the image file name
            img_name = src.split('/')[-1]
            # Final storage path of the image
            imgPath = './qiutuLibs/' + img_name
            with open(imgPath, 'wb') as fp:
                fp.write(img_data)
            print(img_name, '下载成功!!!')
Using BeautifulSoup
Example 1
# -*- coding: utf-8 -*-
import requests
from bs4 import BeautifulSoup

if __name__ == '__main__':
    # Crawl the data of the index page
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.76'
    }
    url = 'https://www.shicimingju.com/book/sanguoyanyi.html'
    page_text = requests.get(url=url, headers=headers)
    page_text.encoding = 'utf-8'
    page_text = page_text.text

    # Parse the chapter titles and detail-page URLs from the index page
    # 1. Instantiate a BeautifulSoup object and load the page source into it
    soup = BeautifulSoup(page_text, 'lxml')
    # Parse the chapter titles and detail URLs
    li_list = soup.select('.book-mulu > ul > li')
    fp = open('./sanguo.txt', 'w', encoding='utf-8')
    for li in li_list:
        title = li.a.string
        detail_url = 'https://www.shicimingju.com' + li.a['href']
        # Request the detail page and parse the chapter content
        detail_page_text = requests.get(url=detail_url, headers=headers)
        detail_page_text.encoding = 'utf-8'
        detail_page_text = detail_page_text.text
        # Parse the chapter content from the detail page
        detail_soup = BeautifulSoup(detail_page_text, 'lxml')
        div_tag = detail_soup.find('div', class_='chapter_content')
        # The chapter content
        content = div_tag.text
        fp.write(title + ':' + content + '\n')
        print(title, '爬取成功!!!')
Example 2
from bs4 import BeautifulSoup
import requests
import os

fileName = 'novel'
if not os.path.exists(fileName):
    os.mkdir(fileName)
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.37',
    'Connection': 'close'
}
url = "https://www.shicimingju.com/book/sanguoyanyi.html"
response = requests.get(url=url, headers=headers)
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'lxml')
title = soup.select('.book-mulu > ul > li > a')
cnt = 0
for t in title:
    href = 'https://www.shicimingju.com' + t['href']
    response = requests.get(url=href, headers=headers)
    response.encoding = 'utf-8'
    page_text = response.text
    soup = BeautifulSoup(page_text, 'lxml')
    div = soup.find('div', class_='card bookmark-list')

    filePath = fileName + '/' + t.string + '.txt'
    pageTxt = div.text

    with open(filePath, 'w', encoding='utf-8') as fp:
        fp.write(pageTxt)

    print('爬取成功!!!')
    cnt += 1

    if cnt == 10:
        break
Selenium automated crawling
Dealing with the iframe problem
Example 1
# -*- coding: utf-8 -*-
# Goal: simulate a login
from selenium import webdriver
from time import sleep

bro = webdriver.Chrome(executable_path='./chromedriver')

bro.get('https://qzone.qq.com/')

# The login form lives inside an iframe, so switch into it before locating elements
bro.switch_to.frame('login_frame')

a_tag = bro.find_element_by_id("switcher_plogin")
a_tag.click()

userName_tag = bro.find_element_by_id('u')
password_tag = bro.find_element_by_id('p')
sleep(1)
userName_tag.send_keys('1292161328')
sleep(1)
password_tag.send_keys('1234567890')
sleep(1)
btn = bro.find_element_by_id('login_button')
btn.click()

sleep(3)

bro.quit()
Example 2
# -*- coding: utf-8 -*-
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from lxml import etree
from time import sleep

# Kiosk (full-screen) mode has to be set through an Options object, not on the driver
chrome_options = Options()
chrome_options.add_argument('--kiosk')

# Instantiate a browser object (pass in the driver program)
bro = webdriver.Chrome(executable_path='./chromedriver', options=chrome_options)
# Make the browser send a request to the given url
bro.get('http://scxk.nmpa.gov.cn:81/xk/')

# Get the page source of the browser's current page
page_text = bro.page_source

# Parse the company names
tree = etree.HTML(page_text)
li_list = tree.xpath('//ul[@id="gzlist"]/li')
for li in li_list:
    name = li.xpath('./dl/@title')[0]
    print(name)

sleep(5)
bro.quit()
Example 3
Other automation operations
from selenium import webdriver
from time import sleep

bro = webdriver.Chrome(executable_path='./chromedriver')

bro.get('http://www.taobao.com/')

# Locate a tag
search_input = bro.find_element_by_id('q')
# Interact with the tag
search_input.send_keys('Iphone')

# Run a piece of JavaScript
bro.execute_script('window.scrollTo(0,document.body.scrollHeight)')
sleep(2)

# Click the search button
btn = bro.find_element_by_css_selector('.btn-search')
btn.click()

bro.get('https://www.baidu.com')
sleep(2)
# Go back
bro.back()
sleep(2)
# Go forward
bro.forward()

sleep(5)
bro.quit()
Example 4
Running headless
# -*- coding: utf-8 -*-
from selenium import webdriver
from time import sleep
# Headless (no visible window) mode
from selenium.webdriver.chrome.options import Options
# Evade detection
from selenium.webdriver import ChromeOptions

# Options for headless operation
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')

# Options for evading detection
option = ChromeOptions()
option.add_experimental_option('excludeSwitches', ['enable-automation'])

# How selenium reduces the risk of being detected
bro = webdriver.Chrome(executable_path='./chromedriver', chrome_options=chrome_options, options=option)

# Headless browser (phantomJS was an earlier alternative)
bro.get('https://www.baidu.com')

print(bro.page_source)
sleep(2)
bro.quit()
Example 5
from selenium import webdriver
from time import sleep

# The argument is the path of your browser driver; an r'' prefix prevents character escaping
driver = webdriver.Chrome(r'./chromedriver')
# Open the Baidu home page with get
driver.get("http://www.baidu.com")
# Find the "设置" (settings) link on the page and click it
# driver.find_elements_by_link_text('设置')[0].click()
# sleep(2)
# # Open "搜索设置" (search settings) and set 50 results per page
# driver.find_elements_by_link_text('搜索设置')[0].click()
# sleep(2)
# # Select 50 results per page
# m = driver.find_element_by_id('nr')
# sleep(2)
# m.find_element_by_xpath('//*[@id="nr"]/option[3]').click()
# m.find_element_by_xpath('.//option[3]').click()
# sleep(2)
# # Click "save settings"
# driver.find_elements_by_class_name("prefpanelgo")[0].click()
# sleep(2)
# # Handle the popup alert: accept() to confirm, dismiss() to cancel
# driver.switch_to_alert().accept()
# sleep(2)
# Find the Baidu search box and type the keyword
driver.find_element_by_id('kw').send_keys('美女')
sleep(2)
# Click the search button
driver.find_element_by_id('su').click()
sleep(2)
# In the result page, find the link titled '美女_海量精选高清图片_百度图片' and open it
driver.find_elements_by_link_text('美女_海量精选高清图片_百度图片')[0].click()
sleep(3)
# Close the browser
driver.quit()
Example 6
Automated handling of iframe tags
# -*- coding: utf-8 -*-
from selenium import webdriver
from time import sleep
# Import the class corresponding to action chains
from selenium.webdriver import ActionChains

bro = webdriver.Chrome(executable_path='./chromedriver')

bro.get('https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable')

# If the target tag lives inside an iframe, you must switch into the iframe before locating it
bro.switch_to.frame('iframeResult')  # switch the scope in which the browser locates tags
div = bro.find_element_by_id('draggable')

# Action chain
action = ActionChains(bro)
# Click and hold the tag
action.click_and_hold(div)

for i in range(5):
    # perform() executes the action chain immediately
    # move_by_offset(x, y): x is horizontal, y is vertical
    action.move_by_offset(17, 0).perform()
    sleep(0.3)

# Release the action chain
action.release()

print(div)
Example 7
12306 user login based on Selenium
# -*- coding: utf-8 -*-
import requests
from hashlib import md5

class Chaojiying_Client(object):

    def __init__(self, username, password, soft_id):
        self.username = username
        password = password.encode('utf8')
        self.password = md5(password).hexdigest()
        self.soft_id = soft_id
        self.base_params = {
            'user': self.username,
            'pass2': self.password,
            'softid': self.soft_id,
        }
        self.headers = {
            'Connection': 'Keep-Alive',
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)',
        }

    def PostPic(self, im, codetype):
        """
        im: image bytes
        codetype: captcha type, see http://www.chaojiying.com/price.html
        """
        params = {
            'codetype': codetype,
        }
        params.update(self.base_params)
        files = {'userfile': ('ccc.jpg', im)}
        r = requests.post('http://upload.chaojiying.net/Upload/Processing.php', data=params, files=files,
                          headers=self.headers)
        return r.json()

    def ReportError(self, im_id):
        """
        im_id: id of the image whose result was reported wrong
        """
        params = {
            'id': im_id,
        }
        params.update(self.base_params)
        r = requests.post('http://upload.chaojiying.net/Upload/ReportError.php', data=params, headers=self.headers)
        return r.json()

# Use selenium to open the login page
from selenium import webdriver
import time
from PIL import Image
from selenium.webdriver import ActionChains

bro = webdriver.Chrome(executable_path='./chromedriver')
bro.maximize_window()  # maximize the window
bro.get('https://kyfw.12306.cn/otn/resources/login.html')
time.sleep(1)

# Click "账号登录" (account login)
bro.find_elements_by_link_text('账号登录')[0].click()
time.sleep(1)

# save_screenshot takes a screenshot of the current page and saves it
bro.save_screenshot('aa.png')

# Determine the top-left and bottom-right coordinates of the captcha image (the crop area)
code_img_ele = bro.find_element_by_css_selector('#J-loginImg')
location = code_img_ele.location  # top-left coordinates of the captcha image
print('location:', location)
size = code_img_ele.size  # width and height of the captcha tag
print('size:', size)
# Top-left and bottom-right coordinates
rangle = (
    int(location['x']), int(location['y']),
    int(location['x'] + size['width']), int(location['y'] + size['height']))
# The captcha image area is now determined

i = Image.open('./aa.png')
code_img_name = './code.png'
# crop cuts the image down to the given area
frame = i.crop(rangle)
frame.save(code_img_name)

# Hand the captcha image to Chaojiying for recognition
chaojiying = Chaojiying_Client('1292161328', 'wuxiangnong', '915445')  # User Center >> Software ID: generate one and replace 96001
im = open('code.png', 'rb').read()  # path of the local image file (on Windows // is sometimes needed)
print(chaojiying.PostPic(im, 9004)['pic_str'])
result = chaojiying.PostPic(im, 9004)['pic_str']
all_list = []  # stores the coordinates of the points to click: [x1, y1], [x2, y2], ...
if '|' in result:
    list_1 = result.split('|')
    count_1 = len(list_1)
    for i in range(count_1):
        xy_list = []
        x = int(list_1[i].split(',')[0])
        y = int(list_1[i].split(',')[1])
        xy_list.append(x)
        xy_list.append(y)
        all_list.append(xy_list)
else:
    x = int(result.split(',')[0])
    y = int(result.split(',')[1])
    xy_list = []
    xy_list.append(x)
    xy_list.append(y)
    all_list.append(xy_list)
print(all_list)
# Walk through the list and use an action chain to click each (x, y) position on the captcha image
for l in all_list:
    x = l[0]
    y = l[1]
    ActionChains(bro).move_to_element_with_offset(code_img_ele, x, y).click().perform()
    time.sleep(0.5)

bro.find_element_by_id('J-userName').send_keys('19828430139')
time.sleep(2)
bro.find_element_by_id('J-password').send_keys('wuxiangnong9595')
time.sleep(2)
bro.find_element_by_id('J-login').click()
time.sleep(3)
bro.quit()
Using proxies
Example 1
# -*- coding: utf-8 -*-
import requests

url = 'https://www.baidu.com/s?wd=ip'
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.76'
}

# .text is needed here so a string (not the Response object) gets written to the file
page_text = requests.get(url=url, headers=headers, proxies={"https": "222.110.147.50:3128"}).text

with open('ip.html', 'w', encoding='utf-8') as fp:
    fp.write(page_text)
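For reference, a proxies mapping can carry both schemes at once, and a proxy that needs authentication is written with user:password in the URL. A minimal sketch; the addresses below are placeholders, not working proxies:
import requests

# placeholder proxy addresses; substitute real ones before use
proxies = {
    'http': 'http://user:password@111.111.111.111:8888',
    'https': 'http://111.111.111.111:8888',
}
response = requests.get('https://www.baidu.com/s?wd=ip', proxies=proxies, timeout=5)
print(response.status_code)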
Captcha recognition
# -*- coding: utf-8 -*-
import requests
from lxml import etree

'''A captcha-recognition class needs to be imported here (YDMHttp from the Yundama SDK is assumed below)'''

# Function that recognizes a captcha image that has been downloaded locally
def getCodeText(imgPath, codeType):
    # Ordinary-user username
    username = 'bobo328410948'

    # Ordinary-user password
    password = 'bobo328410948'

    # Software ID, required for developer revenue sharing; found under "我的软件" in the developer console
    appid = 6003

    # Software key, required for developer revenue sharing; found under "我的软件" in the developer console
    appkey = '1f4b564483ae5c907a1d34f8e2f2776c'

    # Image file: path of the captcha image to be recognized
    filename = imgPath

    # Captcha type, e.g. 1004 means 4 alphanumeric characters; types are priced differently,
    # see http://www.yundama.com/price.html
    codetype = codeType

    # Timeout in seconds
    timeout = 20
    result = None
    # Check
    if (username == 'username'):
        print('请设置好相关参数再测试')
    else:
        # Initialize
        yundama = YDMHttp(username, password, appid, appkey)

        # Log in to Yundama
        uid = yundama.login()
        print('uid: %s' % uid)

        # Query the balance
        balance = yundama.balance()
        print('balance: %s' % balance)

        # Start recognition: image path, captcha type ID, timeout (seconds); returns the result
        cid, result = yundama.decode(filename, codetype, timeout)
        print('cid: %s, result: %s' % (cid, result))
    return result

if __name__ == "__main__":
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.76'
    }
    url = 'https://so.gushiwen.org/user/login.aspx'
    page_text = requests.get(url=url, headers=headers).text
    # Parse the src attribute of the captcha <img> tag
    tree = etree.HTML(page_text)
    code_img_src = 'https://so.gushiwen.org' + tree.xpath('//*[@id="imgCode"]/@src')[0]
    img_data = requests.get(url=code_img_src, headers=headers).content
    # Save the captcha image locally
    with open('./code.jpg', 'wb') as fp:
        fp.write(img_data)

    # Now hand it to the captcha-recognition service
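    # Hedged continuation (not in the original notes): call the recognition function
    # on the saved image; code type 1004 (4 alphanumeric characters) is an assumption here
    code_text = getCodeText('./code.jpg', 1004)
    print(code_text)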
Using coroutines
Example 1
# -*- coding: utf-8 -*-
import asyncio

async def request(url):
    print('正在请求的url是', url)
    print('请求成功', url)
    return url

# A function decorated with async returns a coroutine object when called
c = request('www.baidu.com')

# Create an event loop object
# loop = asyncio.get_event_loop()

# # Register the coroutine object with the loop, then start the loop
# loop.run_until_complete(c)

# Using a task
loop = asyncio.get_event_loop()
# Create a task object based on the loop
task = loop.create_task(c)
print(task)

# Using a future
# loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(c)
# print(task)
# loop.run_until_complete(task)
# print(task)

def callback_func(task):
    # result() returns the return value of the function wrapped inside the task object
    print(task.result())

# Binding a callback
# (a coroutine object can only be wrapped into a task once, so create a fresh one here)
c = request('www.baidu.com')
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)
# Bind the callback function to the task object
task.add_done_callback(callback_func)
loop.run_until_complete(task)
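For reference, on Python 3.7 and later the same coroutine can also be driven without fetching an event loop by hand; a minimal sketch:
import asyncio

async def request(url):
    print('请求成功', url)
    return url

# asyncio.run creates the event loop, runs the coroutine to completion, and closes the loop
result = asyncio.run(request('www.baidu.com'))
print(result)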
Synchronous crawler
import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.76'
}
urls = {
    'http://xmdx.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10231.rar',
    'http://zjlt.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10229.rar',
    'http://xmdx.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10231.rar'
}

def get_content(url):
    print('正在爬取:', url)
    response = requests.get(url=url, headers=headers)
    if response.status_code == 200:
        return response.content

def parse_content(content):
    print('响应数据的长度为:', len(content))

for url in urls:
    content = get_content(url)
    parse_content(content)
Multithreaded and asynchronous crawlers
Example 1
# -*- coding: utf-8 -*-
import asyncio
import time

async def request(url):
    print('正在下载', url)
    # If synchronous, blocking code appears inside an async coroutine, it can no longer run asynchronously
    # time.sleep(2)
    # A blocking operation inside asyncio must be suspended manually with await
    await asyncio.sleep(2)
    print('下载完毕', url)

start = time.time()
urls = {
    'www.baidu.com',
    'www.sogou.com',
    'www.goubanjia.com'
}

# Task list: holds multiple task objects
stasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    stasks.append(task)

loop = asyncio.get_event_loop()
# The task list must be wrapped in asyncio.wait
loop.run_until_complete(asyncio.wait(stasks))

print(time.time() - start)
Example 2
# -*- coding: utf-8 -*-
import requests
import asyncio
import time

start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]

async def get_page(url):
    print('正在下载', url)
    # Requests made with requests.get are synchronous; an asynchronous network-request
    # module has to be used to request the url asynchronously
    # aiohttp: a module for asynchronous network requests
    response = requests.get(url=url)
    print(response.text)

tasks = []

for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('耗时', end - start)
Thread pool
Example 3
# -*- coding: utf-8 -*-
import time

# Execute serially in a single thread

def get_page(str):
    print("正在下载:", str)
    time.sleep(2)
    print('下载成功:', str)

name_list = ['xiaozi', 'aa', 'bb', 'cc']

start_time = time.time()

for i in range(len(name_list)):
    get_page(name_list[i])

end_time = time.time()
print('%d second' % (end_time - start_time))
Example 4
# -*- coding: utf-8 -*-
import time
# Import the Pool class of the thread-pool module
from multiprocessing.dummy import Pool

# Execute using a thread pool
start_time = time.time()

def get_page(str):
    print("正在下载:", str)
    time.sleep(2)
    print('下载成功:', str)

name_list = ['xiaozi', 'aa', 'bb', 'cc']

# Instantiate a thread pool
pool = Pool(4)
# Pass every element of the list to get_page for processing
pool.map(get_page, name_list)

end_time = time.time()
print(end_time - start_time)
Asynchronous coroutines
Example 5
Async task coroutines with aiohttp
# -*- coding: utf-8 -*-
import time
import asyncio
import aiohttp

start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]

async def get_page(url):
    async with aiohttp.ClientSession() as session:
        # get() / post() accept:
        # headers, params/data, proxy='http://ip:port'
        async with session.get(url) as response:
            # text() returns the response data as a string
            # read() returns the response data as bytes
            # json() returns a json object
            # Note: the data-fetching call must be suspended manually with await
            page_text = await response.text()
            print(page_text)

tasks = []

for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('耗时', end - start)
Distributed crawler
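The original notes left this section empty. A rough, framework-free sketch of the idea, assuming a reachable Redis server (the address and queue name below are placeholders): several machines share one Redis list as a URL queue; one process seeds it, and any number of worker processes pop URLs from it and crawl them.
import requests
import redis

# assumed Redis connection and queue name
r = redis.Redis(host='127.0.0.1', port=6379, db=0)
QUEUE = 'crawler:start_urls'

def seed(urls):
    # push the start URLs onto the shared queue
    for url in urls:
        r.lpush(QUEUE, url)

def worker():
    # every worker process/machine runs this loop against the same queue
    while True:
        item = r.brpop(QUEUE, timeout=5)
        if item is None:
            break  # queue drained
        url = item[1].decode('utf-8')
        page_text = requests.get(url, timeout=10).text
        print(url, len(page_text))

if __name__ == '__main__':
    seed(['https://www.sogou.com/', 'https://www.baidu.com/'])
    worker()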
Simple practice projects
Cracking the KFC store query
# -*- coding: utf-8 -*-
import requests

if __name__ == "__main__":
    url = 'http://www.kfc.com.cn/kfccda/ashx/GetStoreList.ashx?op=keyword'
    param = {
        'cname': '',
        'pid': '',
        'keyword': '北京',
        'pageIndex': '1',
        'pageSize': '10',
    }
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36 Edg/89.0.774.75'
    }
    response = requests.get(url=url, params=param, headers=headers)

    list_data = response.text

    fp = open('./KFC.text', 'w', encoding='utf-8')
    fp.write(list_data)
    fp.close()

    print('over!!!')
Crawling résumé templates
import requests
import os
from lxml import etree

if __name__ == "__main__":
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/53'
    }
    url = 'https://sc.chinaz.com/jianli/free.html'
    page_text = requests.get(url=url, headers=headers).text

    # Create the folder
    if not os.path.exists('./new'):
        os.mkdir('./new')

    # Instantiate the etree object
    tree = etree.HTML(page_text)
    a_lists = tree.xpath('//div[@id="container"]/div/a')

    for a in a_lists:
        href = a.xpath('./@href')[0]
        src = 'https:' + href

        page_text_detail = requests.get(url=src, headers=headers).text

        treeDetail = etree.HTML(page_text_detail)
        a_lists_products = treeDetail.xpath('//div[@class="clearfix mt20 downlist"]/ul/li')[0]
        href2 = a_lists_products.xpath('./a/@href')[0]
        products_name = href2[-7:]

        response = requests.get(url=href2, headers=headers)
        data_products = response.content
        data_path = 'new/' + products_name

        with open(data_path, 'wb') as fp:
            fp.write(data_products)
        print(products_name, "下载成功!!!")
Crawling with Baidu AI
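The original notes end here without an example. A minimal, assumed sketch of the idea behind this section: crawl an image and hand it to Baidu's OCR service through the baidu-aip SDK. The APP_ID, API_KEY, and SECRET_KEY are placeholders to be taken from your own Baidu AI console, and the image URL is only illustrative.
import requests
from aip import AipOcr  # pip install baidu-aip

# placeholder credentials from the Baidu AI open platform console
APP_ID = 'your_app_id'
API_KEY = 'your_api_key'
SECRET_KEY = 'your_secret_key'
client = AipOcr(APP_ID, API_KEY, SECRET_KEY)

# crawl an image first, then let the OCR client extract any text in it
img_data = requests.get('https://www.baidu.com/img/flexible/logo/pc/result.png').content
ocr_result = client.basicGeneral(img_data)
for item in ocr_result.get('words_result', []):
    print(item['words'])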