Nie od dziś wiadomo, że implementacja wątków (green threads) w Rubym 1.8 nie pozwala tak naprawdę na osiągnięcie szczególnie spektakularnych efektów, jeśli chodzi o wydajność programów wielowątkowych (napisano już na ten temat wiele, np. tu i nie zamierzam się nad tym rozwodzić).
Dość powiedzieć, że najwydajniejsze serwery sieciowe pisane w Rubym, są oparte o tzw. pętlę zdarzeń (event loop) oraz wzorzec reactor, a nie o wielowątkowość. Najpopularniejszą biblioteką, opartą o wzorzec reactor, jest obecnie eventmachine (przynajmniej dla Ruby 1.8).
Ruby 1.9 wychodzi nieco na przeciw tym problemom, dodając obsługę wątków systemu operacyjnego oraz klasę Fiber. O "włóknach" w Rubym i sposobie ich użycia również pisano dużo - można powiedzieć, że w kontekście programowania współbieżnego włókna to swego rodzaju lekkie wątki, których działanie nie jest przerywane (wywłaszczane) przez zewnętrzny proces/scheduler (czy to systemu operacyjnego, czy to, jak w przypadku "zielonych wątków", interpretera Ruby), a musi być zawieszane (Fiber.yield) jawnie przez programistę w celu umożliwienia działania innych włókien. Klasa Fiber pozwala de facto na tworzenie własnych konstrukcji współbieżnych.
Na możliwościach, jakie Ruby 1.9 daje na tym polu, oraz w oparciu o pętlę zdarzeń Rev (dodatkowo doprawionych zaczerpniętym z języka Erlang modelem współbieżności), oparto ciekawy framework Revactor. Nie ukrywam, że właśnie model współbieżności lansowany przez Erlang (actor model) przemawia do mnie najbardziej (zapewne do autora omawianego framework'a również), o czym pisałem również w poprzednich postach.
Filozofia współbieżności została żywcem przeniesiona z języka Erlang. A zatem:
- podstawowym bytem reprezentującym pojedyncze zadanie, wykonujące się współbieżnie z innymi zadaniami, jest aktor (odpowiednik procesu w Erlang'u)
- stan całego przetwarzania reprezentowany jest przez stan poszczególnych aktorów biorących w nim udział
- stan danego aktora nie jest współdzielony z innymi aktorami
- jedynym sposobem wymiany danych pomiędzy aktorami jest wysyłanie i odbieranie komunikatów; w tym celu każdy aktor posiada własną "skrzynkę odbiorczą" (mailbox)
Oczywiście ostatni punkt jest czysto umowny - w języku takim jak Ruby, który nie posiada zmiennych jednokrotnego przypisania, nikt nie może zagwarantować, że aktorzy nie będą odwoływać się do pamięci współdzielonej (i w związku z tym nie może dać głowy, że program nie jest pozbawiony efektów ubocznych - jeśli nawet ufamy swojemu kodowi, to i tak nie można mieć pewności, że używany kod autorstwa kogoś trzeciego jest "bezpieczny wątkowo").
Jeśli chodzi o możliwości, to Revactor daje nam prawie wszystko to co Erlang. Porównanie wybranych konstrukcji zebrałem w poniższej tabelce.
| Funkcjonalność | Revactor | Erlang |
| Tworzenie aktora/procesu | actor = Actor.spawn do | Pid = spawn(fun() -> ... end) |
| Bieżący aktor/proces | Actor.current | self() |
| Wysłanie komunikatu | actor << message | Pid ! Message |
| Odebranie komunikatu | Actor.receive do |filter| | receive |
| Odebranie komunikatu z timeout'em | Actor.receive do |filter| | receive |
| Wiązanie aktorów/procesów (link) | actor = Actor.spawn { ... } | Pid = spawn(...), |
| Przechwytywanie wyjątków z połączonych aktorów/procesów | Actor.current.trap_exit = true | process_flag(trap_exit, true), |
| Rejestracja procesów | - | register(Name, Pid) |
Ale to nie wszystko. Generalnie Revactor powstał jako framework do budowania aplikacji sieciowych. Dlatego umożliwia również asynchroniczny sposób wykonywania operacji we/wy.
Normalnie, każda operacja i/o w Rubym jest blokująca, tzn. wywołanie metody (np.
IO#read) kończy się dopiero wtedy, gdy otrzymała/wysłała dane (np. odczytała porcję bajtów z socket'a).Przeważnie jednak oczekiwanie na dane nie zajmuje całej mocy procesora dostępnej w danej chwili dla procesu interpretera, więc cpu w tym czasie pozostaje niewykorzystany. Najgorsze jednak jest to, że interpreter Ruby'ego, wykonując kod zewnętrzny (wywołania systemowe również się tutaj zaliczają), blokuje wykonywanie wszystkich "zielonych wątków" należących do procesu interpretera. Jak widać przy współbieżności Ruby'ego z użyciem green threads słusznie stawia się już od dawna duży znak zapytania, o czym wspomniałem na początku posta.
Na szczęście biblioteka Rev pozwala na nieblokujące wykonywanie operacji i/o. Revactor udostępnia własną implementację standardowej klasy TCPSocket. Okazuje się, że wywołanie IO#read ...blokuje, dopóki porcja danych z socket'a nie zostanie odczytana. Blokowanie jest jednak tylko pozorne, a właściwie rzecz ujmując - tylko bieżący Aktor zostaje "zablokowany". Pod spodem, Revactor wykonuje zawieszenie wykonywanego włókna (Fiber.yield), pozwalając pętli zdarzeń na przetworzenie kolejnych komunikatów - w tym czasie mogą wykonywać się inne włókna (a więc inni Aktorzy) oraz może odbywać się komunikacja pomiędzy nimi - aż do momentu, w którym nie zajdzie odpowiednie zdarzenie i/o, pozwalające na wzbudzenie (Fiber.resume) zawieszonego Aktora i kontynuowanie jego przetwarzania.
Dodatkowym plusem jest zgodność z interfejsem standardowej klasy IO, co pozwala na w miarę przezroczystą podmianę używanych socketów na wersje "nieblokujące".
Pora na przykład. Tym razem będzie to proste zliczanie wystąpień wyrazów na wybranych stronach (w tym przypadku zawierających dokumentację klas na www.ruby-doc.org), oparte na algorytmie MapReduce. Może niezbyt praktyczne, ale chodzi tylko o pokazanie sposobu wykorzystania aktorów. Na początek klasa bazowa:
#! /usr/bin/ruby19Następnie implementacja z wykorzystaniem aktorów:
class MapReduce
attr_reader :input, :output
attr_accessor :map, :reduce
def initialize(map, reduce)
@map, @reduce = map, reduce
end
def run(input)
raise "Override this method in child classes"
end
end
#! /usr/bin/ruby19Metoda run tworzy aktora, który będzie się zajmował redukcją danych otrzymanych od innych aktorów. Dla każdego elementu wejściowej listy tworzymy nowego aktora, który wykonuje metodę
require 'revactor'
require 'mapreduce'
class Concurrent < MapReduce
def run(input)
@input = input
current = Actor.current
Actor.spawn { do_reduce(current) }
Actor.receive do |filter|
filter.when(T[current, Object]) {|_, obj| @output = obj}
end
end
#######
private
#######
def do_reduce(parent)
reduce_actor = Actor.current
reduce_actor.trap_exit = true
@input.each do |element|
Actor.spawn_link { @map.call(reduce_actor, element) }
end
dictionary = collect_partials
result = {}
dictionary.each do |key, value|
result[key] = @reduce.call(key, value)[1]
end
parent << [parent, result]
end
def collect_partials
n = @input.size
dictionary = {}
while n > 0
Actor.receive do |filter|
filter.when(Case[:exit, Actor, Object]) { n -= 1 }
filter.when(T[String, Fixnum]) do |key, value|
dictionary[key] ||= []
dictionary[key] << value
end
end
end
dictionary
end
end
map na danym elemencie. Następnie zbieramy odpowiedzi od poszczególnych aktorów (collect_partials) i po zebraniu wszystkich odpowiedzi, dokonujemy redukcji otrzymanych danych (reduce). Jak widać jest to pewien szablon - na tyle ogólny, że można go wykorzystać do realizacji różnego rodzaju zadań.Warto zwrócić uwagę na mechanizm łączenia ze sobą aktorów (link). Wywołanie
Actor.spawn_link tworzy nowego aktora, powiązanego z bieżacym. Takie powiązanie oznacza, że w przypadku "śmierci" jednego z aktorów, pozostali, powiązani z nim aktorzy - również "umierają". Dzieje się tak zarówno w przypadku, gdy aktor rzuci wyjątek, jak i wtedy, gdy się poprawnie zakończy. Jest jednak możliwość, by sygnał o śmierci jednego z aktorów nie zabił wybranego przez nas aktora - wystarczy ustawić jego atrybut trap_exit na true. Wtedy ten wybrany aktor, w przypadku śmierci innego powiązanego aktora, otrzymuje komunikat w postaci listy [:exit, actor, object], gdzie actor to aktor, który rzucił wyjątkiem bądź zakończył działanie, a object zawiera sam obiekt wyjątku lub nil w przypadku normalnego zakończenia. Umożliwia to tworzenie hierarchii aktorów, w której wybrani aktorzy są "zarządcami", tzn. mogą decydować o podjęciu jakiegoś działania w przypadku śmierci aktorów-pracowników, np. o utworzeniu nowego aktora. Taki model jest w 100% zgodny z tym, co daje nam Erlang.Dla kontrastu implementacja działająca sekwencyjnie (co prawda nie można wtedy nazwać jej MapReduce, ale umieściłem ją dla porównania):
#! /usr/bin/ruby19I na koniec sama klasa testowa zliczająca ilość wystąpień wyrazów. Każdy aktor, realizujący operację map wysyła do aktora redukującego parę
class Sequential < MapReduce
def run(input)
@input = input
dictionary = {}
@input.each do |element|
partial = []
@map.call(partial, element)
partial.each do |key, value|
dictionary[key] ||= []
dictionary[key] << value
end
end
result = {}
dictionary.each do |key, value|
result[key] = @reduce.call(key, value)[1]
end
@output = result
end
end
[w, 1], gdzie w jest każdym napotkanym na stronie wyrazem. Następnie aktor zajmujący się redukcją wyników wyznacza ilość wystąpień danego wyrazu na podstawie komunikatów częściowych.#! /usr/bin/ruby19
resources = []
ObjectSpace.each_object(Class) do |klass|
resources << "http://www.ruby-doc.org/core-1.9/classes/#{klass.name.gsub('::','/')}.html"
end
require 'concurrent'
require 'sequential'
require 'benchmark'
require 'cgi'
require 'uri'
include Benchmark
class WordCount
def initialize(method)
@method = method
end
def run(input)
@method.run(input)
end
def output
@method.output
end
def self.each_word(uri)
words = []
uri = URI.parse(uri)
sock = Revactor::TCP.connect(uri.host, 80)
sock.write [ "GET #{uri.path} HTTP/1.0", "Host: #{CGI.escape(uri.host)}",
"\r\n" ].join("\r\n")
begin
loop do
sock.read.scan(/[a-z]+/i) {|w| yield w}
end
rescue EOFError
end
end
end
if $0 == __FILE__
resources = resources[0...(ARGV[0].to_i)]
map = -> target, uri do
WordCount.each_word(uri) {|w| target << T[w, 1]}
end
reduce = -> key, values { T[key, values.size] }
test_seq = WordCount.new(Sequential.new(map, reduce))
test_actor = WordCount.new(Concurrent.new(map, reduce))
Benchmark.bm(12) do |b|
b.report("sequential") { test_seq.run(resources) }
b.report("#{resources.size} actors") { test_actor.run(resources) }
end
%w(test_seq test_actor).each do |t|
File.open(t, "w") do |f|
f.puts eval(t).output
end
end
end
Wynik odpalenia testu dla 50 aktorów:
user system total real
sequential 0.890000 0.100000 0.990000 ( 44.523738)
50 actors 3.060000 0.150000 3.210000 ( 10.127858)
A więc ponad 4 razy szybciej na korzyść implementacji współbieżnej (bez wątków!). Dla porównania jeszcze wyniki dla 100 i 200 aktorów. Dla 200 nie jest aż tak dobrze, ale jest nadal nieco ponad 2 razy szybciej. W tym przypadku kluczowe jest to, że operacja odczytu danych z socket'a trwa stosunkowo długo - ten czas jest wykorzystywany na współbieżne działanie aktorów. Jasne jest, że w przypadku, gdy operacja i/o jest krótkotrwała, program działający sekwencyjnie okaże się szybszy.
user system total real
sequential 1.010000 0.120000 1.130000 ( 65.234548)
100 actors 3.610000 0.090000 3.700000 ( 15.162739)
sequential 1.890000 0.230000 2.120000 (125.002007)
200 actors 8.170000 0.410000 8.580000 ( 56.959498)
Ruby/Revactor nie może się równać wydajnością z Erlangiem, w którym utworzenie wielkiej ilości procesów wymieniających masę komunikatów nie stanowi dużego problemu. Nic również nie zdziała, jeśli chodzi o wykorzystanie wielu procesorów (lub procesorów wielordzeniowych). Nadal poruszamy się w kontekście jednego procesu systemu operacyjnego. Ale i tak jest ciekawą alternatywą dla współbieżności opartej na wątkach. Poza tym nic nie stoi na przeszkodzie, by pisać programy działające w oparciu o wiele procesów (w którym każdy z procesów może korzystać ze współbieżności opartej o włókna lub green threads), które komunikują się między sobą z użyciem jednego z wielu dostępnych obecnie mechanizmów wymiany komunikatów (kolejek). Można również sięgnąć po JRuby, który na dzień dzisiejszy na tyle okrzepł, że wydaje się poważną alternatywą dla MRI.
Ale o tym może innym razem.
update: kod jest dostępny na githubie

2 komentarze:
Bardzo ciekawy i konkretny wpis. Powinieneś pisać częściej :).
Dzięki Radarek - staram się, ale czasu brak ;)
Prześlij komentarz