如何在 Flutter 中暂停和恢复 StreamProvider

How to pause and resume StreamProvider in Flutter

我正在尝试使用 StreamProvider 进行外汇实时更新。 该演示将通过定期从外部 API 获取最新数据来自动更新汇率。 (在此示例中每 60 秒一次)

下面是实现图

API call (Future event) --> Put data in stream
      ^                             |
      |                             V              
Wait for 60 seconds <--  StreamProvider listens for 
                        new event and rebuild widget

问题

即使导航到主视图,流也会继续。

如果使用 StreamBuilder,我们也许可以调用 listen() 方法,它会返回一个 StreamSubscription。然后,就可以根据需要调用 cancel()、pause()、resume() 方法。我想知道在使用 StreamProvider 时,是否有类似 pause 和 resume 的方法?

预期行为

离开仪表板视图时 pause(暂停),返回仪表板视图时 resume(恢复)。

代码

模型


/// Mutable holder for one exchange-rate quote.
///
/// Every field is nullable and starts out as `null`.
class Currency {
  /// Base and quote sides of the pair (shown concatenated by the dashboard).
  String? base, quote;

  /// Exchange rate for the pair.
  double? rate;

  // constructor, factory constructor, etc.
  // ...
}

控制器

/// Fetches exchange-rate data and exposes it as a periodically refreshed
/// stream.
class CurrencyService {
  /// The quote returned by [getCurrencyData]; the code that updates it is
  /// elided in this snippet.
  Currency? _currency;

  /// A stream that emits the result of [getCurrencyData] every 60 seconds.
  ///
  /// Each access builds a new stream. The timer only runs while the stream
  /// has a listener, and the first event arrives one full interval after
  /// listening starts.
  Stream<Currency?> get currencyStream =>
      // The periodic stream is only a tick source; `asyncMap` performs the
      // fetch and waits for it before delivering the event, so no extra
      // `async*`/`yield*` wrapper or Future-unwrapping step is needed.
      Stream<void>.periodic(const Duration(seconds: 60))
          .asyncMap((_) => getCurrencyData());

  /// Performs the API call and returns the current [Currency].
  ///
  /// Errors are logged and swallowed (best effort), in which case the
  /// previously stored value, possibly `null`, is returned.
  Future<Currency?> getCurrencyData() async {
    try {
      // Perform API call and
      // update Currency object
      // ...

    } catch (e) {
      print('Error: $e');
    }
    return _currency;
  }
}

视图

/// Fetches an initial quote, then starts the app with a [StreamProvider]
/// that exposes the periodically refreshed [Currency] to the widget tree.
void main() async {
  // Use a single service for both the initial fetch and the stream so that
  // whatever state the service caches is shared instead of being split
  // across two unrelated instances.
  final currencyService = CurrencyService();
  final initialCurrency = await currencyService.getCurrencyData();

  runApp(
    MultiProvider(
      providers: [
        // some providers,
        // another one,
        // ...
        StreamProvider<Currency?>(
          create: (_) => currencyService.currencyStream,
          initialData: initialCurrency,
        ),
      ],
      child: TestApp(),
    ),
  );
}

/// Root widget of the demo.
///
/// Builds a [MaterialApp] that starts on the `/` route; the route table
/// `routes` is declared outside this snippet.
class TestApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) => MaterialApp(
        title: 'Live Update Demo',
        initialRoute: '/',
        routes: routes,
      );
}

主视图(第 1 页)

/// First page of the demo; its UI and behavior live in [_MainViewState].
class MainView extends StatefulWidget {
  const MainView({Key? key}) : super(key: key);

  /// Creates the state object backing this widget.
  @override
  _MainViewState createState() {
    return _MainViewState();
  }
}

/// State for [MainView]: renders a column containing a button that opens the
/// dashboard page.
class _MainViewState extends State<MainView> {

  // ...

  /// Pushes the named route `/dashboard` onto the enclosing navigator.
  void _openDashboard() {
    Navigator.pushNamed(context, '/dashboard');
  }

  @override
  Widget build(BuildContext context) {
    final dashboardButton = ElevatedButton(
      onPressed: _openDashboard,
      child: Text('Dashboard'),
    );

    return Scaffold(
      body: Column(
        children: <Widget>[
          // ...
          dashboardButton,
        ],
      ),
    );
  }
}

仪表板视图(第 2 页)

/// Second page of the demo: shows the latest [Currency] value provided by an
/// ancestor provider.
class DashboardView extends StatelessWidget {
  const DashboardView({Key? key}) : super(key: key);

  /// Builder for the [Consumer]: renders the pair label and the rate, each
  /// falling back to an empty string while the value (or a field) is null.
  Widget _buildQuote(BuildContext context, Currency? currency, Widget? child) {
    final pairLabel = '${currency?.base ?? ''}${currency?.quote ?? ''}';
    final rateLabel = '${currency?.rate ?? ''}';

    return Center(
      child: Column(
        mainAxisAlignment: MainAxisAlignment.center,
        children: [
          Container(child: Text(pairLabel)),
          Container(child: Text(rateLabel)),
        ],
      ),
    );
  }

  @override
  Widget build(BuildContext context) {
    // Only the Consumer subtree rebuilds when a new Currency is provided.
    return Scaffold(
      body: Consumer<Currency?>(builder: _buildQuote),
    );
  }
}

谢谢。

似乎无法对基于 Stream.periodic 的 StreamProvider 进行暂停和恢复。不过,按照 @Abion47 的建议,仍然可以使用 Timer.periodic 和 StreamController 来实现。

我们可以通过控制何时开始和停止向流中添加新数据来模拟暂停和恢复。其中一种方法是:在导航到仪表板视图时(按下按钮后)启动 Timer.periodic,并在返回主视图时(弹出仪表板视图后)调用 cancel 取消计时器。

// Sketch of the button handler: start polling before navigating, and stop it
// when the pushed route is popped.
ElevatedButton(
  onPressed: () {
    // start timer
    // ...
    // The future returned by pushNamed completes when the pushed route is
    // popped, so the callback below runs on return to the main view.
    Navigator.pushNamed(...).then((_) {
      // stop timer
      // this section is triggered when returning from dashboard to main view
    });
  },
  // child: ...
)

修改代码

// Controller

/// Fetches exchange-rate data and publishes it on a broadcast stream.
///
/// Polling is explicit: call [startPolling] to begin pushing a fresh value
/// every 60 seconds and [stopPolling] to stop. This lets callers emulate
/// pause/resume. Call [closeStream] once the service is no longer needed.
class CurrencyService {
  /// The quote returned by [getCurrencyData]; the code that updates it is
  /// elided in this snippet.
  Currency? _currency;

  /// The active polling timer, or `null` when polling is stopped.
  Timer? _pollingTimer;

  final StreamController<Currency?> _currencyController =
      StreamController.broadcast();

  /// Fetches the latest data and adds it to [currencyStream].
  ///
  /// Does nothing with the result if the stream was closed while the request
  /// was in flight.
  Future<void> addCurrencyData() async {
    final currency = await getCurrencyData();
    // Adding to a closed controller throws a StateError, and the controller
    // may have been closed while we were awaiting the request.
    if (_currencyController.isClosed) return;
    _currencyController.add(currency);
  }

  /// Stops polling and closes the underlying stream controller.
  void closeStream() {
    // Cancel the timer first so no tick tries to add to the closed stream.
    stopPolling();
    _currencyController.close();
  }

  /// Immediately fetches once, then re-fetches every 60 seconds.
  ///
  /// Calling this while polling is already active restarts the timer instead
  /// of leaking the previous one.
  void startPolling() {
    stopPolling();
    addCurrencyData();
    _pollingTimer = Timer.periodic(
      const Duration(seconds: 60),
      (_) => addCurrencyData(),
    );
  }

  /// Cancels the polling timer, if any. Safe to call repeatedly.
  void stopPolling() {
    _pollingTimer?.cancel();
    _pollingTimer = null;
  }

  /// Broadcast stream of values added by [addCurrencyData].
  Stream<Currency?> get currencyStream => _currencyController.stream;

  /// Performs the API call and returns the current [Currency].
  ///
  /// Errors are logged and swallowed (best effort), in which case the
  /// previously stored value, possibly `null`, is returned.
  Future<Currency?> getCurrencyData() async {
    try {
      // Perform API call and
      // update Currency object
      // ...

    } catch (e) {
      print('Error: $e');
    }
    return _currency;
  }
}

// Main

/// Starts the app with a [CurrencyService] available to the whole widget
/// tree.
void main() {
  runApp(
    MultiProvider(
      providers: [
        // some providers,
        // another one,
        // ...
        Provider<CurrencyService>(
          create: (_) => CurrencyService(),
          // Release the polling timer and the stream controller when the
          // provider is removed from the tree.
          dispose: (_, service) {
            service.stopPolling();
            service.closeStream();
          },
        ),
      ],
      child: TestApp(),
    ),
  );
}

/// Top-level application widget.
///
/// Wraps the named-route table `routes` (declared outside this snippet) in a
/// [MaterialApp] whose initial route is `/`.
class TestApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    final app = MaterialApp(
      title: 'Live Update Demo',
      initialRoute: '/',
      routes: routes,
    );
    return app;
  }
}

// Main view (page 1)

/// First page of the demo: a button that opens the dashboard and keeps the
/// currency polling running only while the dashboard is on screen.
class MainView extends StatelessWidget {
  const MainView({Key? key}) : super(key: key);

  // ...

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: Column(
        children: <Widget>[
          // ...
          ElevatedButton(
            onPressed: () async {
              // Resolve the service once, before navigating. Looking it up
              // through `context` after the await is unsafe because this
              // widget may no longer be in the tree by then.
              final currencyService =
                  Provider.of<CurrencyService>(context, listen: false);

              currencyService.startPolling();
              // Completes when the dashboard route is popped.
              await Navigator.pushNamed(context, '/dashboard');
              currencyService.stopPolling();
            },
            child: const Text('Dashboard'),
          ),
        ],
      ),
    );
  }
}

// Dashboard view (page 2)

/// Second page of the demo: exposes the service's currency stream to its
/// subtree through a [StreamProvider] and shows it with [CurrencyRate].
class DashboardView extends StatelessWidget {
  const DashboardView({Key? key}) : super(key: key);

  @override
  Widget build(BuildContext context) {
    final Stream<Currency?> quotes =
        Provider.of<CurrencyService>(context).currencyStream;

    // The provided value is null until the stream delivers its first event.
    final body = StreamProvider<Currency?>.value(
      value: quotes,
      initialData: null,
      child: CurrencyRate(),
    );

    return Scaffold(body: body);
  }
}


/// Displays the currently provided [Currency], or a progress indicator while
/// no value is available yet.
class CurrencyRate extends StatelessWidget {
  const CurrencyRate({Key? key}) : super(key: key);

  @override
  Widget build(BuildContext context) {
    final currency = context.watch<Currency?>();

    // Nothing provided yet: show a spinner.
    if (currency == null) {
      return const Center(child: CircularProgressIndicator());
    }

    // `currency` is promoted to non-null here, so plain member access is
    // enough; the individual fields are still nullable.
    return Center(
      child: Column(
        mainAxisAlignment: MainAxisAlignment.center,
        children: [
          Container(
            child: Text('${currency.base ?? ''}${currency.quote ?? ''}'),
          ),
          Container(
            child: Text('${currency.rate ?? ''}'),
          ),
        ],
      ),
    );
  }
}