我们讲过分布式爬虫的模式,是一群爬虫去获取同一个网页的内容。为了避免它们在网页中搬运回相同的数据回来,我们在构建框架的时候,就需要考虑到爬虫数据去重这一方面。根据这项需求,小编认为RabbitMQ非常适合用来应用于python分布式爬虫环境的搭建,具体的搭建方法大家请往下看:
RabbitMQ是比较靠谱的消息中间件,得益于它的确认机制,当一条消息消费后如果设置确定模式,那么确认后才会继续消费,如果不确定认,那么这个任务将分配给其他消费者。
基于这种确认机制,可以在高可靠性和高数据要求情景中,避免数据抓取的遗漏和丢失。
其设计思路应该是基于mq设计两个接口,一个用于URL的存放,一个用户URL的获取,同时基于Redis的URL去重,通过类似scrapy-redis 的调度使爬虫运行。
主程序示例:
import pika class RabbitMQBASE: def __new__(cls, *args, **kw): if not hasattr(cls, '_instance'): org = super(RabbitMQBASE, cls) cls._instance = org.__new__(cls) return cls._instance def __init__(self, use='root', pwd='111'): user_pwd = pika.PlainCredentials(use, pwd) self.s_conn = pika.BlockingConnection( pika.ConnectionParameters(host='1.1.1.1', heartbeat_interval=3600, credentials=user_pwd)) def channel(self): return self.s_conn.channel() def close(self): """关闭连接""" self.s_conn.close() @staticmethod def callback(ch, method, properties, body): print(" [消费者] %r" % body) class RabbitMQ(RabbitMQBASE): """ type_:交换机类型fanout、direct、topic exchange:交换机名字 queue_name:队列名字,为空则随机命名 exclusive:队列是否持久化False持久,True不持久 key_list:消费者的交换机、队列绑定的关键词列表 key:生产者路由的关键词 no_ack:是否确认消息True不确定,False确定 """ def __init__(self, use='root', pwd='Kw7pGR4xDD1CsP*U', type_='direct', exchange='test', queue_name=None, exclusive=True, key_list=['test'], key='test', no_ack=True): RabbitMQBASE.__init__(self, use=use, pwd=pwd) self.type_ = type_ self.exchange = exchange self.queue_name = queue_name self.exclusive = exclusive self.key = key self.key_list = key_list self.no_ack = no_ack def rabbit_get(self): """消费者""" channel = self.channel() channel.exchange_declare(exchange=self.exchange, exchange_type=self.type_) if self.queue_name == None: result = channel.queue_declare(exclusive=self.exclusive) self.queue_name = result.method.queue if self.type_ != 'fanout': for key in self.key_list: channel.queue_bind(exchange=self.exchange, # 将交换机、队列、关键字绑定 queue=self.queue_name, routing_key=key) channel.basic_consume(RabbitMQBASE.callback, queue=self.queue_name, no_ack=self.no_ack) channel.start_consuming() def rabbit_put(self, message='hello word'): """生产者""" channel = self.channel() channel.exchange_declare(exchange=self.exchange, exchange_type=self.type_) if self.type_ == 'fanout': self.key = "" channel.basic_publish(exchange=self.exchange, routing_key=self.key, body=message) channel.close()
小伙伴们可以尝试着运行上面的代码,虽然我们不一定能全部弄明白其中的原理,但是我们需要进行RabbitMQ基础的搭建操作。
神龙|纯净稳定代理IP免费测试>>>>>>>>天启|企业级代理IP免费测试>>>>>>>>IPIPGO|全球住宅代理IP免费测试