Dart: Futures and Streams
Learn how to use Futures and Streams for writing asynchronous code in Dart. By Michael Katz.
Listening to Streams
The Stream transform functions like map listen to the source stream and return a new Stream. Any function that receives events from a Stream is a listener.
With all the convenient methods, including then, you'll rarely have to write your own listener. When you do, you'll have finer control over what happens when data loads, and you can act on events such as when the Stream ends.
For example, add to main:
stream.listen(
  (city) => print("loaded `$city`"),
  onDone: () => print("all cities loaded"),
);
The first callback runs each time data is ready, and onDone is called when the stream is complete. Here, it prints an all-done message.
One other reason to use listen is that it returns a StreamSubscription object, which gives you the option to pause or cancel the stream.
Replace what you just added with:
final subscription = stream.listen(
  (city) => print("loaded `$city`"),
  onDone: () => print("all cities loaded"),
);
await Future.delayed(Duration(seconds: 2));
subscription.pause();
await Future.delayed(Duration(seconds: 2));
subscription.resume();
await Future.delayed(Duration(seconds: 2));
subscription.cancel();
The change stores the subscription object so it can be paused, resumed and canceled, each after a short delay. Look at the console output to see the effect of each operation:
CITY: POPULATION
Bangkok: 10899698
loaded `Bangkok`
Beijing: 21333332
loaded `Beijing`
Cairo: 21750020
loaded `Cairo`
Delhi: 32065760
Guangzhou: 13964274
Jakarta: 11074811
Kolkāta: 15133888
loaded `Delhi`
loaded `Guangzhou`
loaded `Jakarta`
loaded `Kolkāta`
Manila: 14406059
loaded `Manila`
Mexico City: 22085140
loaded `Mexico City`
Moscow: 12640818
loaded `Moscow`
Mumbai: 20961472
loaded `Mumbai`
New York: 8177025
São Paulo: 22429800
Seoul: 9975709
Shanghai: 28516904
Tokyo: 37274000
Total known population: 302688710
Because city names appear at regular intervals, you can inspect the output to learn what's happening with the Stream and with each of the listeners.
An alternating printout shows the population and the "loaded" message of each city. After 2 seconds (when Cairo was loaded), the listener displaying the "loaded" message was paused. For a while, only the population messages displayed.
After another 2 seconds, the "loaded" message listener resumed. Because it's listening to a broadcast stream, all the events it missed were buffered and delivered at once, so several "loaded" messages appear together before Manila comes along and both listeners are printing again.
Two seconds later, the subscription is canceled and the "loaded" messages stop altogether.
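The buffering described above can be reproduced in isolation. The following is a minimal sketch, not the tutorial's code: it assumes a broadcast stream created with StreamController.broadcast, and the names pausedListenerLog, A and B are hypothetical.

```dart
import 'dart:async';

// Hypothetical helper: two listeners on one broadcast stream, with
// listener B paused while events 2 and 3 are added.
Future<List<String>> pausedListenerLog() async {
  final controller = StreamController<int>.broadcast();
  final log = <String>[];

  controller.stream.listen((n) => log.add('A:$n'));
  final subB = controller.stream.listen((n) => log.add('B:$n'));

  controller.add(1);
  await Future<void>.delayed(Duration.zero); // let event 1 be delivered

  subB.pause(); // only B is affected; A keeps receiving events
  controller.add(2);
  controller.add(3);
  await Future<void>.delayed(Duration.zero);

  subB.resume(); // B now receives the events it buffered while paused
  await Future<void>.delayed(Duration.zero);

  controller.close(); // not awaited; the sketch only needs the log
  return log;
}

Future<void> main() async {
  print(await pausedListenerLog());
}
```

In the printed log, B's entries for events 2 and 3 should appear only after A has already recorded both, which mirrors the burst of "loaded" messages in the output above.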
With a single-subscription stream, pausing the subscription stops the generation of events, and resuming restarts the event generator where it left off.
To try that out, replace main with:
void main() async {
  final stream = loadCityStream();
  final subscription = stream.listen(
    (city) => print("loaded `$city`"),
    onDone: () => print("all cities loaded"),
  );
  await Future.delayed(Duration(seconds: 2));
  subscription.pause();
  await Future.delayed(Duration(seconds: 2));
  subscription.resume();
  await Future.delayed(Duration(seconds: 2));
  subscription.cancel();
}
The resulting console output will be:
loaded `Bangkok`
loaded `Beijing`
loaded `Cairo`
loaded `Delhi`
loaded `Guangzhou`
loaded `Jakarta`
loaded `Kolkāta`
No intermediate events occurred while the Stream was paused. By the time the subscription is canceled, after about 4 seconds of active listening (6 seconds after the start), it has only loaded as far as Kolkāta.
Note: Canceling a Stream means onDone will not get called. If cleanup is needed, it's critical that you do it both in onDone and after the Future returned by cancel completes.
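As a rough illustration of that cleanup point, the sketch below cancels a subscription early and records which cleanup path runs. The ticks generator and cancelEarly helper are hypothetical stand-ins, not part of the tutorial's project, and the delays are arbitrary.

```dart
import 'dart:async';

// Hypothetical stand-in for a slow data source.
Stream<int> ticks() async* {
  for (var i = 0; i < 5; i++) {
    await Future<void>.delayed(const Duration(milliseconds: 50));
    yield i;
  }
}

Future<List<String>> cancelEarly() async {
  final log = <String>[];
  final subscription = ticks().listen(
    (n) => log.add('event $n'),
    // Runs only if the stream finishes on its own.
    onDone: () => log.add('cleanup in onDone'),
  );

  await Future<void>.delayed(const Duration(milliseconds: 120));

  // cancel() returns a Future; cleanup for the canceled case goes after it.
  await subscription.cancel();
  log.add('cleanup after cancel');
  return log;
}

Future<void> main() async {
  print(await cancelEarly());
}
```

Because the subscription is canceled before the generator finishes, the log should end with the post-cancel cleanup entry and contain no onDone entry. Code that must run in both cases can be factored into one function called from both places.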
You don't have to use listeners with Streams. Instead, you can use await for to iterate over a Stream's data, or even await a method such as toList to wait until the Stream is done.
For example, replacing main with this simple use brings back the old Future behavior of waiting for all the data to load before continuing.
void main() async {
  final cities = await loadCityStream().toList();
  printCities(cities);
}
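The await for form mentioned above handles each event as it arrives instead of collecting everything first. Here is a minimal sketch, assuming a hypothetical sampleCities generator in place of the tutorial's loadCityStream:

```dart
// Hypothetical stand-in for the tutorial's city stream.
Stream<String> sampleCities() async* {
  yield 'Bangkok';
  yield 'Beijing';
  yield 'Cairo';
}

Future<int> countCities(Stream<String> cities) async {
  var count = 0;
  // The loop body runs once per event; the loop ends when the stream is done.
  await for (final city in cities) {
    print('loaded $city');
    count++;
  }
  return count;
}

Future<void> main() async {
  final count = await countCities(sampleCities());
  print('all $count cities loaded');
}
```

Code placed after the loop plays the role that onDone plays with listen, since it runs only once the stream has completed.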
Handling Stream Errors
Error handling with Streams is the same as with Futures. You can use catchError on the Future returned by a method such as forEach, or a try/catch with await for.
For example, replace main with:
Stream<String> failingStream() async* {
  throw LoginError();
}
void main() async {
  failingStream()
      .forEach(print)
      .catchError((e) => print("Stream failed: $e"));
}
This has a new helper, failingStream, that throws an exception. The exception is caught and printed by the catchError method.
You can instead use catch with an await for in this example. Replace main with:
void main() async {
  try {
    await for (final city in failingStream()) {
      print(city);
    }
  } catch (e) {
    print("Caught error: $e");
  }
}
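If you are already using listen, its optional onError parameter is another place to handle a failure. The sketch below is illustrative only: brokenStream and listenWithErrorHandler are hypothetical names, and StateError stands in for the tutorial's LoginError.

```dart
import 'dart:async';

// Hypothetical failing stream: one event, then an error.
Stream<String> brokenStream() async* {
  yield 'Bangkok';
  throw StateError('connection lost');
}

Future<List<String>> listenWithErrorHandler() {
  final log = <String>[];
  final done = Completer<List<String>>();
  brokenStream().listen(
    (city) => log.add('loaded $city'),
    // Receives errors emitted by the stream.
    onError: (Object e) => log.add('failed: $e'),
    // An async* stream closes after it throws, so onDone still runs.
    onDone: () => done.complete(log),
  );
  return done.future;
}

Future<void> main() async {
  print(await listenWithErrorHandler());
}
```

The log should hold one "loaded" entry followed by one "failed" entry.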
Adding Timeouts
Another condition you might want to handle is timeouts. A Stream will wait for the next event as long as there is a listener subscribed. If a computation might run indefinitely, or you're waiting for a remote resource to respond, you might want to build in a timeout.
The timeout method takes a duration. As long as an event happens before the duration expires, the Stream continues. Otherwise, by default, a TimeoutException is thrown. The timeout behavior can also be customized with an optional onTimeout callback.
Replace main again:
void main() async {
  print("Loading Cities:");
  loadCityStream()
      .timeout(Duration(milliseconds: 400))
      .forEach(print)
      .catchError(
        (e) => print("Stream timed out, try again."),
        // TimeoutException comes from dart:async.
        test: (e) => e is TimeoutException,
      );
}
This adds a 400 millisecond timeout to the operation. Because a city is yielded only every 500 milliseconds, the Stream times out before the first city arrives. The error is caught by the catchError block and the "try again" message is printed.
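The callback mentioned in this section can replace the default exception with behavior of your own. The following is a sketch under stated assumptions: slowCities and loadWithFallback are hypothetical, the durations are arbitrary, and the placeholder event is just one possible policy.

```dart
import 'dart:async';

// Hypothetical slow stream: one event every 100 ms.
Stream<String> slowCities() async* {
  for (final city in ['Bangkok', 'Beijing', 'Cairo']) {
    await Future<void>.delayed(const Duration(milliseconds: 100));
    yield city;
  }
}

Future<List<String>> loadWithFallback() {
  return slowCities()
      .timeout(
        const Duration(milliseconds: 20),
        // Called in place of emitting a TimeoutException.
        onTimeout: (sink) {
          sink.add('(timed out)'); // emit a placeholder event
          sink.close(); // end the resulting stream
        },
      )
      .toList();
}

Future<void> main() async {
  print(await loadWithFallback());
}
```

Here the first event takes longer than the time limit, so the resulting list should contain only the placeholder. Omitting sink.close() would instead keep the stream open and restart the timer for the next event.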
Where to Go From Here?
That's it for a quick introduction to Futures and Streams in Dart.
From here, you can continue to explore more ways to manipulate data. Many of the methods in the API are useful for building custom streams and event handling; you're likely to never use them directly and to build only on the higher-order transforms.
The power of streams comes in handy for routing data and user events through Flutter widgets, like StreamBuilder. To learn more about asynchronous programming, there is a chapter in The Dart Apprentice that covers streams and more in depth.