Python 多线程实例
2020-02-07
python
1409
"""Look up Baidu search rankings of a domain for a list of keywords.

The domain and the keywords are read from ``conf.ini`` (section ``settings``).
Each keyword is handled by a worker thread, and every hit is appended to
``<domain>.txt`` as ``page:positions:domain:keyword``.
"""
import os
import queue
import re
import threading
import time
from configparser import ConfigParser
from urllib.parse import quote

from lib.com import get_request, get_content

# Work queue of (domain, keyword) tuples shared by all worker threads.
thread_queue = queue.Queue()

# Serializes appends to the shared result file across worker threads.
_write_lock = threading.Lock()

# One search result entry: from the title <h3> up to the snapshot link.
_RESULT_RE = re.compile(r'<h3 class="t">(.+?)百度快照</a>', re.I | re.S)

# Host shown in the result's display URL. The scheme is optional so that both
# "host/path" and "https://host/path" yield the host in group 1.
_SHOWURL_HOST_RE = re.compile(
    r'class="c-showurl" style="text-decoration:none;">'
    r'(?:https?://)?([^/<]+)/[^<]+?<',
    re.I | re.S,
)

# Link target of the display URL, used when the shown text has no usable host.
_SHOWURL_HREF_RE = re.compile(
    r'href="([^\"\']+)" class="c-showurl" style="text-decoration:none;">'
)


def check_link(url, domain):
    """Return True if the URL reached by requesting *url* contains *domain*.

    NOTE(review): assumes ``get_request`` returns a response-like object with
    a ``url`` attribute holding the final address - confirm in ``lib.com``.
    """
    r = get_request(url)
    # Literal containment: the dots of a domain must not act as regex wildcards.
    return domain in r.url


def check_url(content, domain, index):
    """Return *index* if the result snippet *content* belongs to *domain*, else 0.

    Checks are tried in order:
      1. *domain* appears literally anywhere in the raw snippet;
      2. the host of the display URL (bold tags removed) equals *domain*;
      3. no display host could be parsed, so the display link is requested
         and the URL it resolves to is checked.
    """
    # Literal containment: the dots of a domain must not act as regex wildcards.
    if domain in content:
        return index

    # Strip the bold tags so a highlighted host compares as plain text.
    content = content.replace("<b>", "").replace("</b>", "")

    host_match = _SHOWURL_HOST_RE.search(content)
    if host_match is not None:
        # A display host was found: it decides the outcome either way.
        return index if domain == host_match[1] else 0

    href_match = _SHOWURL_HREF_RE.search(content)
    if href_match is not None and check_link(href_match[1], domain):
        return index
    return 0


def rank(domain, page, keyword):
    """Return the positions (as strings) of *domain* on result page *page*.

    Positions are 1-based within the page. An empty list means no result of
    that page matched.

    NOTE(review): assumes ``get_content`` returns the page body as ``str``.
    """
    # Sent as the "pn" query parameter; presumably the result offset.
    pn = 10 * (page - 1)
    # Percent-encode so characters such as "&", "#" or "+" in the keyword
    # cannot corrupt the query string.
    encoded_keyword = quote(keyword, safe="")
    url = "https://www.baidu.com/s?wd={0}&pn={1}&oq={0}&ie=utf-8&rsv_pq=&rsv_t=".format(
        encoded_keyword, pn
    )
    content = get_content(url)
    lst = []
    try:
        for index, x in enumerate(_RESULT_RE.finditer(content), start=1):
            _index = check_url(x[0], domain, index)
            if _index > 0:
                lst.append(str(_index))
    except Exception as e:
        # Best effort: report the failure and keep the positions found so far.
        print("Error:", str(e))
    return lst


def baidu_rank(domain, keyword):
    """Scan result pages 1-10 for *domain* and record the first page with hits.

    The hit is appended to ``<domain>.txt`` as ``page:positions:domain:keyword``
    and scanning stops there. Pages without a hit are reported on stdout.
    """
    rank_list = []
    for page in range(1, 11):
        lst = rank(domain, page, keyword)
        total = len(lst)
        if total > 0:
            lst_str = str(page) + ":" + ",".join(lst) + ":" + domain + ":" + keyword
            rank_list.append(lst_str)
            # Several workers append to the same file; write one at a time.
            with _write_lock, open(domain + ".txt", "a", encoding="utf-8") as f:
                f.write(lst_str + "\n")
            break
        print("%s : %s count:%s" % (page, lst, total))
    print(rank_list)


class RankThread(threading.Thread):
    """Worker thread that drains ``thread_queue`` and ranks each entry."""

    def __init__(self, thread_name):
        """Create the worker; *thread_name* is only used in log output."""
        super().__init__()
        self.thread_name = thread_name

    def run(self):
        """Process (domain, keyword) tasks until the queue is empty."""
        print("线程 %s : 开始" % self.thread_name)
        while True:
            # Checking empty() and then calling a blocking get() is racy:
            # another worker may take the last item in between, leaving this
            # thread blocked forever. A non-blocking get avoids that.
            try:
                domain, keyword = thread_queue.get_nowait()
            except queue.Empty:
                break
            print("查询 %s : %s" % (domain, keyword))
            baidu_rank(domain, keyword)
        print("线程 %s : 结束" % self.thread_name)


def main():
    """Read ``conf.ini``, queue one task per keyword and run the workers."""
    thread_limit = 10
    g_start = time.time()

    cf = ConfigParser()
    cf.read("conf.ini", encoding="utf-8")
    domain = cf.get("settings", "domain")
    # Keywords may be separated by "|" or ",".
    keywords = cf.get("settings", "keywords").replace("|", ",")

    # Start from a fresh result file on every run.
    ranktxt = domain + ".txt"
    try:
        os.remove(ranktxt)
    except FileNotFoundError:
        pass

    # Queue the tasks for the worker threads, ignoring blank entries and the
    # whitespace around separators.
    for keyword in keywords.split(","):
        keyword = keyword.strip()
        if keyword:
            thread_queue.put((domain, keyword))

    print("即将开启 %d 个 线程执行该任务" % thread_limit)
    threads = [RankThread("查询线程:%d" % index) for index in range(thread_limit)]
    for mythread in threads:
        # Thread.setDaemon() is deprecated; set the attribute instead.
        mythread.daemon = True
        mythread.start()
    for mythread in threads:
        mythread.join()

    print("查询耗时:%s 秒" % (round(time.time() - g_start, 2)))


if __name__ == '__main__':
    main()
很赞哦! (0)
相关文章
文章评论
-
-
-
0条评论