Scheduled scrapy-redis crawls via a Scrapy extension
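This extension hooks Scrapy's spider_opened, spider_idle and spider_closed signals so that a scrapy-redis spider, which would otherwise just sit idle once its Redis queues drain, re-seeds its start-URL queue on a cron or interval schedule. To use it, register the class in settings.py. A minimal sketch, assuming the class lives in an extensions.py module at the project root (the module path and the priority value 500 are placeholders; only MYEXT_ENABLED and MYEXT_ITEMCOUNT are actually read by the code below):

# settings.py (sketch)
MYEXT_ENABLED = True   # without this the extension raises NotConfigured
MYEXT_ITEMCOUNT = 5    # optional; from_crawler defaults to 5
EXTENSIONS = {
    'extensions.SpiderInsertStartUrlExtension': 500,  # hypothetical module path
}

The full extension: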
# -*- coding: utf-8 -*-
import time
import json
import logging
from datetime import datetime
from scrapy import signals
from scrapy.exceptions import NotConfigured
from utils.redisdb import redis_cli
from config import env
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Extension')
logger.info(f'extensions env: {env}')
# Based on the extension boilerplate from https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/extensions.html
class SpiderInsertStartUrlExtension(object):
"""
Scrapy所有爬虫实现定时调度的扩展
"""
def __init__(self, item_count, crawler):
"""
初始化操作
:param item_count: 程序空闲的最大次数
:param crawler: 类,用于发送关闭程序信号
"""
self.crawler = crawler
self.count = 0 # 统计空闲次数
self.conn = redis_cli()
@classmethod
def from_crawler(cls, crawler):
"""
必须方法
"""
# 判断是否启用扩展
if not crawler.settings.getbool('MYEXT_ENABLED'):
raise NotConfigured
# 每隔5个item 输出当前采集的数量
item_count = crawler.settings.getint('MYEXT_ITEMCOUNT', 5)
ext = cls(item_count, crawler)
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)  # hook the idle signal
return ext
    def cron_judgement(self, spider):
        """
        Cron-style scheduling, e.g. fire at 08:00 sharp every day. A spider opts in
        by assigning a crontab expression to a class attribute, e.g. start at 00:01
        daily: cron_job = "1 0 * * *"
        :return: True to run now, False to keep waiting
        """
        # The crontab expression only constrains the start time; the next round is
        # still driven by schedule_time.
        # Only run the block below when cron scheduling is requested.
        if hasattr(spider, 'cron_job'):
            cron_job = spider.cron_job.split(' ')
minute = cron_job[0]
hour = cron_job[1]
day = cron_job[2]
month = cron_job[3]
week = cron_job[4]
now_minute = str(datetime.now().minute)
now_hour = str(datetime.now().hour)
now_day = str(datetime.now().day)
now_month = str(datetime.now().month)
now_week = str(datetime.now().weekday() + 1)
if minute == '*':
minute = now_minute
if hour == '*':
hour = now_hour
if day == '*':
day = now_day
if month == '*':
month = now_month
if week == '*':
week = now_week
            if (minute == now_minute) and (hour == now_hour) and (day == now_day) and (month == now_month) and (week == now_week):
                # When the expression resolves at minute granularity and the crawl
                # finishes within seconds, the condition can match up to 12 times in
                # the same minute (idle fires roughly every 5s), relaunching the crawl
                # 12 times. Guard against that with a short-lived Redis dedup key.
                if not self.conn.get(f"{spider.name}:spider_opened"):
                    self.conn.setex(f"{spider.name}:spider_opened", 5 * 60, 1)
                    return True
                else:
                    logger.info(f'{cron_job} already fired; the crawl completed fast enough to match again, skipping duplicate launch')
                    return False
            else:
                logger.info(f'Waiting for the scheduled start time {cron_job}...')
                return False
else:
return False
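    # Note (an observation, not part of the original design): the matcher above only
    # understands literal field values and '*'; crontab ranges, lists and steps such
    # as */5 are not supported. A library like croniter would cover the full grammar.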
    def interval_time(self, spider):
        """
        Interval-based scheduling, e.g. run every 30 minutes: schedule_time = 12 * 30
        (idle ticks arrive roughly every 5 seconds, so 12 ticks ≈ 1 minute)
        :return: True to run now, False to keep waiting
        """
        # Cron scheduling takes priority; skip interval scheduling when cron_job is set
        cron_job = hasattr(spider, 'cron_job')
        schedule_time = hasattr(spider, 'schedule_time')
        if cron_job:
            return False
        if not schedule_time:
            return False
        # Idle for longer than the configured interval: reset the counter and fire
if self.count > spider.schedule_time:
self.count = 0
return True
else:
return False
    def spider_opened(self, spider):
        """
        Required method; runs exactly once, when the spider starts.
        Supports delaying the first run until the scheduled time.
        """
        # On first start, decide whether cron scheduling applies
        cron_job = hasattr(spider, 'cron_job')
        logger.info(f'environment: {env}')
        logger.info(f'cron job enabled: {cron_job}')
        # Only honour cron in the production environment
        if env == 'prod':
            if cron_job:
                while True:
                    run = self.cron_judgement(spider)
                    if run:
                        break
                    else:
                        time.sleep(59)
        self.insert_start_url(spider, 'opened')
        logger.info("opened spider %s" % spider.name)
        # Remember the start time so idle logging can report when the last round
        # began, which helps verify that the previous run fired as expected
        self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    def insert_start_url(self, spider, status):
        """
        Push start URLs into the Redis queue.
        """
        # Resume an interrupted crawl: nothing to do here, the engine picks the
        # pending requests back up on its own
        if self.conn.llen(spider.name + ":requests"):
            logger.info('Resuming from checkpoint, continuing the unfinished request queue...')
            return
        elif self.conn.llen(spider.redis_key):
            logger.info('The start queue already holds tasks, starting...')
            return
        # Fresh start
        elif spider.start_urls:
            logger.info('Task queue is empty, seeding it from spider.start_urls...')
            for url in spider.start_urls:
                data = json.dumps({"url": url})
                self.conn.lpush(spider.redis_key, data)
                logger.info(f"---insert start_url complete {url}")
        else:
            # When the spider has just opened there is no need to call start_requests manually
            if status != 'opened':
                logger.info('Generating tasks directly from the start_requests method...')
                return spider.start_requests()
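    # Note on the queue format (an assumption about the surrounding project): the
    # entries pushed above are JSON objects like {"url": ...}. Stock scrapy-redis
    # has traditionally treated each queue entry as a bare URL string in
    # make_request_from_data, so this payload shape presumes the spiders either
    # override make_request_from_data or run a scrapy-redis version that accepts
    # JSON-encoded data.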
    def spider_closed(self, spider):
        """
        Required method.
        """
        logger.info("closed spider %s" % spider.name)
        self.crawler.engine.close_spider(spider, 'detail data collection finished')
    def spider_idle(self, spider):
        """
        Record state and decide whether to trigger a new round.
        Scrapy fires spider_idle roughly every 5 seconds by default.
        :param spider:
        :return:
        """
        logger.info(f'{spider.name} idle; the last run started at {self.started_time}')
        # When both the request queue and the start queue are empty, count one idle tick
if not self.conn.llen(spider.name + ":requests") and not self.conn.llen(spider.redis_key):
self.count += 1
time.sleep(1)
else:
            # Work has resumed: reset the counter and start timing the next idle round
self.count = 0
self.spider_run(spider)
    def spider_run(self, spider):
        """
        Trigger a new crawl round (push links back into the queue)
        when one of the scheduling conditions is met.
        """
        # Normally exactly one of the two scheduling modes applies
        cron = self.cron_judgement(spider)
        interval = self.interval_time(spider)
        if cron or interval:
            # Before rescheduling, drop the dedup set left over from the previous run
            self.conn.delete(f'{spider.name}:dupefilter')
            # Deletion takes a moment to settle
            time.sleep(3)
            logger.info("clear spider requests dupefilter completed ---- %s" % spider.name)
            self.insert_start_url(spider, 'idle')
            # Store the start time in Redis to help coordinate distributed workers
            started_time_stamp = datetime.strptime(self.started_time, "%Y-%m-%d %H:%M:%S").timestamp()
            self.conn.set(f'{spider.name}:starttime', started_time_stamp)
            self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
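For completeness, here is how a spider would opt in to the two scheduling modes. A minimal sketch, assuming a scrapy-redis RedisSpider with the usual scrapy-redis scheduler settings already configured; the spider name, Redis key and URL are placeholders:

# demo_spider.py (sketch)
from scrapy_redis.spiders import RedisSpider

class DemoSpider(RedisSpider):
    name = 'demo'
    redis_key = 'demo:start_urls'          # the queue insert_start_url seeds
    start_urls = ['https://example.com']   # read by the extension on a fresh start
    cron_job = "1 0 * * *"                 # fire at 00:01 every day; alternatively:
    # schedule_time = 12 * 30              # fire after ~30 minutes of idling (5s ticks)

    def parse(self, response):
        pass

Since cron_job takes priority in interval_time, a spider defines one of the two attributes, not both.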