問題

我是python中的新開發.我的程式碼如下:

 import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def run(self):
        with open('ips.txt', 'r') as urls:
            for url in urls.readlines():
                req = url.strip()
                try:
                    page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                        timeout=10)
                    soup = BS(page.text)
                    # string = string.encode('ascii', 'ignore')
                    print('[32m' + req + ' - Title: ', soup.title)
                except requests.RequestException as e:
                    print('[32m' + req + ' - TimeOut!')
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()
 

我正在嘗試使程式讀取IPs.txt並打印出每個網站的標題.

它在單個執行緒中完美無缺.現在我想透過使用multiprocessing來加快它.

但由於某種原因,它只輸出相同的第5行.我是多處理的新手,並嘗試了失敗的嘗試.

螢幕顯示問題:

screen shot showing problem

我只想執行5個工作人員在多執行緒或並行中檢查IPs.txt...我只是想讓它更快.

任何提示,線索,幫助?

  最佳答案

問題

程式碼中的主要問題是每個Worker從頭開啟ips.txt並在ips.txt中找到的每個URL上工作.因此,五個工作人員一起開啟ips.txt五次並在每個URL上工作五次.

解決方案

解決這個問題的正確方法是將程式碼拆分為master和worker.您已經實現了大多數工作程式碼.讓我們現在將主部分(在if __name__ == '__main__':下)視為主部.

現在主人應該啟動五個工作人員並透過佇列(multiprocessing.Queue)向他們傳送工作。

multiprocessing.Queue類為多個生產者將資料放入其中並且多個消費者從中讀取資料而不會遇到競爭條件提供了一種方法.此類實現了所有必要的鎖定語義,以便在多處理上下文中安全地交換資料並防止種族條件.

固定碼

以下是如何根據我上面描述的內容重寫程式碼:

 import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def __init__(self, job_queue):
        super().__init__()
        self._job_queue = job_queue

    def run(self):
        while True:
            url = self._job_queue.get()
            if url is None:
                break

            req = url.strip()

            try:
                page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                    timeout=10)
                soup = BS(page.text)
                # string = string.encode('ascii', 'ignore')
                print('[32m' + req + ' - Title: ', soup.title)
            except requests.RequestException as e:
                print('[32m' + req + ' - TimeOut!')


if __name__ == '__main__':
    jobs = []
    job_queue = multiprocessing.Queue()

    for i in range(5):
        p = Worker(job_queue)
        jobs.append(p)
        p.start()

    # This is the master code that feeds URLs into queue.
    with open('ips.txt', 'r') as urls:
        for url in urls.readlines():
            job_queue.put(url)

    # Send None for each worker to check and quit.
    for j in jobs:
        job_queue.put(None)

    for j in jobs:
        j.join()
 

我們可以在上面的程式碼中看到主機開啟ips.txt一次,逐個讀取URL並將它們放入佇列中.每個工作者都等待URL到達此佇列.一旦URL到達佇列,其中一個工作者就會選擇它並且很忙.如果佇列中有更多的URL,下一個免費工作者會選擇下一個佇列等等.

最後,當所有工作完成時,我們需要一些方法讓工作者退出.有幾種方法可以實現這一點.在這個例子中,我選擇了一個簡單的策略,將五個哨點值(在這種情況下為5個None值)傳送到佇列中,每個工作者一個,以便每個工作者都可以接收並退出.

有另一種策略,工作人員和主人共享multiprocessing.Event物件,就像他們現在共享multiprocessing.Queue物件一樣.主人每當希望工作人員退出時都會呼叫此物件的set()方法.工作人員檢查這個物件是否is_set()並退出.但是,這將在程式碼中引入一些額外的複雜性.我在下面討論過這個問題.

為了完整起見,也為了展示最小、完整和可核實的例子,我在下面提出了兩個程式碼示例,顯示停止策略。

使用哨兵值來停止工人

這幾乎是我到目前為止所描述的,除了程式碼示例已經簡化很多以刪除Python標準庫之外的任何庫上的依賴項.

下面的示例中值得注意的另一個事情是,我們不是建立工作類,而是使用工作函式並從中建立一個Process.這種型別的程式碼通常在Python文件中找到,它很慣用.

 import multiprocessing
import time
import random


def worker(input_queue):
    while True:
        url = input_queue.get()

        if url is None:
            break

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.Queue()
    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(input_queue, ))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Ask the workers to quit.
    for w in workers:
        input_queue.put(None)

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()
 

使用活動停止工人

使用multiprocessing.Event物件發出訊號,當工人辭職時應該在程式碼中引入一些複雜性.主要需要做三個更改:

  • 在主機中,我們在 set() 物件上呼叫 Event 方法來表示工人應該儘快退出。
  • 在工作者中,我們定期呼叫is_set()方法的Event物件來檢查它是否應該退出.
  • 在主機中,我們需要使用multiprocessing.JoinableQueue而不是multiprocessing.Queue,以便在工人要求退出之前,它可以測試佇列是否被工人完全消耗。
  • 在工作者中,我們需要在消耗佇列中的每個專案後呼叫佇列的task_done()方法.這是主要呼叫佇列的join()方法以測試它是否已被清空的必要條件.

所有這些更改都可以在下面的程式碼中找到:

 import multiprocessing
import time
import random
import queue


def worker(input_queue, stop_event):
    while not stop_event.is_set():
        try:
            # Check if any URL has arrived in the input queue. If not,
            # loop back and try again.
            url = input_queue.get(True, 1)
            input_queue.task_done()
        except queue.Empty:
            continue

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.JoinableQueue()
    stop_event = multiprocessing.Event()

    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker,
                                    args=(input_queue, stop_event))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Wait for the queue to be consumed.
    input_queue.join()

    # Ask the workers to quit.
    stop_event.set()

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()