Dart: Futures and Streams
Learn how to use Futures and Streams for writing asynchronous code in Dart. By Michael Katz.
Error Handling with Await
You previously used catchError blocks to handle Future errors. By using await, you can instead use a regular try/catch. For example, rewrite the previous error handling by replacing main with the following:
Future<int> getCityCount() async {
  throw NetworkError();
}

void main() async {
  try {
    final cityCount = await getCityCount();
    print("Got a value: $cityCount");
  } catch (e) {
    print("Got an error: $e");
  }
}
The getCityCount function should theoretically return the number of cities, but it instead throws a NetworkError. In main, you can wrap the call to getCityCount in a try/catch to catch the error and avoid a crash.
In the above example, the first print never executes because the Future returned by getCityCount completes with an error, which await rethrows as an exception. It’s caught in the catch block and printed to the console.
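For comparison, here is a minimal sketch of the same failure handled in both styles. NetworkError is defined earlier in this tutorial; the stand-in class below and the helper names withCatchError and withTryCatch are assumptions added so the snippet runs on its own.

```dart
// Stand-in for the tutorial's NetworkError so this sketch is self-contained.
class NetworkError implements Exception {
  @override
  String toString() => 'NetworkError';
}

Future<int> getCityCount() async {
  throw NetworkError();
}

// Callback style: the error is handled by the catchError callback.
Future<String> withCatchError() {
  return getCityCount()
      .then((count) => 'Got a value: $count')
      .catchError((Object e) => 'Got an error: $e');
}

// await style: the same error surfaces as an ordinary exception.
Future<String> withTryCatch() async {
  try {
    final count = await getCityCount();
    return 'Got a value: $count';
  } catch (e) {
    return 'Got an error: $e';
  }
}

void main() async {
  print(await withCatchError()); // Got an error: NetworkError
  print(await withTryCatch()); // Got an error: NetworkError
}
```

Both helpers produce the same message; the await version simply reads top to bottom like synchronous code.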
Just as you used test with the catchError method on Future, you can stack on and catch blocks to handle different error types in different ways. Change main once again:
void main() async {
  try {
    final cityCount = await getCityCount();
    print("Got a value: $cityCount");
  } on LoginError {
    print("Invalid username or password.");
  } on NetworkError {
    print("Network failed, try again.");
  } catch (e) {
    print("Got an error: $e");
  }
}
With this stack of on and catch blocks, the console outputs the specific NetworkError message instead of the generic one. You’ll also see this code is more straightforward to read and understand than the earlier example with error and test callbacks.
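When a handler needs the error object as well as its type, the two forms can be combined as on ... catch. The sketch below is only an illustration: the statusCode field, the describeFailure helper and the three request functions are hypothetical and are not part of the tutorial's error classes.

```dart
// Hypothetical error types, defined here so the sketch is self-contained.
class LoginError implements Exception {}

class NetworkError implements Exception {
  NetworkError(this.statusCode);
  final int statusCode; // assumed field, for illustration only
}

Future<int> failLogin() async => throw LoginError();
Future<int> failNetwork() async => throw NetworkError(503);
Future<int> failOther() async => throw StateError('oops');

Future<String> describeFailure(Future<int> Function() request) async {
  try {
    final cityCount = await request();
    return 'Got a value: $cityCount';
  } on LoginError {
    // Matches by type only; the error object is not needed here.
    return 'Invalid username or password.';
  } on NetworkError catch (e) {
    // Matches by type and also binds the error object to e.
    return 'Network failed with status ${e.statusCode}.';
  } catch (e) {
    // Fallback for anything the earlier blocks did not match.
    return 'Got an error: $e';
  }
}

void main() async {
  print(await describeFailure(failLogin));
  print(await describeFailure(failNetwork));
  print(await describeFailure(failOther));
}
```

The blocks are checked in order, so the generic catch belongs last.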
Using Streams
Streams are another aspect of asynchronous programming in Dart. A Future represents a one-time value: the app performs an operation and comes back with some data. A Stream represents a sequence of data. For example, you can use a Future to get the entire contents of a file or use a Stream to get the contents a line at a time. A Future could represent the value of a web form after the user presses “Enter,” and a Stream can encapsulate the changing data as the user types.
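The file analogy can be sketched without touching the file system. The two functions below are hypothetical stand-ins that return hard-coded data: one delivers everything as a single Future value, the other delivers the same lines one event at a time.

```dart
// A Future completes once, with the whole result.
Future<String> readWholeFile() async => 'Tokyo\nDelhi\nCairo';

// A Stream delivers the same data as a sequence of events.
Stream<String> readLines() =>
    Stream.fromIterable(['Tokyo', 'Delhi', 'Cairo']);

void main() async {
  final contents = await readWholeFile();
  print('Future delivered one value:\n$contents');

  // forEach runs the callback once per event and completes when the stream ends.
  await readLines().forEach((line) => print('Stream delivered: $line'));
}
```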
Replace main with the following:
void main() async {
  var stream = Stream<int>.periodic(const Duration(seconds: 1), (i) => i);
  stream.forEach(print);
}
This creates a periodic Stream that emits an increasing integer every second. forEach prints each value as it becomes available.
If you run this, the console updates each second with a new number.
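A periodic stream never finishes on its own, so the program above keeps printing until you stop it. As a small sketch, take can cap the number of events so the stream completes; the ticks helper and the shorter 200 ms interval are assumptions chosen to keep the demo quick.

```dart
// Emits 0, 1, 2, ... and completes after `count` events.
Stream<int> ticks(int count) {
  return Stream<int>.periodic(const Duration(milliseconds: 200), (i) => i)
      .take(count);
}

void main() async {
  // Awaiting forEach pauses here until the stream has completed.
  await ticks(3).forEach(print); // prints 0, 1, 2
  print('Done');
}
```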
Creating Streams
You can create a Stream by repeatedly sending data from some source, such as a file or a network server, or by transforming an existing Stream.
Going back to the city example, add the following:
Stream<String> loadCityStream() async* {
  for (final city in fetchCityList()) {
    yield city;
  }
}
A few new things are happening here:
- The return type is now a Stream, which means this returns a Stream providing String values over time.
- This has the async* annotation. It will not only return immediately like an async function does, but it will return a series of values and wrap them as a Stream.
- The body iterates over the city list and uses yield to send each value to the stream as the for loop iterates.
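One consequence of the points above is that calling an async* function only hands back the Stream; the body does not start running until something listens. The sketch below illustrates this with a hypothetical countdown generator and a log list passed in purely to record the order of events.

```dart
// Hypothetical generator: yields from, from - 1, ..., 1.
Stream<int> countdown(int from, List<String> log) async* {
  log.add('generator body started');
  for (var i = from; i > 0; i--) {
    yield i; // each yield becomes one event on the stream
  }
}

void main() async {
  final log = <String>[];
  final stream = countdown(3, log); // returns immediately
  log.add('stream returned');

  final values = await stream.toList(); // listening starts the body
  print(log); // [stream returned, generator body started]
  print(values); // [3, 2, 1]
}
```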
To see how it’s used, replace main once again:
void main() async {
  await for (final city in loadCityStream()) {
    print(city);
  }
}
This gets the stream from loadCityStream and uses await for to iterate over each value as it’s yielded. In the integer example, you used forEach. On a Stream, that is equivalent to await for. It’s the same relationship that a list’s forEach has to a regular for loop.
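The equivalence can be sketched side by side. One detail worth noting: forEach on a Stream returns a Future, so it must be awaited if later code depends on every value having been processed. The cities stream and the two helper names are assumptions made for this illustration.

```dart
Stream<String> cities() => Stream.fromIterable(['Tokyo', 'Delhi', 'Cairo']);

// Loop style: the function pauses at each iteration until a value arrives.
Future<List<String>> collectWithAwaitFor() async {
  final seen = <String>[];
  await for (final city in cities()) {
    seen.add(city);
  }
  return seen;
}

// Callback style: awaiting forEach waits until the stream has completed.
Future<List<String>> collectWithForEach() async {
  final seen = <String>[];
  await cities().forEach(seen.add);
  return seen;
}

void main() async {
  print(await collectWithAwaitFor()); // [Tokyo, Delhi, Cairo]
  print(await collectWithForEach()); // [Tokyo, Delhi, Cairo]
}
```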
When you run this code, the output appears all at once, too quickly to see the values arrive individually. Change loadCityStream so you see each value appear one at a time:
Stream<String> loadCityStream() async* {
  for (final city in fetchCityList()) {
    await Future.delayed(const Duration(milliseconds: 500));
    yield city;
  }
}
Note that because the function is already async*, you don’t have to update the signature to accommodate the await.
Re-running the program will print the cities one at a time at a noticeable pace.
A common way to generate a Stream is by transforming an existing stream. Many of the methods you know from lists and iterables are also available on Stream. You can use them to change a Stream by skipping, filtering and mapping values. You might do this by taking network bytes, deserializing them into objects and then extracting one field from each object. For example, if you had an address book, you might load all the entries from a local database, collect the first and last names of each participant and then format them to get a single name Stream.
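The address book idea might look like the following sketch. The Contact class, the loadContacts source and its sample entries are all hypothetical; a real app would read them from a database rather than a hard-coded list.

```dart
// Hypothetical address book entry.
class Contact {
  const Contact(this.firstName, this.lastName);
  final String firstName;
  final String lastName;
}

// Stand-in for a database query; the first entry is a placeholder row.
Stream<Contact> loadContacts() => Stream.fromIterable(const [
      Contact('First', 'Last'),
      Contact('Ada', 'Lovelace'),
      Contact('Test', ''),
      Contact('Alan', 'Turing'),
    ]);

Stream<String> fullNames(Stream<Contact> contacts) {
  return contacts
      .skip(1) // skip the placeholder row
      .where((c) => c.lastName.isNotEmpty) // filter out incomplete entries
      .map((c) => '${c.firstName} ${c.lastName}'); // map each entry to a name
}

void main() async {
  await fullNames(loadContacts()).forEach(print);
  // Ada Lovelace
  // Alan Turing
}
```

Each method returns a new Stream, so the steps chain the same way they would on an Iterable.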
Try out this example. First add a population helper method:
int calculatePopulationOf(String city) {
  final populations = {'Tokyo': 37274000, 'Delhi': 32065760, 'Shanghai': 28516904, 'São Paulo': 22429800, 'Mexico City': 22085140, 'Cairo': 21750020, 'Beijing': 21333332, 'Mumbai': 20961472, 'Kolkāta': 15133888, 'Manila': 14406059, 'Guangzhou': 13964274, 'Moscow': 12640818, 'Jakarta': 11074811, 'Bangkok': 10899698, 'Seoul': 9975709, 'London': 9540576, 'New York': 8177025};
  return populations[city] ?? -1;
}
That method returns the population of a city, or -1 if the city isn’t in the map.
Now update main:
void main() async {
  print("CITY: POPULATION");
  loadCityStream()
      .map((city) => "$city: ${calculatePopulationOf(city)}")
      .forEach(print);
  loadCityStream()
      .map(calculatePopulationOf)
      .reduce((value, element) => value + element)
      .then((total) => print("Total known population: $total"));
}
This uses a city loading stream in two ways. The first uses map to construct an output string with the name and population. The second chains a map to turn the city names into population numbers and a reduce to sum them up. Finally, it’s capped with a then, which takes the final value and prints it.
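Two edge cases are worth keeping in mind with this pattern: a city missing from the map contributes -1 to the sum, and reduce completes with an error if the stream is empty. One possible variation, sketched below with a shortened two-city map and hypothetical helper names, filters out unknown cities and uses fold, which starts from an initial value.

```dart
// Shortened stand-in for the tutorial's population map.
const populations = {'Tokyo': 37274000, 'Delhi': 32065760};

int populationOf(String city) => populations[city] ?? -1;

Future<int> totalKnownPopulation(Stream<String> cities) {
  return cities
      .map(populationOf)
      .where((population) => population >= 0) // drop unknown cities
      .fold<int>(0, (sum, population) => sum + population);
}

void main() async {
  final cities = Stream.fromIterable(['Tokyo', 'Atlantis', 'Delhi']);
  print(await totalKnownPopulation(cities)); // 69339760
  print(await totalKnownPopulation(Stream<String>.empty())); // 0
}
```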
Subscription and Broadcast Streams
In the previous example, you used two calls to loadCityStream, which looks odd. Try reusing the stream by modifying main:
void main() async {
  final stream = loadCityStream();
  print("CITY: POPULATION");
  stream
      .map((city) => "$city: ${calculatePopulationOf(city)}")
      .forEach(print);
  stream
      .map(calculatePopulationOf)
      .reduce((value, element) => value + element)
      .then((total) => print("Total known population: $total"));
}
When you run this, you’ll see the following in the console and the total won’t be computed:
Uncaught Error: Bad state: Stream has already been listened to.
The error occurs because loadCityStream returns a single subscription stream. It can only be listened to once and has a definite start and end. In this example, the stream starts when the first listener is attached through map and forEach, and it continues to the last city.
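The restriction can be reproduced in a few lines. In this sketch, numbers is a hypothetical two-value generator, and the second attempt to listen is wrapped in try/catch so the program reports the StateError instead of crashing.

```dart
Stream<int> numbers() async* {
  yield 1;
  yield 2;
}

Future<String> listenTwice() async {
  final stream = numbers();
  final first = stream.toList(); // first listener: allowed
  try {
    await stream.toList(); // second listener: rejected
    return 'no error';
  } on StateError catch (e) {
    await first; // let the first listener finish normally
    return e.toString();
  }
}

void main() async {
  print(numbers().isBroadcast); // false
  print(await listenTwice());
}
```

The isBroadcast getter is a quick way to check which kind of stream you are holding before attaching a second listener.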
In contrast, a broadcast stream can have many listeners, and those listeners receive events for as long as they are attached. Creating a broadcast stream can be simple. In main, change the first line:
final stream = loadCityStream().asBroadcastStream();
This converts the single subscription stream to a broadcast one. The list will print and the total will be calculated.
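Here is a compact sketch of the same idea with a hypothetical three-value generator. Both listeners are attached before anything is awaited; a broadcast stream delivers events only to listeners that are attached when the event fires, so a listener added later would miss earlier values.

```dart
Stream<int> numbers() async* {
  yield 1;
  yield 2;
  yield 3;
}

Future<String> listenTwice() async {
  final stream = numbers().asBroadcastStream();

  // Attach both listeners up front so each one sees every event.
  final doubled = stream.map((n) => n * 2).toList();
  final total = stream.reduce((a, b) => a + b);

  return 'doubled=${await doubled}, total=${await total}';
}

void main() async {
  print(await listenTwice()); // doubled=[2, 4, 6], total=6
}
```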