14. Работа с данными в реальном времени #
Работа с данными в реальном времени требует инструментов, которые позволяют поддерживать взаимодействие в реальном времени, получать и обрабатывать данные из внешних источников, а также управлять потоками информации.
14.1. WebSockets #
WebSocket — это протокол, обеспечивающий двустороннюю связь между клиентом и сервером.
Установка библиотеки websockets
#
pip install websockets
Пример: сервер на WebSocket #
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(f"Received: {message}")
await websocket.send(f"Echo: {message}")
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Пример: клиент на WebSocket #
import asyncio
import websockets
async def communicate():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello, Server!")
response = await websocket.recv()
print(f"Received from server: {response}")
asyncio.run(communicate())
14.2. Парсинг данных (BeautifulSoup, lxml) #
Парсинг используется для извлечения данных из HTML и XML-документов.
Установка #
pip install beautifulsoup4 lxml
Пример: парсинг HTML с BeautifulSoup
#
from bs4 import BeautifulSoup
html_doc = """
<html>
<head><title>Example Page</title></head>
<body>
<h1>Hello, world!</h1>
<p class="content">This is a paragraph.</p>
</body>
</html>
"""
soup = BeautifulSoup(html_doc, "lxml")
# Извлечение заголовка
title = soup.title.string
print(f"Title: {title}") # Title: Example Page
# Извлечение текста параграфа
paragraph = soup.find("p", class_="content").text
print(f"Paragraph: {paragraph}") # Paragraph: This is a paragraph.
Пример: парсинг XML с lxml
#
from lxml import etree
xml_data = """
<root>
<item id="1">First</item>
<item id="2">Second</item>
</root>
"""
tree = etree.fromstring(xml_data)
# Извлечение всех элементов
for item in tree.findall("item"):
print(f"ID: {item.get('id')}, Text: {item.text}")
14.3. Работа с потоками данных #
Работа с потоками данных позволяет обрабатывать большие объемы информации, поступающей в реальном времени, например из логов, датчиков или веб-сервисов.
Использование библиотеки asyncio
#
Асинхронные функции идеально подходят для обработки потоков данных.
import asyncio
async def data_stream():
for i in range(5):
await asyncio.sleep(1) # Симуляция поступления данных
yield f"Data chunk {i}"
async def process_stream():
async for chunk in data_stream():
print(f"Processing: {chunk}")
asyncio.run(process_stream())
Пример: обработка данных из API с использованием aiohttp
#
pip install aiohttp
import aiohttp
import asyncio
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.json()
async def main():
async with aiohttp.ClientSession() as session:
url = "https://jsonplaceholder.typicode.com/posts"
data = await fetch_data(session, url)
for post in data[:5]: # Обработка первых 5 записей
print(post["title"])
asyncio.run(main())
Kafka: потоковая обработка данных #
Apache Kafka — популярная система обработки потоков данных. Для работы с ней в Python используется библиотека confluent-kafka
.
Установка #
pip install confluent-kafka
Пример: потребитель Kafka #
from confluent_kafka import Consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'python-consumer',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['test-topic'])
try:
while True:
msg = consumer.poll(1.0) # Ожидание данных
if msg is None:
continue
if msg.error():
print(f"Error: {msg.error()}")
continue
print(f"Received message: {msg.value().decode('utf-8')}")
finally:
consumer.close()
Рекомендации #
- WebSockets используйте для чатов, уведомлений или игр, где требуется двусторонняя связь в реальном времени.
- Для парсинга HTML предпочтительнее
BeautifulSoup
, а для работы с XML —lxml
. - При обработке больших потоков данных обращайте внимание на асинхронные подходы (
asyncio
,aiohttp
) и системы типа Kafka.