# -*- coding: utf-8 -*-
import requests
from lxml import html
from concurrent.futures import ThreadPoolExecutor
import threading
from loguru import logger
import time

# Serializes updates to the shared ``rate_dic`` counters across worker threads.
lock = threading.Lock()

# Default request headers: a desktop Chrome User-Agent string.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/83.0.4103.97 Safari/537.36"
    ),
}

# Running tally of proxy checks, plus the leading text of the latest response.
rate_dic = {"total": 0, "success": 0, "fail": 0, "msg": ""}


class ProxyTest(object):
    """Download a list of proxy addresses and measure how many of them work.

    Outcomes are accumulated in the module-level ``rate_dic`` counters, whose
    updates are guarded by the module-level ``lock``. The counters are never
    reset here, so repeated runs in one process keep adding to them.
    """

    def __init__(self):
        self.session = requests.Session()
        # Copy so that header changes made through the session never leak
        # back into the shared module-level ``headers`` dict.
        self.session.headers = dict(headers)

    def get_response(self, url, proxies=None, timeout=5):
        """GET *url* and return the response body.

        Args:
            url: Address to request.
            proxies: Optional ``requests``-style proxy mapping; ``None`` means
                an empty mapping is passed.
            timeout: Timeout in seconds handed to ``requests``.

        Returns:
            The response text when the status code is 200, otherwise
            ``False`` (non-200 status, or any error raised by the request).
        """
        if proxies is None:
            proxies = {}
        try:
            resp = self.session.get(url, proxies=proxies, timeout=timeout)
            if resp.status_code == 200:
                return resp.text
        except Exception:
            # Deliberately broad: this is a best-effort probe, and any failure
            # (timeout, refused connection, malformed proxy address, ...)
            # simply means the request did not succeed.
            return False
        # Non-200 answers are reported the same way as request errors.
        return False

    def test_ip(self, ip):
        """Probe one proxy and record the outcome in ``rate_dic``.

        Args:
            ip: Proxy address placed after the scheme in the proxy URLs
                (presumably ``host:port`` -- confirm against the list source).
        """
        url = "http://myip.ipip.net/"
        proxy = {
            "http": "http://{}".format(ip),
            "https": "https://{}".format(ip),
        }
        source = self.get_response(url, proxies=proxy)

        # Build the log excerpt before taking the lock to keep the critical
        # section short.
        msg = ""
        if source:
            msg = source[:100].replace("\n", "").replace("\t", "").replace("\r", "").strip()

        with lock:
            if source:
                rate_dic["success"] += 1
                # Hook point for persisting working proxies, e.g.
                # redis.add(value=ip, key="proxy_test_ip66")
            else:
                rate_dic["fail"] += 1
            rate_dic["msg"] = msg
            rate_dic["total"] += 1
            # Snapshot under the lock so the logged counters are consistent
            # with each other even while other workers keep updating them.
            snapshot = dict(rate_dic)
        logger.info(f"rate_dic:{snapshot}")

    def parse_ip(self, source, limit=100, max_workers=10):
        """Extract proxy addresses from *source* and test them concurrently.

        Addresses are read from the text that follows each ``<br>`` element.

        Args:
            source: HTML text of the proxy list page. A falsy or blank value
                (e.g. a failed download) is logged and skipped.
            limit: Maximum number of ``<br>`` elements to inspect.
            max_workers: Size of the thread pool that runs ``test_ip``.
        """
        if not source or not source.strip():
            logger.warning("proxy list is empty, nothing to test")
            return

        root = html.fromstring(source)
        ip_datas = []
        for item in root.xpath("//br")[:limit]:
            # ``tail`` is None when no text follows the <br> element.
            ip = (item.tail or "").strip()
            if ip:
                ip_datas.append(ip)
        logger.info(f"获取ip,{len(ip_datas)}条")

        # Leaving the ``with`` block waits for every submitted check.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            pool.map(self.test_ip, ip_datas)

    def run(self):
        """Download the proxy list, test the proxies and log the success rate."""
        ip_url = "http://www.xxx.cn/nmtq.php?getnum=1000&isp=0&anonymoustype=0&start=&ports=&export=&ipaddress=&area=1&proxytype=2&api=66ip"
        source = self.get_response(ip_url)
        if not source:
            logger.error("failed to download the proxy list")
            return

        start = time.time()
        self.parse_ip(source)
        tested = rate_dic["total"]
        # Avoid dividing by zero when the page yielded no testable address.
        rate = rate_dic["success"] / tested if tested else 0.0
        end = time.time()
        logger.info(f"成功率:{rate:.2f},总用时:{(end - start):.2f}s")


if __name__ == "__main__":
    # Script entry point: run one full fetch-and-test pass.
    pt = ProxyTest()
    pt.run()


# 版权声明:本文为c-x-a原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
# 本文链接:https://www.cnblogs.com/c-x-a/p/13162289.html