czwartek, 1 stycznia 2009

Revactor - współbieżność bez wątków

Nie od dziś wiadomo, że implementacja wątków (green threads) w Rubym 1.8 nie pozwala tak naprawdę na osiągnięcie szczególnie spektakularnych efektów, jeśli chodzi o wydajność programów wielowątkowych (napisano już na ten temat wiele, np. tu i nie zamierzam się nad tym rozwodzić).
Dość powiedzieć, że najwydajniejsze serwery sieciowe pisane w Rubym, są oparte o tzw. pętlę zdarzeń (event loop) oraz wzorzec reactor, a nie o wielowątkowość. Najpopularniejszą biblioteką, opartą o wzorzec reactor, jest obecnie eventmachine (przynajmniej dla Ruby 1.8).

Ruby 1.9 wychodzi nieco na przeciw tym problemom, dodając obsługę wątków systemu operacyjnego oraz klasę Fiber. O "włóknach" w Rubym i sposobie ich użycia również pisano dużo - można powiedzieć, że w kontekście programowania współbieżnego włókna to swego rodzaju lekkie wątki, których działanie nie jest przerywane (wywłaszczane) przez zewnętrzny proces/scheduler (czy to systemu operacyjnego, czy to, jak w przypadku "zielonych wątków", interpretera Ruby), a musi być zawieszane (Fiber.yield) jawnie przez programistę w celu umożliwienia działania innych włókien. Klasa Fiber pozwala de facto na tworzenie własnych konstrukcji współbieżnych.

Na możliwościach, jakie Ruby 1.9 daje na tym polu, oraz w oparciu o pętlę zdarzeń Rev (dodatkowo doprawionych zaczerpniętym z języka Erlang modelem współbieżności), oparto ciekawy framework Revactor. Nie ukrywam, że właśnie model współbieżności lansowany przez Erlang (actor model) przemawia do mnie najbardziej (zapewne do autora omawianego framework'a również), o czym pisałem również w poprzednich postach.

Filozofia współbieżności została żywcem przeniesiona z języka Erlang. A zatem:

  • podstawowym bytem reprezentującym pojedyncze zadanie, wykonujące się współbieżnie z innymi zadaniami, jest aktor (odpowiednik procesu w Erlang'u)

  • stan całego przetwarzania reprezentowany jest przez stan poszczególnych aktorów biorących w nim udział

  • stan danego aktora nie jest współdzielony z innymi aktorami

  • jedynym sposobem wymiany danych pomiędzy aktorami jest wysyłanie i odbieranie komunikatów; w tym celu każdy aktor posiada własną "skrzynkę odbiorczą" (mailbox)

Oczywiście ostatni punkt jest czysto umowny - w języku takim jak Ruby, który nie posiada zmiennych jednokrotnego przypisania, nikt nie może zagwarantować, że aktorzy nie będą odwoływać się do pamięci współdzielonej (i w związku z tym nie może dać głowy, że program nie jest pozbawiony efektów ubocznych - jeśli nawet ufamy swojemu kodowi, to i tak nie można mieć pewności, że używany kod autorstwa kogoś trzeciego jest "bezpieczny wątkowo").


Jeśli chodzi o możliwości, to Revactor daje nam prawie wszystko to co Erlang. Porównanie wybranych konstrukcji zebrałem w poniższej tabelce.



FunkcjonalnośćRevactorErlang
Tworzenie aktora/procesu
actor = Actor.spawn do
...
end
Pid = spawn(fun() -> ... end)
Bieżący aktor/proces
Actor.current
self()
Wysłanie komunikatu
actor << message
Pid ! Message
Odebranie komunikatu
Actor.receive do |filter|
filter.when(pattern1) { ... }
filter.when(pattern2) { ... }
...
end
receive
pattern1 -> ...;
pattern2 -> ...;
...
end
Odebranie komunikatu z timeout'em
Actor.receive do |filter|
...
filter.after(time) { ... }
end
receive
...
after Time -> ...
end
Wiązanie aktorów/procesów (link)
actor = Actor.spawn { ... }
Actor.link(actor)
lub
Actor.spawn_link { ... }
Pid = spawn(...),
link(Pid)
lub
spawn_link(...)
Przechwytywanie wyjątków z połączonych aktorów/procesów
Actor.current.trap_exit = true
actor = Actor.spawn_link { ... }
Actor.receive do |filter|
filter.when(
Case[:exit,actor,Object])
do |reason|
...
end
end
process_flag(trap_exit, true),
Pid = spawn_link(...),
receive
{'EXIT', Pid, Reason} -> ...
end
Rejestracja procesów
-
register(Name, Pid)
unregister(Name)

Ale to nie wszystko. Generalnie Revactor powstał jako framework do budowania aplikacji sieciowych. Dlatego umożliwia również asynchroniczny sposób wykonywania operacji we/wy.
Normalnie, każda operacja i/o w Rubym jest blokująca, tzn. wywołanie metody (np. IO#read) kończy się dopiero wtedy, gdy otrzymała/wysłała dane (np. odczytała porcję bajtów z socket'a).
Przeważnie jednak oczekiwanie na dane nie zajmuje całej mocy procesora dostępnej w danej chwili dla procesu interpretera, więc cpu w tym czasie pozostaje niewykorzystany. Najgorsze jednak jest to, że interpreter Ruby'ego, wykonując kod zewnętrzny (wywołania systemowe również się tutaj zaliczają), blokuje wykonywanie wszystkich "zielonych wątków" należących do procesu interpretera. Jak widać przy współbieżności Ruby'ego z użyciem green threads słusznie stawia się już od dawna duży znak zapytania, o czym wspomniałem na początku posta.


Na szczęście biblioteka Rev pozwala na nieblokujące wykonywanie operacji i/o. Revactor udostępnia własną implementację standardowej klasy TCPSocket. Okazuje się, że wywołanie IO#read ...blokuje, dopóki porcja danych z socket'a nie zostanie odczytana. Blokowanie jest jednak tylko pozorne, a właściwie rzecz ujmując - tylko bieżący Aktor zostaje "zablokowany". Pod spodem, Revactor wykonuje zawieszenie wykonywanego włókna (Fiber.yield), pozwalając pętli zdarzeń na przetworzenie kolejnych komunikatów - w tym czasie mogą wykonywać się inne włókna (a więc inni Aktorzy) oraz może odbywać się komunikacja pomiędzy nimi - aż do momentu, w którym nie zajdzie odpowiednie zdarzenie i/o, pozwalające na wzbudzenie (Fiber.resume) zawieszonego Aktora i kontynuowanie jego przetwarzania.
Dodatkowym plusem jest zgodność z interfejsem standardowej klasy IO, co pozwala na w miarę przezroczystą podmianę używanych socketów na wersje "nieblokujące".


Pora na przykład. Tym razem będzie to proste zliczanie wystąpień wyrazów na wybranych stronach (w tym przypadku zawierających dokumentację klas na www.ruby-doc.org), oparte na algorytmie MapReduce. Może niezbyt praktyczne, ale chodzi tylko o pokazanie sposobu wykorzystania aktorów. Na początek klasa bazowa:

#! /usr/bin/ruby19

class MapReduce
attr_reader :input, :output
attr_accessor :map, :reduce

def initialize(map, reduce)
@map, @reduce = map, reduce
end

def run(input)
raise "Override this method in child classes"
end

end
Następnie implementacja z wykorzystaniem aktorów:
#! /usr/bin/ruby19

require 'revactor'
require 'mapreduce'

class Concurrent < MapReduce
def run(input)
@input = input
current = Actor.current
Actor.spawn { do_reduce(current) }
Actor.receive do |filter|
filter.when(T[current, Object]) {|_, obj| @output = obj}
end
end

#######
private
#######
def do_reduce(parent)
reduce_actor = Actor.current
reduce_actor.trap_exit = true
@input.each do |element|
Actor.spawn_link { @map.call(reduce_actor, element) }
end
dictionary = collect_partials
result = {}
dictionary.each do |key, value|
result[key] = @reduce.call(key, value)[1]
end
parent << [parent, result]
end

def collect_partials
n = @input.size
dictionary = {}
while n > 0
Actor.receive do |filter|
filter.when(Case[:exit, Actor, Object]) { n -= 1 }
filter.when(T[String, Fixnum]) do |key, value|
dictionary[key] ||= []
dictionary[key] << value
end
end
end
dictionary
end
end
Metoda run tworzy aktora, który będzie się zajmował redukcją danych otrzymanych od innych aktorów. Dla każdego elementu wejściowej listy tworzymy nowego aktora, który wykonuje metodę map na danym elemencie. Następnie zbieramy odpowiedzi od poszczególnych aktorów (collect_partials) i po zebraniu wszystkich odpowiedzi, dokonujemy redukcji otrzymanych danych (reduce). Jak widać jest to pewien szablon - na tyle ogólny, że można go wykorzystać do realizacji różnego rodzaju zadań.
Warto zwrócić uwagę na mechanizm łączenia ze sobą aktorów (link). Wywołanie Actor.spawn_link tworzy nowego aktora, powiązanego z bieżacym. Takie powiązanie oznacza, że w przypadku "śmierci" jednego z aktorów, pozostali, powiązani z nim aktorzy - również "umierają". Dzieje się tak zarówno w przypadku, gdy aktor rzuci wyjątek, jak i wtedy, gdy się poprawnie zakończy. Jest jednak możliwość, by sygnał o śmierci jednego z aktorów nie zabił wybranego przez nas aktora - wystarczy ustawić jego atrybut trap_exit na true. Wtedy ten wybrany aktor, w przypadku śmierci innego powiązanego aktora, otrzymuje komunikat w postaci listy [:exit, actor, object], gdzie actor to aktor, który rzucił wyjątkiem bądź zakończył działanie, a object zawiera sam obiekt wyjątku lub nil w przypadku normalnego zakończenia. Umożliwia to tworzenie hierarchii aktorów, w której wybrani aktorzy są "zarządcami", tzn. mogą decydować o podjęciu jakiegoś działania w przypadku śmierci aktorów-pracowników, np. o utworzeniu nowego aktora. Taki model jest w 100% zgodny z tym, co daje nam Erlang.
Dla kontrastu implementacja działająca sekwencyjnie (co prawda nie można wtedy nazwać jej MapReduce, ale umieściłem ją dla porównania):
#! /usr/bin/ruby19

class Sequential < MapReduce
def run(input)
@input = input
dictionary = {}
@input.each do |element|
partial = []
@map.call(partial, element)
partial.each do |key, value|
dictionary[key] ||= []
dictionary[key] << value
end
end
result = {}
dictionary.each do |key, value|
result[key] = @reduce.call(key, value)[1]
end
@output = result
end
end
I na koniec sama klasa testowa zliczająca ilość wystąpień wyrazów. Każdy aktor, realizujący operację map wysyła do aktora redukującego parę [w, 1], gdzie w jest każdym napotkanym na stronie wyrazem. Następnie aktor zajmujący się redukcją wyników wyznacza ilość wystąpień danego wyrazu na podstawie komunikatów częściowych.
#! /usr/bin/ruby19

resources = []
ObjectSpace.each_object(Class) do |klass|
resources << "http://www.ruby-doc.org/core-1.9/classes/#{klass.name.gsub('::','/')}.html"
end

require 'concurrent'
require 'sequential'
require 'benchmark'
require 'cgi'
require 'uri'

include Benchmark

class WordCount
def initialize(method)
@method = method
end

def run(input)
@method.run(input)
end

def output
@method.output
end

def self.each_word(uri)
words = []
uri = URI.parse(uri)

sock = Revactor::TCP.connect(uri.host, 80)
sock.write [ "GET #{uri.path} HTTP/1.0", "Host: #{CGI.escape(uri.host)}",
"\r\n" ].join("\r\n")
begin
loop do
sock.read.scan(/[a-z]+/i) {|w| yield w}
end
rescue EOFError
end
end
end

if $0 == __FILE__
resources = resources[0...(ARGV[0].to_i)]
map = -> target, uri do
WordCount.each_word(uri) {|w| target << T[w, 1]}
end
reduce = -> key, values { T[key, values.size] }

test_seq = WordCount.new(Sequential.new(map, reduce))
test_actor = WordCount.new(Concurrent.new(map, reduce))
Benchmark.bm(12) do |b|
b.report("sequential") { test_seq.run(resources) }
b.report("#{resources.size} actors") { test_actor.run(resources) }
end

%w(test_seq test_actor).each do |t|
File.open(t, "w") do |f|
f.puts eval(t).output
end
end
end

Wynik odpalenia testu dla 50 aktorów:
                  user     system      total        real
sequential 0.890000 0.100000 0.990000 ( 44.523738)
50 actors 3.060000 0.150000 3.210000 ( 10.127858)

A więc ponad 4 razy szybciej na korzyść implementacji współbieżnej (bez wątków!). Dla porównania jeszcze wyniki dla 100 i 200 aktorów. Dla 200 nie jest aż tak dobrze, ale jest nadal nieco ponad 2 razy szybciej. W tym przypadku kluczowe jest to, że operacja odczytu danych z socket'a trwa stosunkowo długo - ten czas jest wykorzystywany na współbieżne działanie aktorów. Jasne jest, że w przypadku, gdy operacja i/o jest krótkotrwała, program działający sekwencyjnie okaże się szybszy.
                  user     system      total        real
sequential 1.010000 0.120000 1.130000 ( 65.234548)
100 actors 3.610000 0.090000 3.700000 ( 15.162739)

sequential 1.890000 0.230000 2.120000 (125.002007)
200 actors 8.170000 0.410000 8.580000 ( 56.959498)

Ruby/Revactor nie może się równać wydajnością z Erlangiem, w którym utworzenie wielkiej ilości procesów wymieniających masę komunikatów nie stanowi dużego problemu. Nic również nie zdziała, jeśli chodzi o wykorzystanie wielu procesorów (lub procesorów wielordzeniowych). Nadal poruszamy się w kontekście jednego procesu systemu operacyjnego. Ale i tak jest ciekawą alternatywą dla współbieżności opartej na wątkach. Poza tym nic nie stoi na przeszkodzie, by pisać programy działające w oparciu o wiele procesów (w którym każdy z procesów może korzystać ze współbieżności opartej o włókna lub green threads), które komunikują się między sobą z użyciem jednego z wielu dostępnych obecnie mechanizmów wymiany komunikatów (kolejek). Można również sięgnąć po JRuby, który na dzień dzisiejszy na tyle okrzepł, że wydaje się poważną alternatywą dla MRI.

Ale o tym może innym razem.

update: kod jest dostępny na githubie

2 komentarze:

radarek pisze...

Bardzo ciekawy i konkretny wpis. Powinieneś pisać częściej :).

Pi0tr pisze...

Dzięki Radarek - staram się, ale czasu brak ;)