Ana içeriğe geç

ch17-04-streams

Akışlar

Bu bölümde şimdiye kadar esas olarak bireysel geleceklerle sınırlı kaldık. Tek büyük istisna, kullandığımız asenkron kanaldı. Bölümün başlarında “Mesaj Gönderimi” kısmında asenkron kanalımız için alıcıyı nasıl kullandığımızı hatırlayın. Asenkron recv metodu zaman içinde bir dizi öğe üretir. Bu, genel bir modelin bir örneğidir ve genellikle bir akış olarak adlandırılır.

bilgi

Bir akış, asenkron bir yineleme biçimine benzerdir.

Bir dizi öğe, 13. bölümde Iterator traitine baktığımızda daha önce gördüğümüz bir şeydir, ancak iteratörler ile asenkron kanal alıcısı arasında iki farklılık vardır. İlk farklılık zaman unsuru: iteratörler senkron olduğundan, kanal alıcısı asenkron'dur. İkinci farklılık ise API'dir. Iterator ile doğrudan çalışırken, senkron next metodunu çağırırız. Özellikle trpl::Receiver akışında, bunun yerine asenkron recv metodunu çağırdık, ancak bu API'ler diğer bakımlardan oldukça benzer.

ipucu

Bu benzerlik, bir akışın asenkron bir yineleme biçimine benzemesinden kaynaklanmaktadır.

Bu benzerlik bir tesadüf değil. Rust'taki iteratörler ile akışlar arasındaki benzerlik, aslında herhangi bir iteratörden bir akış oluşturabileceğimiz anlamına gelir. Bir iteratörle olduğu gibi, bir akışla da next metodunu çağırarak ve ardından çıktıyı bekleyerek çalışabiliriz; bu, 17-30 listeleme örneğinde gösterilmiştir.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-30/src/main.rs:stream}}

Sayılardan oluşan bir dizi ile başlıyoruz, bunu bir iteratöre dönüştürüyoruz ve ardından tüm değerleri iki katına çıkarmak için map çağırıyoruz. Daha sonra iteratörü trpl::stream_from_iter fonksiyonu kullanarak bir akışa dönüştürüyoruz. Sonra while let döngüsü ile akıştaki öğeleri geldikçe döngüleştiriyoruz.

Maalesef, kodu çalıştırmayı denediğimizde, derlenmiyor. Çıktıda görüldüğü gibi, next metodunun mevcut olmadığı bilgisi veriliyor.

error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to 'file:///projects/async_await/target/debug/deps/async_await-9de943556a6001b8.long-type-1281356139287206597.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~

Çıktının belirttiği gibi, derleyici hatasının nedeni, next metodunu kullanabilmemiz için doğru trait'in kapsamda olması gerektiği. Şimdiye kadar yaptığımız tartışmayı göz önünde bulundurursak, bunun Stream olması mantıklı görünebilir, ancak burada ihtiyacımız olan trait aslında StreamExt. Buradaki Ext kısmı “uzantı” için: bu, Rust topluluğunda bir trait'i başka bir trait ile genişletmek için yaygın bir modeldir.

not

StreamExt, Stream üzerine daha yüksek seviyeli bir API seti sağlar.

Neden StreamExt'ye ihtiyacımız var, Stream neyi yapıyor? Kısaca cevap, Rust ekosisteminde Stream trait'inin, aslında Iterator ve Future trait'lerini birleştiren düşük seviyeli bir arabirimi tanımlamasıdır. StreamExt trait'i, next metodu yanı sıra Iterator trait'i tarafından sağlanan diğer yardımcı metodlar gibi, Stream üzerinde daha yüksek seviyeli bir API seti sağlar. Stream ve StreamExt trait'lerine bölümün sonunda biraz daha ayrıntılı döneceğiz. Şu anda, bununla devam etmemizi sağlamak için yeterlidir.

Derleyici hatasını düzeltmek için, Listing 17-31'de görünene göre trpl::StreamExt için bir use ifadesi eklememiz gerekir.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-31/src/main.rs:all}}

Tüm bu parçaları bir araya getirdiğimizde, bu kod istendiği gibi çalışır! Dahası, artık StreamExt kapsamda olduğundan, iteratörlerle olduğu gibi tüm yardımcı metotlarını da kullanabiliriz. Örneğin, Listing 17-32'de, filter metodunu kullanarak üç ve beşin katları olan her şeyi filtreliyoruz.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-32/src/main.rs:all}}

Elbette, bu çok ilginç değil. Normal iteratörlerle ve hiç asenkron olmadan da bunu yapabilirdik. O yüzden akışlara özgü bazı diğer şeylere bakalım.


Akışları Birleştirme

Birçok kavram doğal olarak akışlar olarak temsil edilir: bir kuyruğun içinde mevcut hâle gelen öğeler, ya da bir bilgisayarın belleğine sığmayacak kadar fazla veriyle çalışmak için dosya sisteminden yalnızca chunk'lar çekerek, ya da zaman içinde ağ üzerinden gelen veriler. Akışlar gelecektir, bu nedenle diğer tür gelecekle de birlikte kullanılabiliriz ve ilginç şekillerde birleştirebiliriz. Örneğin, çok fazla ağ çağrısını tetiklemekten kaçınmak için olayları toplu hale getirebiliriz, uzun süreli işlemler dizilerine zaman aşımı ayarlayabiliriz veya kullanıcı arayüzü olaylarını kısıtlayarak gereksiz işleri önleyebiliriz.

Bir WebSocket veya başka bir gerçek zamanlı iletişim protokolünden görebileceğimiz bir veri akışını temsil eden küçük bir mesaj akışı oluşturarak başlayalım. Listing 17-33'te, String öğesi döndüren impl Stream döndüren get_messages adlı bir işlev oluşturuyoruz. Uygulamasında, bir asenkron kanal oluşturuyoruz, İngiliz alfabesinin ilk on harfini döngüye alıyoruz ve bunları kanaldan gönderiyoruz.

Ayrıca trpl::channel'dan rx alıcısını next metodu ile Stream'e dönüştüren yeni bir tür olan ReceiverStream'i de kullanıyoruz. main fonksiyonuna geri döndüğümüzde, akıştan gelen tüm mesajları yazdırmak için bir while let döngüsü kullanıyoruz.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-33/src/main.rs:all}}

Bu kodu çalıştırdığımızda, tam olarak beklediğimiz sonuçları alırız:

Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
not

Bu sonuçları normal Receiver API'si ile de elde edebilirdik.

Bunu normal Receiver API'si ile veya hatta normal Iterator API'si ile de yapabilirdik. Bir akışın gerektirdiği bir şeyi ekleyelim: akıştaki her öğeye uygulanan bir zaman aşımı ve gönderdiğimiz öğelerde bir gecikme.

Listing 17-34'te, StreamExt trait'inden gelen timeout metodu ile akışa bir zaman aşımı eklemeye başlıyoruz. Sonra while let döngüsünün gövdesini güncelliyoruz, çünkü akış şimdi bir Result döndürür. Ok varyantı mesajın zamanında geldiğini; Err varyantı ise zaman aşımı gerçekleştiğinde herhangi bir mesajın gelmediğini gösterir. match ifadesi ile bu sonucu kontrol ederiz ve ya başarıyla aldığımızda mesajı yazdırır ya da zaman aşımı bildirimi yazdırırız. Son olarak, zaman aşımını uyguladıktan sonra mesajları sabitliyoruz, çünkü zaman aşımı yardımcı programı, üzerinde polledilmesi için sabitlenmesi gereken bir akış üretir.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-34/src/main.rs:timeout}}

Ancak, mesajlar arasında gecikmeler olmadığı için bu zaman aşımı programın davranışını değiştirmiyor. Gönderdiğimiz mesajlara daha değişken bir gecikme ekleyelim. get_messages işlevinde, gönderdiğimiz her öğeyle birlikte her öğenin dizininin indeksini alabilmemiz için messages dizisi ile enumerate iteratör metodunu kullanıyoruz. Ardından, çift indeksli öğelere 100 milisaniye gecikme ve tek indeksli öğelere 300 milisaniye gecikme uyguluyoruz; bu da gerçek dünyada bir akıştaki mesajların sahip olabileceği farklı gecikmeleri simüle etmek içindir. Zaman aşımımız 200 milisaniye olduğundan, bu durum mesajların yarısını etkilemelidir.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-35/src/main.rs:messages}}

Mesajlar arasında uyku çekmek için get_messages işlevinde engellemeksizin, asenkron kullanmamız gerekiyor. Ancak, get_messages işlevini kendisi asenkron bir işlev haline getiremeyiz, çünkü bu durumda bir Future> döndürmek yerine Stream döndürürüz. Çağıran, akışa erişim elde etmek için get_messages'i beklemelidir. Ama unutmayın: verilen bir gelecekte her şey doğrusal gerçekleşir; eşzamanlılık, gelecekler arasındaki işlemdir. get_messages'i beklemek, akış alıcı akışını döndürmeden önce tüm mesajları göndermesini gerektireceğinden, zaman aşımı sonunda işe yaramaz hale gelecektir. Akışın kendisinde gecikmeler olmayacak: gecikmeler tümüyle akış kullanılabilir olmadan önce gerçekleşecektir.

tehlike

Bunun yerine, get_messages'i bir akış döndüren normal bir işlev olarak bırakıyoruz.

Artık kodumuz çok daha ilginç bir sonuç veriyor! Her diğer mesaj çiftinin arasından bir hata bildirimi görüyoruz: Problem: Elapsed(()).

Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'

Zaman aşımı, mesajların sonunda gelmesini engellemiyor – hâlâ tüm orijinal mesajları alıyoruz. Bunun nedeni, kanalımızın sınırsız olmasıdır: belleğimizde tutabileceğimiz kadar mesajı barındırabilir. Mesaj zaman aşımına uğramadan önce gelmezse, akış işleyicimiz bununla başa çıkacaktır, ancak akışı tekrar kontrol ettiğinde, mesaj şimdi gelmiş olabilir.

Gerekirse farklı bir davranış elde edebilirsiniz; diğer kanal türlerini veya daha genel olarak diğer akış türlerini kullanarak. Bu bölüm için son örneğimizde, bir zaman aralığı akışı ile bu mesaj akışını birleştirerek birini pratikte görelim.


Akışları Birleştirme

Öncelikle, her milisaniyede bir öğe yayacak başka bir akış oluşturalım eğer onu doğrudan çalıştırırsak. Basitlik için, bir gecikme ile bir mesaj göndermek için sleep fonksiyonunu kullanabiliriz ve bunu get_messages içinde kullandığımız kanaldan bir akış oluşturma yaklaşımıyla birleştirebiliriz. Fark, bu sefer geçtiğimiz zaman aralıklarının sayısını geri göndereceğimizdir; bu nedenle dönüş türü impl Stream olacak ve işlevimizi get_intervals olarak adlandıracağız.

Listing 17-36'da, ilk olarak görev içindeki bir count tanımlıyoruz. (Onu görev dışında da tanımlayabiliriz, ancak her değişkenin kapsamını kısıtlama seçeneği daha belirgindir.) Sonra sonsuz bir döngü oluşturuyoruz. Döngünün her yinelemesi asenkron olarak bir milisaniye uyuyor, sayıyı artırıyor ve ardından bunu kanaldan gönderiyor. Bu Her şey, spawn_task tarafından oluşturulan görevle paketlenmiş olduğundan, tüm bunlar runtime ile birlikte temizlenir, sonsuz döngü de dahil.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-36/src/main.rs:intervals}}

Bu tür bir sonsuz döngü, tüm runtime kapatılana kadar devam eder ve asenkron Rust'ta oldukça yaygındır: birçok programın sürekli çalışması gerekir. Asenkron durumda, bu başka bir şeyi engellemez; her döngü yinelemesinde en az bir bekleme noktası olduğu sürece.

Ana fonksiyonumuzun asenkron bloğunda, get_intervals'i çağırarak başlıyoruz. Sonra, messages ve intervals akışlarını merge metodu ile birleştiriyoruz; bu, bir kaynaktan öğeler mevcut olduğunda, belli bir sıralamayı dayatmadan, birden fazla akışı tek bir akışa dönüştürür. Son olarak, birleştirilmiş akış üzerinde döngü oluşturuyoruz (Listing 17-37).

{{#rustdoc_include ../listings/ch17-async-await/listing-17-37/src/main.rs:main}}

Bu noktada, hem messages hem de intervals sabitlenmiş veya değiştirilebilir olmak zorunda değildir, çünkü ikisi de tek bir merged akışına birleştirilecektir. Ancak, merge çağrısı derlenmiyor! (Ayrıca while let döngüsündeki next çağrısı da derlenmiyor; bunu düzeltmeden sonra geri döneceğiz.) İki akışın türleri farklıdır. messages akışının türü, timeout çağrısı için Stream'i uygulayan Timeout> türüdür. Bu arada, intervals akışının türü impl Streamdir. Bu iki akışı birleştirmek için, bunlardan birinin türünü diğerinin türü ile eşleştirmemiz gerekiyor.

Listing 17-38'de, messages daha önce istediğimiz temel formatta olduğundan ve zaman aşımı hatalarını halletmek zorunda olduğundan, intervals akışını yeniden yapılandırıyoruz. İlk olarak, intervals'ı bir dizeye dönüştürmek için map yardımcı metodunu kullanabiliriz. İkinci olarak, messages'ten Timeout ile eşleşmemiz gerekiyor. Ancak aslında intervals için bir zaman aşımına ihtiyacımız olmadığından, diğer sürelerden daha uzun bir zaman aşımı oluşturabiliriz. Burada, Duration::from_secs(10) ile 10 saniyelik bir zaman aşımı oluşturuyoruz. Son olarak, stream'in değiştirilebilir olması gerektiğinden, while let döngüsünün next çağrılarının akışı yinelemeye alabilmesi için sabitlenmesi gerekmektedir.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-38/src/main.rs:main}}

Bu, neredeyse ihtiyaç duyduğumuz yere ulaştırıyor. Her şey tür denetiminden geçiyor. Ancak bunu çalıştırırsanız, iki sorun olacak. Birincisi, asla durmayacak! ctrl-c ile durdurmanız gerekecek. İkincisi, İngiliz alfabasından gelen mesajlar, tüm zaman aralıkları sayısı mesajlarının ortasında kaybolacak:

--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--

Listing 17-39, bu son iki sorunu çözmenin bir yolunu gösterir. İlk olarak, intervals akışında throttle metodunu kullanarak, onu messages akışını aşmayacak şekilde sınırlandırıyoruz. Nantalanma, bir işlevin ne sıklıkta çağrılacağını sınırlamanın bir yoludur—veya, bu durumda, akışın ne sıklıkta kontrol edileceğidir. Her yüz milisaniyede bir çağrı yapmaktir, çünkü bu, mesajlarımızın sıklığıyla hemen hemen aynı aralıktadır.

Bir akıştan kabul edeceğimiz öğe sayısını sınırlamak için take metodunu kullanabiliriz. Bunu birleştirilmiş akışa uygularız, çünkü nihai çıktıyı sınırlamak istiyoruz, sadece bir akış veya diğerine değil.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-39/src/main.rs:throttle}}

Artık programı çalıştırdığımızda, akıştan yirmi öğe çekildikten sonra duruyor ve zaman aralıkları mesajları aşmıyor. Ayrıca Interval: 100 veya Interval: 200 vb. almayız; bunun yerine Interval: 1, Interval: 2 vb. alırız—her ne kadar her milisaniyede bir olay üretebilen bir kaynak akışımız olmasına rağmen. Bunun nedeni, throttle çağrısının, orijinal akışı sarmalayarak yeni bir akış üretmesidir, böylece orijinal akış yalnızca throttle hızında kontrol edilecektir; kendi “doğal” hızında değil. İlgilenmediğimiz bir dizi cevapsız zaman aralığı mesajı oluşturmak istemiyoruz. Bunun yerine, daha başında bu zaman aralığı mesajlarını üretmiyoruz! Bu, Rust'ın geleceğinin özünde "tembellik" derecelerinin bir kez daha çalışır duruma gelmesidir; bizlere performans tatlarını seçme imkânı tanır.

ipucu

throttle çağrısının, orijinal akışı sarmalayarak yeni bir akış üretmesi, kaynak akışının belirli bir hızda kontrol edilmesini sağlar.


Son olarak, ele almamız gereken bir şey kalıyor: hatalar! Bu iki kanal tabanlı akışta, send çağrıları, kanalın diğer tarafı kapandığında başarısız olabilir - ve bu, akışı oluşturan geleceklere bağlıdır. Şimdiye kadar bunu unwrap çağırarak göz ardı ettik, ancak düzgün bir uygulamada, bu hatayı açıkça ele almalıyız; en azından döngüyü sonlandırarak, daha fazla mesaj göndermeye çalışmadığımızdan emin olmalıyız! Listing 17-40, basit bir hata stratejisini gösterir: sorunu yazdırır ve ardından döngülerden break ekler. Her zamanki gibi, bir mesaj gönderme hatasını ele almak için doğru yol değişecektir—sadece bir stratejiniz olduğundan emin olun.

{{#rustdoc_include ../listings/ch17-async-await/listing-17-40/src/main.rs:errors}}

Artık pratikte bir dizi asenkron işlem gördüğümüze göre, Future, Stream ve Rust'ın asenkron çalışmasını sağlamak için kullandığı diğer önemli özelliklerin bazı ayrıntılarına dalalım.