最近一段时间想着做个资源网站,奈何没数据,准备从几个相关的站点上抓一些数据,数据量每个站点都在几千万左右,这里简单总结下最近一段时间在做的测试。
对于大部分内容网站来说,做反爬处理是必不可少的,其中最基本的就是限制ip的访问次数,然后就是做一些登录校验,做一些前端加密,再就是增加一些访问限制等等。
最近在抓的几个网站都存在限制IP访问的限制,再抓取了几百条后就无法继续抓取了,此时就需要代理IP来突破限制,最近碰到的问题是这样的:
站点本身的访问速度并不快,导致我抓一条记录接近5s ,如果按照这样下去的话,我得抓到明年去了,显然是不可行的。但是我还要保证代理IP可用且目标地址不遗漏。(代理IP存在不可用的情况)
所以最近几天一直都在尝试如何来处理这个问题,最开始的思路是:自己创建一个池子,每次池子中放进N个目标地址,然后启动M个程序来访问,成功则消掉,不成功则放回去,重新取。
经过了一天一夜的运行后发现速度还是慢的很,最多就是相对于一条一条的访问快了两三倍,但是对于如此大的数据量来讲,还是要持续很久,由于使用的代理IP是付费的,那么仅仅代理IP费用就入不敷出了。
后来又尝试着单个代理并发访问N个目标地址,但是这样又总是漏下失败的地址,而且经常会运行一段时间后卡住。
经过了几个版本的尝试,最终确定以下的一个思路:
1. 提供一个任务分配函数,由该函数提供给给定目标的任务数量
2. 启动多个进程,每个进程负责处理自己给定的目标,直到处理结束后,重新获取
3. 提供一个任务处理函数,负责处理目标任务,并返回给进程结果。
大致来说,这样兼顾的并发和多进程的处理,效率相对于之前又快了几倍,就目前来看,还是不够,主要是经常超时,需要对任务做超时处理,而且代理IP质量有点差,而且目标站点本身的抓取速度有点慢,导致整体的进度并不快。
但是并发量级又不能太大,太大了会导致目标站点访问瘫痪,导致影响站点正常用户访问。 感觉就目前这个速度,抓取完毕的话,估计还得三四年....
看来还是要做一些取舍的,数据漏一些也可以吧。
const cluster = require('cluster');
let numCpus = 3;//require('os').cpus().length / 2;
const async = require('async');
const fetch = require('./fetch2');
let getProxy = require('./ip');
let getNext = require('./job');
const singleNum = 50;
let proxyMap = {}
function forkWorker (cluster) {
const worker = cluster.fork();
//获取任务和代理
getNext(singleNum, function (arr) {
worker.send(arr);
});
worker.on('message', async (data) => {
let type = data.type, obj = data.data;
if (type == 'changeProxy') {//更换代理
worker.send(obj);
} else if (type == 'over') {
getNext(singleNum, function (arr) {
worker.send(arr)
})
} else if (type == 'left') {
//存在剩余
worker.send(obj);
} else if (type == 'left2') {
worker.send(obj);
} else if (type == 'die') {
worker.kill();
}
})
}
if (cluster.isMaster) {
//创建worker
for (let i = 0; i < numCpus;i++) {
forkWorker(cluster);
}
cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died : ${signal}`)
})
//每隔10s检查进程
setInterval(function () {
let arr = [];
for (let id in cluster.workers) {
arr.push(cluster.workers[id].pid);
}
console.log(`当前进程数量:${arr.length} , ${arr}`)
},10 * 60 * 1000)
} else {
/**
* @params {Array} nextArr : 本次需要处理的任务数据
* @params {Object} proxy : 本地的代理数据
*/
process.on('message', async function (nextArr) {
let pid = process.pid;
if (!proxyMap[pid]) {
proxyMap[pid] = await getProxy();
}
let proxy = proxyMap[pid];
//开始工作
console.log(`${process.pid} 开始工作----`)
async.mapLimit(nextArr, nextArr.length, async function (item) {
let flag = await fetch(item, proxy);
return {
flag: flag,
item : item
};
}, async function (err, results) {
if (err) {
console.log(err);
}
let allTotal = results.length;
let sucNum = results.filter(t => {
return t.flag == 2;
});
let carr = results.filter(t => {
return t.flag != 2;
}).map(t => t.item);
console.log(`${process.pid}:结果: [总数: ${allTotal}] - [成功: ${sucNum.length}] - [失败:${carr.length}]`)
if (carr.length > 1) {
if (allTotal - carr.length < 5) {
proxyMap[pid] = await getProxy();
}
process.send({type : 'left2',data : carr })
} else {
//全部获取结束
process.send({type : 'over'})
}
})
})
}
转载请注明出处: https://chrunlee.cn/article/spider-proxy-result.html