Нашел перевод Введение в асинхронное программирование и Twisted отличного учебного пособия (с кодом на Github). Испытал серврер slowpoetry, из книги постарался запомгнить три важных понятия: асинхронная однопоточная модель, Reactor, Defferer
Введение в асинхронное программирование и Twisted
What is the best tutorial for Python's Twisted framework?
TWISTED INTRODUCTION
https://github.com/jdavisp3
15.5. optparse — Parser for command line options Deprecated since version 2.7
blocking-server/slowpoetry.py
Twisted tutorial part 1
Twisted tutorial part 2
16.1. select — Waiting for I/O completion
PySocks
Суть модели Twisted (кратко своими словами)¶
Вспомним, как Wireshark перехватывает TCP пакеты для нескольких вкладок (страниц) открытых в браузере. Они нумеруются по мере перехвата, но все пакеты группируются по запросам (сокетам).
Очевидно, что часто одновременно открытые сокеты пересылают пакеты одновременно, поэтому Wireshark показывает нам "перемешаные" последовательности пакетов.
Отметим, что Wireshark (в случае пересылыки HTTP сообщений) еще определяет начальный и последний транспортный пакет (TCP) и ( по умолчанию) вынимает все фрагменты кода из пакетов, собирает вместе и прикрепляет собранную HTML страницу к последнему TCP пакету.
Кликая по разным кнопкам (ссылкам) на разных вкладках мы создаем асинхронные запросы, которые отправляются по разным адресам. Ответы на запросы приходят в разное время с разными здержками.
Чтобы их перехватить, нужны event listenerы , проверяющие заголовки пакетов (в бесконечном цикле) Reactor
ы ... и заголовки тоже можно формировать по своим шаблонам и со своими наборами фуекций обратного вызова deffers
Нескоько цитат из книги¶
Поскольку callback’и много используются в асинхронном программировании, и поскольку их корректное использование, как мы увидели, может быть непростым, разработчики Twisted создали абстракцию, называемую Deffered, для того, чтобы упросить программирование с использованием callback’ов. Класс Deferred определен в twisted.internet.defer.
....deferred содержит пару callback цепочек: одну для нормальных результатов, другую - для ошибочных. Вновь созданный deferred имеет пустые цепочки. Мы можем заселить цепочки, добавляя callback’и и errback’и, и затем активизировать deferred с нормальным результатом (здесь ваша поэма!) или исключением (я не смог получить поэму, и вот почему). Активизированный deferred будет вызывать либо соответсвующие callback’и, либо errback’и в порядке, в котором они были добавлены
Немного подробнее о Reactor¶
Книга начинается с примера, в котором мы запускаем в бесконечном цикле event listener. Если событие, на которое он должен отреагировать не происходит, то и ресурсов этот "слушатель" почти не потребяет. Этот принцип исапользует стандартный шаблон проектмрование Reactor
" ... В предыдущей главе мы выяснили, что Twisted - реализация шаблона проектирования Reactor, таким образом Twisted содержит объект, который представляет reactor или event loop, который является ядром любой Twisted программы. Первая строка нашей программы импортирует объект reactor, чтобы его можно было использовать, вторая строка запускает цикл реактора".
Далее повторим первый примеры их книги с киент-серврером slowpoetry.py¶
%load "C:\\Users\\kiss\\Documents\\GitHub_2\\twisted-intro-master\\blocking-server\\slowpoetry.py"
# This is the blocking version of the Slow Poetry Server.
import optparse, os, socket, time
def parse_args():
usage = """usage: %prog [options] poetry-file
This is the Slow Poetry Server, blocking edition.
Run it like this:
python slowpoetry.py <path-to-poetry-file>
If you are in the base directory of the twisted-intro package,
you could run it like this:
python blocking-server/slowpoetry.py poetry/ecstasy.txt
to serve up John Donne's Ecstasy, which I know you want to do.
"""
parser = optparse.OptionParser(usage)
help = "The port to listen on. Default to a random available port."
parser.add_option('--port', type='int', help=help)
help = "The interface to listen on. Default is localhost."
parser.add_option('--iface', help=help, default='localhost')
help = "The number of seconds between sending bytes."
parser.add_option('--delay', type='float', help=help, default=.7)
help = "The number of bytes to send at a time."
parser.add_option('--num-bytes', type='int', help=help, default=10)
options, args = parser.parse_args()
if len(args) != 1:
parser.error('Provide exactly one poetry file.')
poetry_file = args[0]
if not os.path.exists(args[0]):
parser.error('No such file: %s' % poetry_file)
return options, poetry_file
def send_poetry(sock, poetry_file, num_bytes, delay):
"""Send some poetry slowly down the socket."""
inputf = open(poetry_file)
while True:
bytes = inputf.read(num_bytes)
if not bytes: # no more poetry :(
sock.close()
inputf.close()
return
print 'Sending %d bytes' % len(bytes)
try:
sock.sendall(bytes) # this is a blocking call
except socket.error:
sock.close()
inputf.close()
return
time.sleep(delay)
def serve(listen_socket, poetry_file, num_bytes, delay):
while True:
sock, addr = listen_socket.accept()
print 'Somebody at %s wants poetry!' % (addr,)
send_poetry(sock, poetry_file, num_bytes, delay)
def main():
options, poetry_file= parse_args()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((options.iface, options.port or 0))
sock.listen(5)
print 'Serving %s on port %s.' % (poetry_file, sock.getsockname()[1])
serve(sock, poetry_file, options.num_bytes, options.delay)
if __name__ == '__main__':
main()
Здесь я отвлекся на проблемы с переменными окружения, но (еще ниже) пришлось таки запустить консоль и скопировать сюда результаты¶
!python blocking-server/slowpoetry.py --port 44444 poetry/ecstasy.txt
Serving poetry/ecstasy.txt on port 44444.
# далее буду копировать результаты из консоли, поскольку этот файл в совсем другом пространстве имен...
Пробую поменять диск и запустить отсюда (IPython Notebook) ncat (она не прописана в переменной окружения)¶
!c:
!cd C:\Program Files (x86)\Nmap
Вроде бы... команды прошли, но на самом деле ...я по прежнему на диске e:
!C:\Program Files(x86)\Nmap>ncat.exe\ncat.exe 127.0.0.1 44444
!chcp 65001
!dir
Хватит отвлекаться, запустим ncat из консоли и все исполнится (ниже копипаст)¶
C:\Program Files (x86)\Nmap>ncat.exe 127.0.0.1 44444
The Ecstasy
Where, like a pillow on a bed
A pregnant bank swell'd up to rest
The violet's reclining head,
Sat we two, one another's best.
Our hands were firmly cemented
With a fast balm, which thence did spring;
Our eye-beams twisted, and did thread
Our eyes upon one double string;
So to'intergraft our hands, as yet
Was all the means to make us one,
An^C
C:\Program Files (x86)\Nmap>
Теперь посмотрим на блокирующего клиента (Download a piece of poetry from the given address)¶
%load C:\\Users\\kiss\\Documents\\GitHub_2\\twisted-intro-master\\blocking-client\\get-poetry.py
# This is the blocking Get Poetry Now! client.
import datetime, optparse, socket
def parse_args():
usage = """usage: %prog [options] [hostname]:port ...
This is the Get Poetry Now! client, blocking edition.
Run it like this:
python get-poetry.py port1 port2 port3 ...
If you are in the base directory of the twisted-intro package,
you could run it like this:
python blocking-client/get-poetry.py 10001 10002 10003
to grab poetry from servers on ports 10001, 10002, and 10003.
Of course, there need to be servers listening on those ports
for that to work.
"""
parser = optparse.OptionParser(usage)
_, addresses = parser.parse_args()
if not addresses:
print parser.format_help()
parser.exit()
def parse_address(addr):
if ':' not in addr:
host = '127.0.0.1'
port = addr
else:
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('Ports must be integers.')
return host, int(port)
return map(parse_address, addresses)
def get_poetry(address):
"""Download a piece of poetry from the given address."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(address)
poem = ''
while True:
# This is the 'blocking' call in this synchronous program.
# The recv() method will block for an indeterminate period
# of time waiting for bytes to be received from the server.
data = sock.recv(1024)
if not data:
sock.close()
break
poem += data
return poem
def format_address(address):
host, port = address
return '%s:%s' % (host or '127.0.0.1', port)
def main():
addresses = parse_args()
elapsed = datetime.timedelta()
for i, address in enumerate(addresses):
addr_fmt = format_address(address)
print 'Task %d: get poetry from: %s' % (i + 1, addr_fmt)
start = datetime.datetime.now()
# Each execution of 'get_poetry' corresponds to the
# execution of one synchronous task in Figure 1 here:
# http://krondo.com/?p=1209#figure1
poem = get_poetry(address)
time = datetime.datetime.now() - start
msg = 'Task %d: got %d bytes of poetry from %s in %s'
print msg % (i + 1, len(poem), addr_fmt, time)
elapsed += time
print 'Got %d poems in %s' % (len(addresses), elapsed)
if __name__ == '__main__':
main()
Здесь optparse - это устаревшая библиотека для настраивания управления при помощи опций командной строки... А сам сервер выдает вот это
А вот код асинхронного клиента¶
%load C:\\Users\\kiss\\Documents\\GitHub_2\\twisted-intro-master\\async-client\\get-poetry.py
# This is the asynchronous Get Poetry Now! client.
import datetime, errno, optparse, select, socket
def parse_args():
usage = """usage: %prog [options] [hostname]:port ...
This is the Get Poetry Now! client, asynchronous edition.
Run it like this:
python get-poetry.py port1 port2 port3 ...
If you are in the base directory of the twisted-intro package,
you could run it like this:
python async-client/get-poetry.py 10001 10002 10003
to grab poetry from servers on ports 10001, 10002, and 10003.
Of course, there need to be servers listening on those ports
for that to work.
"""
parser = optparse.OptionParser(usage)
_, addresses = parser.parse_args()
if not addresses:
print parser.format_help()
parser.exit()
def parse_address(addr):
if ':' not in addr:
host = '127.0.0.1'
port = addr
else:
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('Ports must be integers.')
return host, int(port)
return map(parse_address, addresses)
def get_poetry(sockets):
"""Download poety from all the given sockets."""
poems = dict.fromkeys(sockets, '') # socket -> accumulated poem
# socket -> task numbers
sock2task = dict([(s, i + 1) for i, s in enumerate(sockets)])
sockets = list(sockets) # make a copy
# we go around this loop until we've gotten all the poetry
# from all the sockets. This is the 'reactor loop'.
while sockets:
# this select call blocks until one or more of the
# sockets is ready for read I/O
rlist, _, _ = select.select(sockets, [], [])
# rlist is the list of sockets with data ready to read
for sock in rlist:
data = ''
while True:
try:
new_data = sock.recv(1024)
except socket.error, e:
if e.args[0] == errno.EWOULDBLOCK:
# this error code means we would have
# blocked if the socket was blocking.
# instead we skip to the next socket
break
raise
else:
if not new_data:
break
else:
data += new_data
# Each execution of this inner loop corresponds to
# working on one asynchronous task in Figure 3 here:
# http://krondo.com/?p=1209#figure3
task_num = sock2task[sock]
if not data:
sockets.remove(sock)
sock.close()
print 'Task %d finished' % task_num
else:
addr_fmt = format_address(sock.getpeername())
msg = 'Task %d: got %d bytes of poetry from %s'
print msg % (task_num, len(data), addr_fmt)
poems[sock] += data
return poems
def connect(address):
"""Connect to the given server and return a non-blocking socket."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(address)
sock.setblocking(0)
return sock
def format_address(address):
host, port = address
return '%s:%s' % (host or '127.0.0.1', port)
def main():
addresses = parse_args()
start = datetime.datetime.now()
sockets = map(connect, addresses)
poems = get_poetry(sockets)
elapsed = datetime.datetime.now() - start
for i, sock in enumerate(sockets):
print 'Task %d: %d bytes of poetry' % (i + 1, len(poems[sock]))
print 'Got %d poems in %s' % (len(addresses), elapsed)
if __name__ == '__main__':
main()
Посты чуть ниже также могут вас заинтересовать
Комментариев нет:
Отправить комментарий