Поиск по блогу

вторник, 14 октября 2014 г.

Читаю "Введение в асинхронное программирование и Twisted" ...чтобы понять Scrapy Request

Нашел перевод Введение в асинхронное программирование и Twisted отличного учебного пособия (с кодом на Github). Испытал серврер slowpoetry, из книги постарался запомгнить три важных понятия: асинхронная однопоточная модель, Reactor, Defferer

Суть модели Twisted (кратко своими словами)

Вспомним, как Wireshark перехватывает TCP пакеты для нескольких вкладок (страниц) открытых в браузере. Они нумеруются по мере перехвата, но все пакеты группируются по запросам (сокетам).
Очевидно, что часто одновременно открытые сокеты пересылают пакеты одновременно, поэтому Wireshark показывает нам "перемешаные" последовательности пакетов.
Отметим, что Wireshark (в случае пересылыки HTTP сообщений) еще определяет начальный и последний транспортный пакет (TCP) и ( по умолчанию) вынимает все фрагменты кода из пакетов, собирает вместе и прикрепляет собранную HTML страницу к последнему TCP пакету.

Кликая по разным кнопкам (ссылкам) на разных вкладках мы создаем асинхронные запросы, которые отправляются по разным адресам. Ответы на запросы приходят в разное время с разными здержками.
Чтобы их перехватить, нужны event listenerы , проверяющие заголовки пакетов (в бесконечном цикле) Reactorы ... и заголовки тоже можно формировать по своим шаблонам и со своими наборами фуекций обратного вызова deffers

Нескоько цитат из книги

Поскольку callback’и много используются в асинхронном программировании, и поскольку их корректное использование, как мы увидели, может быть непростым, разработчики Twisted создали абстракцию, называемую Deffered, для того, чтобы упросить программирование с использованием callback’ов. Класс Deferred определен в twisted.internet.defer.

....deferred содержит пару callback цепочек: одну для нормальных результатов, другую - для ошибочных. Вновь созданный deferred имеет пустые цепочки. Мы можем заселить цепочки, добавляя callback’и и errback’и, и затем активизировать deferred с нормальным результатом (здесь ваша поэма!) или исключением (я не смог получить поэму, и вот почему). Активизированный deferred будет вызывать либо соответсвующие callback’и, либо errback’и в порядке, в котором они были добавлены

Немного подробнее о Reactor

Книга начинается с примера, в котором мы запускаем в бесконечном цикле event listener. Если событие, на которое он должен отреагировать не происходит, то и ресурсов этот "слушатель" почти не потребяет. Этот принцип исапользует стандартный шаблон проектмрование Reactor

" ... В предыдущей главе мы выяснили, что Twisted - реализация шаблона проектирования Reactor, таким образом Twisted содержит объект, который представляет reactor или event loop, который является ядром любой Twisted программы. Первая строка нашей программы импортирует объект reactor, чтобы его можно было использовать, вторая строка запускает цикл реактора".

Далее повторим первый примеры их книги с киент-серврером slowpoetry.py

In [4]:
%load "C:\\Users\\kiss\\Documents\\GitHub_2\\twisted-intro-master\\blocking-server\\slowpoetry.py"
In []:
# This is the blocking version of the Slow Poetry Server.

import optparse, os, socket, time


def parse_args():
    usage = """usage: %prog [options] poetry-file

This is the Slow Poetry Server, blocking edition.
Run it like this:

  python slowpoetry.py <path-to-poetry-file>

If you are in the base directory of the twisted-intro package,
you could run it like this:

  python blocking-server/slowpoetry.py poetry/ecstasy.txt

to serve up John Donne's Ecstasy, which I know you want to do.
"""

    parser = optparse.OptionParser(usage)

    help = "The port to listen on. Default to a random available port."
    parser.add_option('--port', type='int', help=help)

    help = "The interface to listen on. Default is localhost."
    parser.add_option('--iface', help=help, default='localhost')

    help = "The number of seconds between sending bytes."
    parser.add_option('--delay', type='float', help=help, default=.7)

    help = "The number of bytes to send at a time."
    parser.add_option('--num-bytes', type='int', help=help, default=10)

    options, args = parser.parse_args()

    if len(args) != 1:
        parser.error('Provide exactly one poetry file.')

    poetry_file = args[0]

    if not os.path.exists(args[0]):
        parser.error('No such file: %s' % poetry_file)

    return options, poetry_file


def send_poetry(sock, poetry_file, num_bytes, delay):
    """Send some poetry slowly down the socket."""

    inputf = open(poetry_file)

    while True:
        bytes = inputf.read(num_bytes)

        if not bytes: # no more poetry :(
            sock.close()
            inputf.close()
            return

        print 'Sending %d bytes' % len(bytes)

        try:
            sock.sendall(bytes) # this is a blocking call
        except socket.error:
            sock.close()
            inputf.close()
            return

        time.sleep(delay)


def serve(listen_socket, poetry_file, num_bytes, delay):
    while True:
        sock, addr = listen_socket.accept()

        print 'Somebody at %s wants poetry!' % (addr,)

        send_poetry(sock, poetry_file, num_bytes, delay)


def main():
    options, poetry_file= parse_args()

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((options.iface, options.port or 0))

    sock.listen(5)

    print 'Serving %s on port %s.' % (poetry_file, sock.getsockname()[1])

    serve(sock, poetry_file, options.num_bytes, options.delay)


if __name__ == '__main__':
    main()

Здесь я отвлекся на проблемы с переменными окружения, но (еще ниже) пришлось таки запустить консоль и скопировать сюда результаты

In [5]:
!python blocking-server/slowpoetry.py --port 44444 poetry/ecstasy.txt
Serving poetry/ecstasy.txt on port 44444. 
# далее буду копировать результаты из консоли, поскольку этот файл в совсем другом пространстве имен... 
python: can't open file 'blocking-server/slowpoetry.py': [Errno 2] No such file or directory

Пробую поменять диск и запустить отсюда (IPython Notebook) ncat (она не прописана в переменной окружения)

In [6]:
!c:
In [7]:
!cd C:\Program Files (x86)\Nmap

Вроде бы... команды прошли, но на самом деле ...я по прежнему на диске e:

In [14]:
!C:\Program Files(x86)\Nmap>ncat.exe\ncat.exe 127.0.0.1 44444
The system cannot find the path specified.

In [12]:
!chcp 65001
!dir
Active code page: 65001
 Volume in drive E is SL-63-X86_6
 Volume Serial Number is 2E3A-7167

 Directory of E:\w8\IPython Notebooks\2014_10

01.10.2014  21:43    <DIR>          .
01.10.2014  21:43    <DIR>          ..
05.10.2014  11:42           127В 026 telnetlib.ipynb
06.10.2014  10:58    <DIR>          .ipynb_checkpoints
02.10.2014  14:29    <DIR>          sock_server
07.10.2014  15:47            51В 820 nc_socat.ipynb
05.10.2014  11:42            26В 716 putty.ipynb
04.10.2014  14:59                 0 netstat
07.10.2014  15:18            24В 827 nmap.ipynb
05.10.2014  11:42            39В 453 cmd.ipynb
05.10.2014  11:47           151В 075 telnetlib.html
05.10.2014  11:49            24В 694 putty.html
05.10.2014  11:49            48В 406 cmd.html
07.10.2014  15:48            60В 500 nc_socat.html
07.10.2014  15:38            21В 486 nmap.html
09.10.2014  13:47             2В 067 tor_newi.ipynb
09.10.2014  13:31               189 Untitled0.ipynb
09.10.2014  14:35            11В 725 Untitled1.ipynb
              14 File(s)        589В 984 bytes
               4 Dir(s)     341В 037В 056 bytes free

Хватит отвлекаться, запустим ncat из консоли и все исполнится (ниже копипаст)

In []:
C:\Program Files (x86)\Nmap>ncat.exe 127.0.0.1 44444
The Ecstasy

Where, like a pillow on a bed
         A pregnant bank swell'd up to rest
The violet's reclining head,
         Sat we two, one another's best.
Our hands were firmly cemented
         With a fast balm, which thence did spring;
Our eye-beams twisted, and did thread
         Our eyes upon one double string;
So to'intergraft our hands, as yet
         Was all the means to make us one,
An^C
C:\Program Files (x86)\Nmap>

Теперь посмотрим на блокирующего клиента (Download a piece of poetry from the given address)

In [15]:
%load C:\\Users\\kiss\\Documents\\GitHub_2\\twisted-intro-master\\blocking-client\\get-poetry.py
In []:
# This is the blocking Get Poetry Now! client.

import datetime, optparse, socket


def parse_args():
    usage = """usage: %prog [options] [hostname]:port ...

This is the Get Poetry Now! client, blocking edition.
Run it like this:

  python get-poetry.py port1 port2 port3 ...

If you are in the base directory of the twisted-intro package,
you could run it like this:

  python blocking-client/get-poetry.py 10001 10002 10003

to grab poetry from servers on ports 10001, 10002, and 10003.

Of course, there need to be servers listening on those ports
for that to work.
"""

    parser = optparse.OptionParser(usage)

    _, addresses = parser.parse_args()

    if not addresses:
        print parser.format_help()
        parser.exit()

    def parse_address(addr):
        if ':' not in addr:
            host = '127.0.0.1'
            port = addr
        else:
            host, port = addr.split(':', 1)

        if not port.isdigit():
            parser.error('Ports must be integers.')

        return host, int(port)

    return map(parse_address, addresses)


def get_poetry(address):
    """Download a piece of poetry from the given address."""

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(address)

    poem = ''

    while True:

        # This is the 'blocking' call in this synchronous program.
        # The recv() method will block for an indeterminate period
        # of time waiting for bytes to be received from the server.

        data = sock.recv(1024)

        if not data:
            sock.close()
            break

        poem += data

    return poem


def format_address(address):
    host, port = address
    return '%s:%s' % (host or '127.0.0.1', port)


def main():
    addresses = parse_args()

    elapsed = datetime.timedelta()

    for i, address in enumerate(addresses):
        addr_fmt = format_address(address)

        print 'Task %d: get poetry from: %s' % (i + 1, addr_fmt)

        start = datetime.datetime.now()

        # Each execution of 'get_poetry' corresponds to the
        # execution of one synchronous task in Figure 1 here:
        # http://krondo.com/?p=1209#figure1

        poem = get_poetry(address)

        time = datetime.datetime.now() - start

        msg = 'Task %d: got %d bytes of poetry from %s in %s'
        print  msg % (i + 1, len(poem), addr_fmt, time)

        elapsed += time

    print 'Got %d poems in %s' % (len(addresses), elapsed)


if __name__ == '__main__':
    main()

Здесь optparse - это устаревшая библиотека для настраивания управления при помощи опций командной строки... А сам сервер выдает вот это

А вот код асинхронного клиента

In [16]:
%load C:\\Users\\kiss\\Documents\\GitHub_2\\twisted-intro-master\\async-client\\get-poetry.py
In []:
# This is the asynchronous Get Poetry Now! client.

import datetime, errno, optparse, select, socket


def parse_args():
    usage = """usage: %prog [options] [hostname]:port ...

This is the Get Poetry Now! client, asynchronous edition.
Run it like this:

  python get-poetry.py port1 port2 port3 ...

If you are in the base directory of the twisted-intro package,
you could run it like this:

  python async-client/get-poetry.py 10001 10002 10003

to grab poetry from servers on ports 10001, 10002, and 10003.

Of course, there need to be servers listening on those ports
for that to work.
"""

    parser = optparse.OptionParser(usage)

    _, addresses = parser.parse_args()

    if not addresses:
        print parser.format_help()
        parser.exit()

    def parse_address(addr):
        if ':' not in addr:
            host = '127.0.0.1'
            port = addr
        else:
            host, port = addr.split(':', 1)

        if not port.isdigit():
            parser.error('Ports must be integers.')

        return host, int(port)

    return map(parse_address, addresses)


def get_poetry(sockets):
    """Download poety from all the given sockets."""

    poems = dict.fromkeys(sockets, '') # socket -> accumulated poem

    # socket -> task numbers
    sock2task = dict([(s, i + 1) for i, s in enumerate(sockets)])

    sockets = list(sockets) # make a copy

    # we go around this loop until we've gotten all the poetry
    # from all the sockets. This is the 'reactor loop'.

    while sockets:
        # this select call blocks until one or more of the
        # sockets is ready for read I/O
        rlist, _, _ = select.select(sockets, [], [])

        # rlist is the list of sockets with data ready to read

        for sock in rlist:
            data = ''

            while True:
                try:
                    new_data = sock.recv(1024)
                except socket.error, e:
                    if e.args[0] == errno.EWOULDBLOCK:
                        # this error code means we would have
                        # blocked if the socket was blocking.
                        # instead we skip to the next socket
                        break
                    raise
                else:
                    if not new_data:
                        break
                    else:
                        data += new_data

            # Each execution of this inner loop corresponds to
            # working on one asynchronous task in Figure 3 here:
            # http://krondo.com/?p=1209#figure3

            task_num = sock2task[sock]

            if not data:
                sockets.remove(sock)
                sock.close()
                print 'Task %d finished' % task_num
            else:
                addr_fmt = format_address(sock.getpeername())
                msg = 'Task %d: got %d bytes of poetry from %s'
                print  msg % (task_num, len(data), addr_fmt)

            poems[sock] += data

    return poems


def connect(address):
    """Connect to the given server and return a non-blocking socket."""

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(address)
    sock.setblocking(0)
    return sock


def format_address(address):
    host, port = address
    return '%s:%s' % (host or '127.0.0.1', port)


def main():
    addresses = parse_args()

    start = datetime.datetime.now()

    sockets = map(connect, addresses)

    poems = get_poetry(sockets)

    elapsed = datetime.datetime.now() - start

    for i, sock in enumerate(sockets):
        print 'Task %d: %d bytes of poetry' % (i + 1, len(poems[sock]))

    print 'Got %d poems in %s' % (len(addresses), elapsed)


if __name__ == '__main__':
    main()


Посты чуть ниже также могут вас заинтересовать

Комментариев нет:

Отправить комментарий