Python 多线程实例
2020-02-07 python 2423
import re
from lib.com import get_request, get_content
import time
from configparser import ConfigParser
import os
import threading
import queue
# Shared FIFO queue of (domain, keyword) tasks: main() fills it and the
# RankThread workers take tasks from it.
thread_queue = queue.Queue()
def check_link(url, domain):
    """Report whether *domain* occurs in the URL reached by requesting *url*.

    Args:
        url: Address to request through the project helper ``get_request``.
        domain: Literal domain text to look for.

    Returns:
        True when *domain* is a substring of the ``.url`` attribute of the
        object returned by ``get_request``, otherwise False.
    """
    response = get_request(url)
    # NOTE(review): presumably ``response.url`` is the address after any
    # redirects -- confirm against lib.com.get_request.
    # Plain substring test: the domain is literal text, so characters such as
    # "." must not be interpreted as regex wildcards.
    return domain in response.url
def check_url(content, domain, index):
    """Decide whether one search-result snippet belongs to *domain*.

    The checks run in order and the first decisive one wins:

    1. *domain* appears literally anywhere in the raw snippet.
    2. The host displayed in the ``c-showurl`` element (with an optional
       ``http://`` / ``https://`` prefix) equals *domain*; a displayed host
       that differs is treated as a definite miss.
    3. The ``href`` of the ``c-showurl`` link is followed with ``check_link``.

    Args:
        content: HTML fragment of a single result entry.
        domain: Literal domain text being searched for.
        index: Position of this entry, returned unchanged on a hit.

    Returns:
        *index* when the entry is attributed to *domain*, otherwise 0.
    """
    # Check domain: literal substring test, so "." in the domain cannot act
    # as a regex wildcard and produce false positives.
    if domain in content:
        return index

    # Drop <b>/</b> markup so it cannot split the displayed host name.
    content = content.replace("<b>", "").replace("</b>", "")

    # The scheme is optional. Handling it in a separate, later pattern never
    # worked: a scheme-less pattern already matches "https:" as the host and
    # returns before the https-specific pattern is tried.
    shown = re.search(
        r'class="c-showurl" style="text-decoration:none;">'
        r'(?:https?://)?([^/<]+)/[^<]+?<',
        content,
        re.I | re.S,
    )
    if shown is not None:
        return index if domain == shown[1] else 0

    # No usable display URL: fall back to resolving the link target.
    link = re.search(
        r'href="([^\"\']+)" class="c-showurl" style="text-decoration:none;">',
        content,
    )
    if link is not None and check_link(link[1], domain):
        return index
    return 0
def rank(domain, page, keyword):
    """Collect the positions on one Baidu results page that match *domain*.

    Args:
        domain: Domain passed to ``check_url`` for every result entry.
        page: 1-based results page number; converted to the ``pn`` offset
            ``10 * (page - 1)``.
        keyword: Search term placed in the ``wd`` and ``oq`` query parameters.

    Returns:
        A list of 1-based entry positions, as strings, for which
        ``check_url`` returned a positive value. If an exception occurs while
        scanning, it is printed and the positions gathered so far are
        returned.
    """
    # NOTE(review): keyword is inserted into the URL without percent-encoding;
    # whether get_content encodes it is not visible here -- confirm.
    offset = 10 * (page - 1)
    search_url = "https://www.baidu.com/s?wd={0}&pn={1}&oq={2}&ie=utf-8&rsv_pq=&rsv_t=".format(
        keyword, offset, keyword
    )
    html = get_content(search_url)

    hits = []
    try:
        entries = re.finditer(r'<h3 class="t">(.+?)百度快照</a>', html, re.I | re.S)
        for position, entry in enumerate(entries, start=1):
            matched = check_url(entry.group(0), domain, position)
            if matched > 0:
                hits.append(str(matched))
    except Exception as exc:
        print("Error:", str(exc))
    return hits
def baidu_rank(domain, keyword):
    """Search result pages 1-10 for *domain* and record the first page with hits.

    For the first page on which ``rank`` reports at least one position, a line
    of the form ``page:pos1,pos2:domain:keyword`` is appended to
    ``<domain>.txt`` and the search stops. The collected lines are printed at
    the end.

    Args:
        domain: Domain to look for; also used as the result file's base name.
        keyword: Search term passed to ``rank``.
    """
    rank_list = []
    for page in range(1, 11):
        lst = rank(domain, page, keyword)
        total = len(lst)
        if total == 0:
            pass
        else:
            # Result line format: page:positions:domain:keyword
            lst_str = str(page) + ":" + ",".join(lst) + ":" + domain + ":" + keyword
            rank_list.append(lst_str)
            # Append mode: every keyword for this domain shares one file.
            with open(domain + ".txt", "a", encoding="utf-8") as f:
                f.write(lst_str + "\n")
        # Stop at the first page that contains a hit.
        if total > 0:
            break
        # NOTE(review): the original indentation was lost; this progress line
        # is assumed to sit inside the loop (so it is printed only for pages
        # without hits). It may have been meant to run once after the loop --
        # confirm.
        print("%s : %s count:%s" % (page, lst, total))
    print(rank_list)
class RankThread(threading.Thread):
    """Worker thread that takes (domain, keyword) tasks from ``thread_queue``."""

    def __init__(self, thread_name):
        """Create a worker; *thread_name* is the label shown in its log lines."""
        threading.Thread.__init__(self)
        self.thread_name = thread_name

    def run(self):
        """Process queued tasks with ``baidu_rank`` until the queue is empty."""
        print("线程 %s : 开始" % self.thread_name)
        while True:
            # Take a task without blocking. Checking empty() and then calling
            # a blocking get() is racy: another worker can take the last item
            # in between, leaving this thread blocked forever and main()
            # stuck in join().
            try:
                domain, keyword = thread_queue.get_nowait()
            except queue.Empty:
                break
            print("查询 %s : %s" % (domain, keyword))
            baidu_rank(domain, keyword)
        print("线程 %s : 结束" % self.thread_name)
def main():
    """Read ``conf.ini``, queue one task per keyword and run the worker threads.

    The ``settings`` section must provide ``domain`` and ``keywords``;
    keywords may be separated by ``|`` or ``,``. Any existing ``<domain>.txt``
    is removed first, then 10 ``RankThread`` workers are started and joined,
    and the elapsed time is printed.
    """
    thread_limit = 10
    g_start = time.time()

    cf = ConfigParser()
    cf.read("conf.ini", encoding="utf-8")
    domain = cf.get("settings", "domain")
    # Accept both separators by normalising "|" to ",".
    keywords = cf.get("settings", "keywords").replace("|", ",")

    # Start with a fresh result file; a missing file is not an error.
    try:
        os.remove(domain + ".txt")
    except FileNotFoundError:
        pass

    # Queue one task per keyword, ignoring surrounding whitespace and blank
    # entries (e.g. from a trailing separator).
    for keyword in keywords.split(","):
        keyword = keyword.strip()
        if keyword:
            thread_queue.put((domain, keyword))

    print("即将开启 %d 个 线程执行该任务" % thread_limit)
    threads = [RankThread("查询线程:%d" % index) for index in range(thread_limit)]
    for worker in threads:
        # The daemon attribute replaces the deprecated setDaemon().
        worker.daemon = True
        worker.start()
    for worker in threads:
        worker.join()
    print("查询耗时:%s 秒" % (round(time.time() - g_start, 2)))
# Run the rank check only when this file is executed as a script.
if __name__ == '__main__':
    main()
相关文章
文章评论
-
-
-
0条评论