Многопоточность и асинхронное программирование

9. Многопоточность и асинхронное программирование #

Python предоставляет встроенные инструменты для работы с многопоточностью и асинхронным программированием. Эти подходы позволяют эффективно использовать процессорное время и упрощают выполнение нескольких задач одновременно.


9.1. Потоки и модули threading, multiprocessing #

Потоки: Модуль threading #

Многопоточность используется для выполнения нескольких задач в рамках одного процесса. Это полезно, если задачи зависят от ввода-вывода.

Создание и запуск потока #
import threading

def print_numbers():
    for i in range(5):
        print(f"Number: {i}")

# Создаем поток
thread = threading.Thread(target=print_numbers)
thread.start()  # Запускаем поток
thread.join()   # Ждем завершения потока
Использование классов #
class MyThread(threading.Thread):
    def run(self):
        for i in range(5):
            print(f"Thread says: {i}")

thread = MyThread()
thread.start()
thread.join()

Модуль multiprocessing #

Модуль multiprocessing позволяет запускать процессы, что эффективно для задач, требующих интенсивных вычислений (например, обработки больших массивов данных).

Пример использования #
from multiprocessing import Process

def compute_square(numbers):
    for n in numbers:
        print(f"Square: {n ** 2}")

numbers = [1, 2, 3, 4, 5]
process = Process(target=compute_square, args=(numbers,))
process.start()
process.join()
Передача данных между процессами #

С помощью очередей:

from multiprocessing import Process, Queue

def worker(queue):
    queue.put("Hello from process!")

queue = Queue()
process = Process(target=worker, args=(queue,))
process.start()
print(queue.get())  # Hello from process!
process.join()

9.2. Асинхронное программирование с asyncio #

Асинхронное программирование полезно, когда задачи требуют большого количества операций ввода-вывода (например, сетевые запросы).

Основные концепции #

  • async def — объявление асинхронной функции.
  • await — ожидание выполнения асинхронной операции.
  • asyncio.run() — запуск асинхронной функции.

Асинхронные функции #

import asyncio

async def say_hello():
    print("Hello!")
    await asyncio.sleep(1)  # Асинхронная пауза
    print("Goodbye!")

asyncio.run(say_hello())

Одновременное выполнение задач #

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 completed")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 completed")

async def main():
    await asyncio.gather(task1(), task2())  # Запуск задач одновременно

asyncio.run(main())

Асинхронные итераторы и контекстные менеджеры #

class AsyncCounter:
    def __init__(self, max_value):
        self.max_value = max_value
        self.current = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current < self.max_value:
            self.current += 1
            await asyncio.sleep(0.5)
            return self.current
        else:
            raise StopAsyncIteration

async def main():
    async for value in AsyncCounter(5):
        print(value)

asyncio.run(main())

9.3. Синхронизация потоков #

Когда несколько потоков или процессов работают с общими данными, требуется синхронизация для предотвращения конфликтов.

Мьютексы (Locks) #

import threading

lock = threading.Lock()
counter = 0

def increment():
    global counter
    with lock:  # Синхронизируем доступ к ресурсу
        temp = counter
        temp += 1
        counter = temp

threads = [threading.Thread(target=increment) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"Counter: {counter}")

Семафоры #

semaphore = threading.Semaphore(3)  # Одновременно выполняется не более 3 потоков

def task():
    with semaphore:
        print("Task started")
        threading.sleep(1)
        print("Task completed")

threads = [threading.Thread(target=task) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

Использование очередей #

Модуль queue позволяет организовать безопасный обмен данными между потоками.

import queue
import threading

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)
        print(f"Produced: {i}")

def consumer():
    while not q.empty():
        item = q.get()
        print(f"Consumed: {item}")
        q.task_done()

prod_thread = threading.Thread(target=producer)
cons_thread = threading.Thread(target=consumer)

prod_thread.start()
prod_thread.join()

cons_thread.start()
cons_thread.join()

Рекомендации по выбору подхода #

  • Используйте threading для задач, связанных с вводом-выводом.
  • Используйте multiprocessing для задач, требующих интенсивных вычислений.
  • Для масштабируемого асинхронного кода (например, веб-серверов) применяйте asyncio.
  • Всегда учитывайте синхронизацию, если несколько потоков или процессов работают с общими ресурсами.