Angular/RxJs Когда я должен отписаться от `Подписки`

Когда я должен хранить Subscription экземпляры и вызывать unsubscribe() во время жизненного цикла NgOnDestroy и когда я могу просто игнорировать их?

Сохранение всех подписок вносит много ошибок в код компонента.

HTTP Client Guide игнорирует такие подписки:

getHeroes() {
  this.heroService.getHeroes()
                   .subscribe(
                     heroes => this.heroes = heroes,
                     error =>  this.errorMessage = <any>error);
}

В то же время руководство по маршруту и ​​навигации говорит, что:

В конце концов мы перейдем куда-нибудь еще. Маршрутизатор удалит этот компонент из DOM и уничтожит его. Нам нужно привести себя в порядок до того, как это произойдет. В частности, мы должны отписаться, прежде чем Angular уничтожит компонент. Невыполнение этого требования может привести к утечке памяти.

Отписываемся от наших Observable в ngOnDestroy метод.

private sub: any;

ngOnInit() {
  this.sub = this.route.params.subscribe(params => {
     let id = +params['id']; // (+) converts string 'id' to a number
     this.service.getHero(id).then(hero => this.hero = hero);
   });
}

ngOnDestroy() {
  this.sub.unsubscribe();
}

29 ответов

Решение

--- Редактировать 4 - Дополнительные ресурсы (2018/09/01)

В недавнем эпизоде ​​" Приключения в Angular" Бен Леш и Уорд Белл обсуждают вопросы о том, как и когда отписаться в компоненте. Обсуждение начинается примерно в 1:05:30.

Уорд упоминает right now there's an awful takeUntil dance that takes a lot of machinery и Шай Резник упоминает Angular handles some of the subscriptions like http and routing,

В ответ Бен упоминает, что сейчас идут обсуждения, позволяющие Observables подключаться к событиям жизненного цикла компонента Angular, а Ward предлагает Observable событий жизненного цикла, на которые компонент может подписаться, чтобы узнать, когда завершать Observables, поддерживаемые как внутреннее состояние компонента.

Тем не менее, сейчас нам в основном нужны решения, так что вот некоторые другие ресурсы.

  1. Рекомендация для takeUntil() шаблон от основного члена команды RxJ Николаса Джеймисона и правило tslint, чтобы помочь обеспечить его соблюдение. https://blog.angularindepth.com/rxjs-avoiding-takeuntil-leaks-fb5182d047ef

  2. Облегченный пакет npm, который предоставляет оператор Observable, который принимает экземпляр компонента (this) в качестве параметра и автоматически отписывается во время ngOnDestroy, https://github.com/NetanelBasal/ngx-take-until-destroy

  3. Еще один вариант вышеупомянутого с немного лучшей эргономикой, если вы не делаете сборки AOT (но мы все должны делать AOT сейчас). https://github.com/smnbbrv/ngx-rx-collector

  4. Таможенная директива *ngSubscribe это работает как асинхронный канал, но создает встроенное представление в вашем шаблоне, чтобы вы могли ссылаться на значение "развернутый" во всем шаблоне. https://netbasal.com/diy-subscription-handling-directive-in-angular-c8f6e762697f

Я упоминаю в комментарии к блогу Николаса, что чрезмерное использование takeUntil() это может быть признаком того, что ваш компонент пытается сделать слишком много, и что следует рассмотреть возможность разделения существующих компонентов на компоненты Feature и Presentational. Вы можете тогда | async Наблюдаемый из компонента Feature в Input компонента Presentational, что означает, что подписки нигде не нужны. Подробнее об этом подходе читайте здесь

--- Редактировать 3 - "Официальное" решение (2017/04/09)

Я говорил с Уордом Беллом об этом вопросе в NGConf (я даже показал ему этот ответ, который, как он сказал, был правильным), но он сказал мне, что команда разработчиков документации для Angular нашла решение этого вопроса, которое не было опубликовано (хотя они работают над его утверждением).). Он также сказал мне, что я могу обновить свой SO-ответ следующей официальной рекомендацией.

Решение, которое мы все должны использовать в будущем, - это добавить private ngUnsubscribe = new Subject(); поле для всех компонентов, которые имеют .subscribe() звонки в Observable в пределах своего кода класса.

Затем мы позвоним this.ngUnsubscribe.next(); this.ngUnsubscribe.complete(); в нашем ngOnDestroy() методы.

Секретный соус (как уже отмечалось @metamaker) заключается в takeUntil(this.ngUnsubscribe) перед каждым из наших .subscribe() вызовы, которые гарантируют все подписки, будут очищены после уничтожения компонента.

Пример:

import { Component, OnDestroy, OnInit } from '@angular/core';
// RxJs 6.x+ import paths
import { filter, startWith, takeUntil } from 'rxjs/operators';
import { Subject } from 'rxjs';
import { BookService } from '../books.service';

@Component({
    selector: 'app-books',
    templateUrl: './books.component.html'
})
export class BooksComponent implements OnDestroy, OnInit {
    private ngUnsubscribe = new Subject();

    constructor(private booksService: BookService) { }

    ngOnInit() {
        this.booksService.getBooks()
            .pipe(
               startWith([]),
               filter(books => books.length > 0),
               takeUntil(this.ngUnsubscribe)
            )
            .subscribe(books => console.log(books));

        this.booksService.getArchivedBooks()
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe(archivedBooks => console.log(archivedBooks));
    }

    ngOnDestroy() {
        this.ngUnsubscribe.next();
        this.ngUnsubscribe.complete();
    }
}

Примечание: важно добавить takeUntil оператор как последний, чтобы предотвратить утечки с промежуточными наблюдаемыми в цепочке операторов.

--- Изменить 2 (2016/12/28)

Источник 5

В руководстве по Angular глава Routing теперь заявляет следующее: "Маршрутизатор управляет наблюдаемыми объектами, которые он предоставляет, и локализует подписки. Подписки очищаются при уничтожении компонента, защищая от утечек памяти, поэтому нам не нужно отписываться от параметры маршрута наблюдаемые." - Марк Райкок

Вот обсуждение вопросов Github для Angular Docs, касающихся Router Observables, где Уорд Белл упоминает, что прояснение всего этого находится в работе.

--- Редактировать 1

Источник 4

В этом видео от NgEurope Роб Вормальд также говорит, что вам не нужно отписываться от Router Observables. Он также упоминает http обслуживание и ActivatedRoute.params в этом видео с ноября 2016 года.

--- Оригинальный ответ

TLDR:

Для этого вопроса есть (2) виды Observables - конечное значение и бесконечное значение.

httpObservables производить конечные (1) значения и что-то вроде DOM event listenerObservables производить бесконечные значения.

Если вы звоните вручную subscribe (без использования асинхронного канала), затем unsubscribe из бесконечного Observables,

Не беспокойся о конечных, RxJs позабочусь о них.

Источник 1

Я разыскал ответ Роба Вормальда в Angular's Gitter здесь.

Он заявляет (я реорганизован для ясности и акцента мое)

если это однозначная последовательность (например, HTTP-запрос), ручная очистка не требуется (при условии, что вы подписываетесь в контроллере вручную)

я должен сказать "если это последовательность, которая завершается " (из которых последовательности с одним значением, а-ля http, являются одной)

если это бесконечная последовательность, вы должны отписаться, что асинхронный канал делает для вас

Также он упоминает в этом видео на YouTube на Observables, что they clean up after themselves... в контексте наблюдаемых, которые complete (например, Обещания, которые всегда выполняются, потому что они всегда производят 1 значение и заканчиваются - мы никогда не беспокоились о том, чтобы отписаться от Обещаний, чтобы убедиться, что они убираются xhr слушатели событий, верно?).

Источник 2

Также в путеводителе по Rangle Angular 2 написано

В большинстве случаев нам не нужно явно вызывать метод отказа от подписки, если только мы не хотим отменить досрочно или у нашего Observable более продолжительный срок службы, чем у нашей подписки. Поведение операторов Observable по умолчанию заключается в удалении подписки сразу после публикации сообщений.complete() или.error(). Имейте в виду, что RxJS был разработан для того, чтобы использовать его в режиме "забей и забудь" большую часть времени.

Когда делает фразу our Observable has a longer lifespan than our subscription применять?

Он применяется, когда подписка создается внутри компонента, который уничтожается до (или не "задолго") Observable завершается.

Я читаю это как значение, если мы подписываемся на http запрос или наблюдаемое, которое испускает 10 значений, и наш компонент уничтожается до этого http запрос возвращает или 10 значений были отправлены, мы все еще в порядке!

Когда запрос возвращается или, наконец, передается 10-е значение, Observable будет завершен, и все ресурсы будут очищены.

Источник 3

Если мы посмотрим на этот пример из того же руководства по Rangle, то увидим, что Subscription в route.params действительно требует unsubscribe() потому что мы не знаем, когда те params перестанет меняться (испуская новые значения).

Компонент может быть уничтожен путем перемещения в этом случае, в этом случае параметры маршрута, вероятно, все еще будут изменяться (они могут технически измениться до завершения приложения), а ресурсы, выделенные в подписке, будут по-прежнему выделяться, поскольку не было completion,

Вам не нужно иметь кучу подписок и отписаться вручную. Используйте RxJS.Subject и takeUntil, чтобы обрабатывать подписки как босс:

import {Subject} from "rxjs/Subject";

@Component(
    {
        moduleId: __moduleName,
        selector: 'my-view',
        templateUrl: '../views/view-route.view.html',
    }
)
export class ViewRouteComponent implements OnDestroy
{
    componentDestroyed$: Subject<boolean> = new Subject();

    constructor(protected titleService: TitleService)
    {
        this.titleService.emitter1$
            .takeUntil(this.componentDestroyed$)
            .subscribe(
            (data: any) =>
            {
                // ... do something 1
            }
        );

        this.titleService.emitter2$
            .takeUntil(this.componentDestroyed$)
            .subscribe(
            (data: any) =>
            {
                // ... do something 2
            }
        );

        // ...

        this.titleService.emitterN$
            .takeUntil(this.componentDestroyed$)
            .subscribe(
            (data: any) =>
            {
                // ... do something N
            }
        );
    }

    ngOnDestroy()
    {
        this.componentDestroyed$.next(true);
        this.componentDestroyed$.complete();
    }
}

Альтернативный подход, который был предложен @acumartini в комментариях, использует takeWhile вместо takeUntil. Вы можете предпочесть это, но имейте в виду, что таким образом выполнение Observable не будет отменено для ngDestroy вашего компонента (например, когда вы делаете трудоемкие вычисления или ждете данных с сервера). Метод, основанный на takeUntil, не имеет этого недостатка и приводит к немедленной отмене запроса. Спасибо @AlexChe за подробное объяснение в комментариях.

Итак, вот код:

@Component(
    {
        moduleId: __moduleName,
        selector: 'my-view',
        templateUrl: '../views/view-route.view.html',
    }
)
export class ViewRouteComponent implements OnDestroy
{
    alive: boolean = true;

    constructor(protected titleService: TitleService)
    {
        this.titleService.emitter1$
            .takeWhile(() => this.alive)
            .subscribe(
            (data: any) =>
            {
                // ... do something 1
            }
        );

        this.titleService.emitter2$
            .takeWhile(() => this.alive)
            .subscribe(
            (data: any) =>
            {
                // ... do something 2
            }
        );

        // ...

        this.titleService.emitterN$
            .takeWhile(() => this.alive)
            .subscribe(
            (data: any) =>
            {
                // ... do something N
            }
        );
    }

    // Probably, this.alive = false MAY not be required here, because
    // if this.alive === undefined, takeWhile will stop. I
    // will check it as soon, as I have time.
    ngOnDestroy()
    {
        this.alive = false;
    }
}

Класс подписки имеет интересную особенность:

Представляет одноразовый ресурс, такой как выполнение Observable. У подписки есть один важный метод - отписаться, который не принимает аргументов и просто удаляет ресурс, удерживаемый подпиской.
Кроме того, подписки могут быть сгруппированы с помощью метода add(), который прикрепит дочернюю подписку к текущей подписке. Когда подписка отменяется, все ее дочерние элементы (и ее внуки) также отписываются.

Вы можете создать совокупный объект подписки, который группирует все ваши подписки. Это можно сделать, создав пустую подписку и добавив к ней подписки, используя ее add() метод. Когда ваш компонент уничтожен, вам нужно только отменить подписку на совокупную подписку.

@Component({ ... })
export class SmartComponent implements OnInit, OnDestroy {
  private subscriptions = new Subscription();

  constructor(private heroService: HeroService) {
  }

  ngOnInit() {
    this.subscriptions.add(this.heroService.getHeroes().subscribe(heroes => this.heroes = heroes));
    this.subscriptions.add(/* another subscription */);
    this.subscriptions.add(/* and another subscription */);
    this.subscriptions.add(/* and so on */);
  }

  ngOnDestroy() {
    this.subscriptions.unsubscribe();
  }
}

Некоторые из лучших практик в отношении аннулируемых подписок внутри компонентов Angular:

Цитата из Routing & Navigation

При подписке на наблюдаемое в компоненте вы почти всегда соглашаетесь отменить подписку, когда компонент уничтожен.

Есть несколько исключительных наблюдаемых, где это не обязательно. Наблюдаемые значения ActivatedRoute входят в число исключений.

ActivatedRoute и его наблюдаемые изолированы от самого маршрутизатора. Маршрутизатор уничтожает маршрутизируемый компонент, когда он больше не нужен, и внедренный ActivatedRoute умирает вместе с ним.

Не стесняйтесь отписаться в любом случае. Это безвредно и никогда не бывает плохой практикой.

И в ответ на следующие ссылки:

Я собрал некоторые из лучших практик в отношении аннулируемых подписок внутри компонентов Angular, чтобы поделиться ими с вами:

  • http наблюдаемая отмена подписки является условной, и мы должны учитывать последствия "обратного вызова подписки", запускаемого после уничтожения компонента в каждом конкретном случае. Мы знаем, что угловой отписывается и очищает http наблюдаемая сама (1), (2). Хотя это верно с точки зрения ресурсов, это только половина истории. Допустим, мы говорим о прямом звонке http из компонента, и http ответ занял больше времени, чем нужно, поэтому пользователь закрыл компонент. subscribe() Обработчик будет по-прежнему вызываться, даже если компонент закрыт и уничтожен. Это может иметь нежелательные побочные эффекты, а в худших сценариях состояние приложения нарушается. Это также может вызвать исключения, если код в обратном вызове пытается вызвать что-то, что только что было удалено. Однако в то же время иногда они желательны. Например, предположим, что вы создаете почтовый клиент и запускаете звуковой сигнал, когда электронная почта завершает отправку - хорошо, что вы все равно хотите, чтобы это произошло, даже если компонент закрыт ( 8).
  • Не нужно отписываться от наблюдаемых или завершенных ошибок. Тем не менее, в этом нет никакого вреда (7).
  • использование AsyncPipe настолько, насколько это возможно, потому что он автоматически отписывается от наблюдаемого при уничтожении компонента.
  • Отписаться от ActivatedRoute наблюдаемые как route.params если они подписаны внутри вложенного (добавлено внутри tpl с помощью селектора компонента) или динамического компонента, так как они могут быть подписаны много раз, пока существует родительский / хост-компонент. Нет необходимости отписываться от них в других сценариях, как указано в приведенной выше цитате из Routing & Navigation Docs.
  • Отмените подписку на глобальные наблюдаемые, общие для компонентов, которые предоставляются через службу Angular, например, так как они могут быть подписаны несколько раз в течение инициализации компонента.
  • Нет необходимости отписываться от внутренних наблюдаемых службы в области приложения, так как эта служба никогда не будет уничтожена, если только все ваше приложение не будет уничтожено, нет реальной причины отказаться от нее и нет шансов утечки памяти. (6).

    Примечание. Что касается сервисов с определенной областью, то есть поставщиков компонентов, они уничтожаются при уничтожении компонента. В этом случае, если мы подписываемся на какой-либо наблюдаемый внутри этого провайдера, мы должны рассмотреть возможность отказа от него, используя OnDestroy хук жизненного цикла, который будет вызываться при уничтожении службы, в соответствии с документами.
  • Используйте абстрактную технику, чтобы избежать путаницы в коде, которая может возникнуть в результате отписки. Вы можете управлять своими подписками с takeUntil (3) или вы можете использовать это npm Пакет упоминается в (4) Самый простой способ отписаться от Observables в Angular.
  • Всегда отписываться от FormGroup наблюдаемые как form.valueChanges а также form.statusChanges
  • Всегда отписываться от наблюдаемых Renderer2 сервис как renderer2.listen
  • Отмените подписку от всех других наблюдаемых в качестве шага защиты от утечки памяти, пока Angular Docs явно не сообщит нам, какие наблюдаемые необязательны для отмены подписки (Проверьте проблему: (5) Документация для RxJS Unsubscribeing (Open)).
  • Бонус: всегда используйте угловые способы, чтобы связать такие события, как HostListener так как angular заботится об удалении слушателей событий, если это необходимо, и предотвращает любую потенциальную утечку памяти из-за привязок событий.

Хороший заключительный совет: если вы не знаете, автоматически ли отписывается / завершается заметка или нет, добавьте complete обратный звонок в subscribe(...) и проверьте, вызывается ли он, когда компонент уничтожен.

Это зависит. Если по телефону someObservable.subscribe(), вы начинаете задерживать некоторый ресурс, который должен быть освобожден вручную, когда жизненный цикл вашего компонента закончен, тогда вы должны вызвать theSubscription.unsubscribe() предотвратить утечку памяти.

Давайте внимательнее посмотрим на ваши примеры:

getHero() возвращает результат http.get(), Если вы посмотрите на угловой исходный код2, http.get() создает двух слушателей событий:

_xhr.addEventListener('load', onLoad);
_xhr.addEventListener('error', onError);

и позвонив unsubscribe()Вы можете отменить запрос, а также слушателей:

_xhr.removeEventListener('load', onLoad);
_xhr.removeEventListener('error', onError);
_xhr.abort();

Обратите внимание, что _xhr зависит от платформы, но я думаю, можно с уверенностью предположить, что это XMLHttpRequest() в твоем случае.

Обычно этого достаточно, чтобы получить руководство unsubscribe() вызов. Но согласно этой спецификации WHATWG, XMLHttpRequest() подлежит сборке мусора, как только он "сделан", даже если к нему подключены прослушиватели событий. Так что, я думаю, поэтому в Angular 2 официальное руководство отсутствует unsubscribe() и позволяет GC очистить слушателей.

Что касается вашего второго примера, это зависит от реализации params, На сегодняшний день в угловом официальном руководстве больше не отображается отписка от params, Я снова заглянул в SRC и обнаружил, что params это просто Поведение субъекта. Поскольку ни прослушиватели событий, ни таймеры не использовались, а глобальные переменные не создавались, можно смело опускать unsubscribe(),

Суть в том, что ваш вопрос всегда unsubscribe() в качестве защиты от утечки памяти, если только вы не уверены, что выполнение наблюдаемой не создает глобальные переменные, не добавляет прослушиватели событий, не устанавливает таймеры и не делает ничего другого, что приводит к утечкам памяти.

Если есть сомнения, посмотрите на реализацию этого наблюдаемого. Если наблюдаемое записало некоторую логику очистки в свой unsubscribe(), которая обычно является функцией, возвращаемой конструктором, тогда у вас есть веская причина серьезно подумать о вызове unsubscribe(),

Официальная документация Angular 2 дает объяснение, когда отписаться и когда ее можно безопасно игнорировать. Посмотрите на эту ссылку:

https://angular.io/docs/ts/latest/cookbook/component-communication.html

Найдите абзац с заголовком " Родитель и дети общаются через службу", а затем синее поле:

Обратите внимание, что мы фиксируем подписку и отменяем подписку, когда AstronautComponent уничтожен. Это шаг защиты от утечки памяти. В этом приложении нет никакого реального риска, потому что время жизни AstronautComponent совпадает со временем жизни самого приложения. Это не всегда будет верно в более сложном приложении.

Мы не добавляем эту защиту в MissionControlComponent, потому что, как родитель, он контролирует время жизни MissionService.

Я надеюсь, это поможет вам.

В Angular 16 была введена новая функция, упрощающая уничтожение наблюдаемых объектов.takeUntilDestroyed.

      data$ = http.get('...').pipe(takeUntilDestroyed()).subscribe(...);

По умолчанию его следует вызывать внутри конструктора. Чтобы использовать его в другом месте, затемDestroyRefнеобходимо.

      destroyRef = inject(DestroyRef);

ngOnInit(){
   data$ = http.get('...').subscribe(...)

   this.destoryRef.onDestroy(() => {
      data$.unsubscribe()
   })
}

Основано на: Использование наследования классов для подключения к жизненному циклу компонента Angular 2

Еще один общий подход:

export abstract class UnsubscribeOnDestroy implements OnDestroy {
  protected d$: Subject<any>;

  constructor() {
    this.d$ = new Subject<void>();

    const f = this.ngOnDestroy;
    this.ngOnDestroy = () => {
      f();
      this.d$.next();
      this.d$.complete();
    };
  }

  public ngOnDestroy() {
    // no-op
  }

}

И использовать:

@Component({
    selector: 'my-comp',
    template: ``
})
export class RsvpFormSaveComponent extends UnsubscribeOnDestroy implements OnInit {

    constructor() {
        super();
    }

    ngOnInit(): void {
      Observable.of('bla')
      .takeUntil(this.d$)
      .subscribe(val => console.log(val));
    }
}

Для наблюдаемых, которые завершаются сразу после выдачи результата, например AsyncSubject или, например, наблюдаемые из HTTP-запросов, и вам не нужно отказываться от подписки. Не помешает позвонитьunsubscribe() для тех, но если наблюдаемое closedметод отказа от подписки просто ничего не сделает:

if (this.closed) {
  return;
}

Если у вас есть долгоживущие наблюдаемые, которые со временем испускают несколько значений (например, BehaviorSubject или ReplaySubject) вам нужно отказаться от подписки, чтобы предотвратить утечку памяти.

Вы можете легко создать наблюдаемое, которое завершается сразу после выдачи результата из таких долгоживущих наблюдаемых с помощью оператора канала. В некоторых ответах здесьtake(1)труба упоминается. Но я предпочитаю first()труба. Разница вtake(1) в том, что это будет:

доставить EmptyError к обратному вызову ошибки Observer, если Observable завершается до отправки любого следующего уведомления.

Еще одним преимуществом первого канала является то, что вы можете передать предикат, который поможет вам вернуть первое значение, удовлетворяющее определенным критериям:

const predicate = (result: any) => { 
  // check value and return true if it is the result that satisfies your needs
  return true;
}
observable.pipe(first(predicate)).subscribe(observer);

Первый будет завершен сразу после выдачи первого значения (или при передаче аргумента функции первого значения, удовлетворяющего вашему предикату), поэтому нет необходимости отказываться от подписки.

Иногда вы не уверены, есть ли у вас долгоживущее наблюдаемое или нет. Я не говорю, что это хорошая практика, но вы всегда можете добавитьfirstpipe, чтобы убедиться, что вам не нужно отказываться от подписки вручную. Добавление дополнительногоfirst pipe на наблюдаемом, который будет выдавать только одно значение, не повредит.

В процессе разработки вы можете использовать singlepipe, который завершится ошибкой, если наблюдаемый источник генерирует несколько событий. Это может помочь вам изучить тип наблюдаемого и нужно ли отказываться от подписки на него.

observable.pipe(single()).subscribe(observer);

В first а также singleкажутся очень похожими, оба канала могут принимать необязательный предикат, но различия важны и хорошо резюмированы в этом ответе stackru здесь:

Первый

Выдаст, как только появится первый элемент. Завершится сразу после этого.

Один

Сбой, если наблюдаемый источник испускает несколько событий.


Обратите внимание, что я старался быть максимально точным и полным в своем ответе со ссылками на официальную документацию, но, пожалуйста, прокомментируйте, если что-то важное отсутствует...

Подписка по сути просто имеет функцию unsubscribe() для освобождения ресурсов или отмены выполнения Observable. В Angular мы должны отказаться от подписки на Observable, когда компонент уничтожается. К счастью, в Angular есть хук ngOnDestroy, который вызывается перед уничтожением компонента, это позволяет разработчикам предоставить здесь команду очистки, чтобы избежать зависания подписок, открытых порталов и прочего, что может появиться в будущем, чтобы укусить нас в спину

@Component({...})
export class AppComponent implements OnInit, OnDestroy {
    subscription: Subscription 
    ngOnInit () {
        var observable = Rx.Observable.interval(1000);
        this.subscription = observable.subscribe(x => console.log(x));
    }
    ngOnDestroy() {
        this.subscription.unsubscribe()
    }
}

Мы добавили ngOnDestroy в наш AppCompoennt и вызвали метод отмены подписки для объекта this.subscription Observable

Если есть несколько подписок:

@Component({...})
export class AppComponent implements OnInit, OnDestroy {
    subscription1$: Subscription
    subscription2$: Subscription 
    ngOnInit () {
        var observable1$ = Rx.Observable.interval(1000);
        var observable2$ = Rx.Observable.interval(400);
        this.subscription1$ = observable.subscribe(x => console.log("From interval 1000" x));
        this.subscription2$ = observable.subscribe(x => console.log("From interval 400" x));
    }
    ngOnDestroy() {
        this.subscription1$.unsubscribe()
        this.subscription2$.unsubscribe()
    }
}

Официальный ответ Edit #3 (и варианты) работает хорошо, но меня привлекает "мутность" бизнес-логики вокруг наблюдаемой подписки.

Вот еще один подход с использованием оберток.

Warining: экспериментальный код

Файл subscribeAndGuard.ts используется для создания нового расширения Observable для переноса .subscribe() и внутри него обернуть ngOnDestroy(),
Использование такое же, как .subscribe(), за исключением дополнительного первого параметра, ссылающегося на компонент.

import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';

const subscribeAndGuard = function(component, fnData, fnError = null, fnComplete = null) {

  // Define the subscription
  const sub: Subscription = this.subscribe(fnData, fnError, fnComplete);

  // Wrap component's onDestroy
  if (!component.ngOnDestroy) {
    throw new Error('To use subscribeAndGuard, the component must implement ngOnDestroy');
  }
  const saved_OnDestroy = component.ngOnDestroy;
  component.ngOnDestroy = () => {
    console.log('subscribeAndGuard.onDestroy');
    sub.unsubscribe();
    // Note: need to put original back in place
    // otherwise 'this' is undefined in component.ngOnDestroy
    component.ngOnDestroy = saved_OnDestroy;
    component.ngOnDestroy();

  };

  return sub;
};

// Create an Observable extension
Observable.prototype.subscribeAndGuard = subscribeAndGuard;

// Ref: https://www.typescriptlang.org/docs/handbook/declaration-merging.html
declare module 'rxjs/Observable' {
  interface Observable<T> {
    subscribeAndGuard: typeof subscribeAndGuard;
  }
}

Вот компонент с двумя подписками, одна с оболочкой и одна без. Единственное предостережение в том, что он должен реализовывать OnDestroy (с пустым телом, если необходимо), в противном случае Angular не знает, чтобы вызывать упакованную версию.

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import 'rxjs/Rx';
import './subscribeAndGuard';

@Component({
  selector: 'app-subscribing',
  template: '<h3>Subscribing component is active</h3>',
})
export class SubscribingComponent implements OnInit, OnDestroy {

  ngOnInit() {

    // This subscription will be terminated after onDestroy
    Observable.interval(1000)
      .subscribeAndGuard(this,
        (data) => { console.log('Guarded:', data); },
        (error) => { },
        (/*completed*/) => { }
      );

    // This subscription will continue after onDestroy
    Observable.interval(1000)
      .subscribe(
        (data) => { console.log('Unguarded:', data); },
        (error) => { },
        (/*completed*/) => { }
      );
  }

  ngOnDestroy() {
    console.log('SubscribingComponent.OnDestroy');
  }
}

Демоверсия здесь

Дополнительное примечание: Re Edit 3 - "Официальное" решение, это можно упростить, используя takeWhile() вместо takeUntil() перед подпиской, и просто логическое значение, а не другое Observable в ngOnDestroy.

@Component({...})
export class SubscribingComponent implements OnInit, OnDestroy {

  iAmAlive = true;
  ngOnInit() {

    Observable.interval(1000)
      .takeWhile(() => { return this.iAmAlive; })
      .subscribe((data) => { console.log(data); });
  }

  ngOnDestroy() {
    this.iAmAlive = false;
  }
}

Всегда рекомендуется отказаться от подписки на наблюдаемые подписки по соображениям производительности, чтобы избежать утечек памяти, и есть разные способы сделать это,

Между прочим, я прочитал большинство ответов и не нашел никого, кто говорил бы о async труба, рекомендуется Rxjs шаблон с приложениями Angular, потому что он обеспечивает автоматическую подписку и подписку при выходе из компонента, который будет уничтожен:

Пожалуйста, найдите пример того, как это можно реализовать

app.compoennt.ts :

      import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

import { BookService } from './book.service';
import { Book } from './book';

@Component({
   selector: 'app-observable',
   templateUrl: './observable.component.html'
})
export class AppComponent implements OnInit { 
   books$: Observable<Book[]>
   constructor(private bookService: BookService) { }
   ngOnInit(): void {
        this.books$ = this.bookService.getBooksWithObservable();
   }
} 

app.compoennt.html :

      <h3>AsyncPipe with Promise using NgFor</h3>
<ul>
  <li *ngFor="let book of books$ | async" >
    Id: {{book?.id}}, Name: {{book?.name}}
  </li>
</ul>

Поскольку решение seangwright (Edit 3) кажется очень полезным, я также счел затруднительным упаковывать эту функцию в базовый компонент, и подсказал другим партнерам по команде проекта не забывать вызывать super() для ngOnDestroy для активации этой функции.

Этот ответ предоставляет способ освободиться от супер-вызова и сделать "componentDestroyed$" ядром базового компонента.

class BaseClass {
    protected componentDestroyed$: Subject<void> = new Subject<void>();
    constructor() {

        /// wrap the ngOnDestroy to be an Observable. and set free from calling super() on ngOnDestroy.
        let _$ = this.ngOnDestroy;
        this.ngOnDestroy = () => {
            this.componentDestroyed$.next();
            this.componentDestroyed$.complete();
            _$();
        }
    }

    /// placeholder of ngOnDestroy. no need to do super() call of extended class.
    ngOnDestroy() {}
}

И тогда вы можете использовать эту функцию свободно, например:

@Component({
    selector: 'my-thing',
    templateUrl: './my-thing.component.html'
})
export class MyThingComponent extends BaseClass implements OnInit, OnDestroy {
    constructor(
        private myThingService: MyThingService,
    ) { super(); }

    ngOnInit() {
        this.myThingService.getThings()
            .takeUntil(this.componentDestroyed$)
            .subscribe(things => console.log(things));
    }

    /// optional. not a requirement to implement OnDestroy
    ngOnDestroy() {
        console.log('everything works as intended with or without super call');
    }

}

Обычно вам нужно отписаться, когда компоненты будут уничтожены, но Angular будет обрабатывать их все больше и больше, например, в новой минорной версии Angular4 у них есть этот раздел для отмены подписки:

Вам нужно отписаться?

Как описано в разделе "ActivatedRoute: единое окно для получения информации о маршруте" на странице "Маршрутизация и навигация", маршрутизатор управляет наблюдаемыми объектами и локализует подписки. Подписки очищаются при уничтожении компонента, защищая от утечек памяти, поэтому вам не нужно отписываться от маршрута paramMap Observable.

Также приведенный ниже пример является хорошим примером из Angular для создания компонента и его уничтожения после. Посмотрите, как компонент реализует OnDestroy. Если вам нужен onInit, вы также можете реализовать его в своем компоненте, как, например, реализует OnInit, OnDestroy

import { Component, Input, OnDestroy } from '@angular/core';  
import { MissionService } from './mission.service';
import { Subscription }   from 'rxjs/Subscription';

@Component({
  selector: 'my-astronaut',
  template: `
    <p>
      {{astronaut}}: <strong>{{mission}}</strong>
      <button
        (click)="confirm()"
        [disabled]="!announced || confirmed">
        Confirm
      </button>
    </p>
  `
})

export class AstronautComponent implements OnDestroy {
  @Input() astronaut: string;
  mission = '<no mission announced>';
  confirmed = false;
  announced = false;
  subscription: Subscription;

  constructor(private missionService: MissionService) {
    this.subscription = missionService.missionAnnounced$.subscribe(
      mission => {
        this.mission = mission;
        this.announced = true;
        this.confirmed = false;
    });
  }

  confirm() {
    this.confirmed = true;
    this.missionService.confirmMission(this.astronaut);
  }

  ngOnDestroy() {
    // prevent memory leak when component destroyed
    this.subscription.unsubscribe();
  }
}

В случае необходимости отписки можно использовать следующий оператор для метода наблюдаемой трубы

import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { OnDestroy } from '@angular/core';

export const takeUntilDestroyed = (componentInstance: OnDestroy) => <T>(observable: Observable<T>) => {
  const subjectPropertyName = '__takeUntilDestroySubject__';
  const originalOnDestroy = componentInstance.ngOnDestroy;
  const componentSubject = componentInstance[subjectPropertyName] as Subject<any> || new Subject();

  componentInstance.ngOnDestroy = (...args) => {
    originalOnDestroy.apply(componentInstance, args);
    componentSubject.next(true);
    componentSubject.complete();
  };

  return observable.pipe(takeUntil<T>(componentSubject));
};

это можно использовать так:

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({ template: '<div></div>' })
export class SomeComponent implements OnInit, OnDestroy {

  ngOnInit(): void {
    const observable = Observable.create(observer => {
      observer.next('Hello');
    });

    observable
      .pipe(takeUntilDestroyed(this))
      .subscribe(val => console.log(val));
  }

  ngOnDestroy(): void {
  }
}

Оператор переносит метод компонента ngOnDestroy.

Важно: оператор должен быть последним в наблюдаемой трубе.

Я попробовал решение Seangwright (Edit 3)

Это не работает для Observable, созданного таймером или интервалом.

Тем не менее, я получил это с помощью другого подхода:

import { Component, OnDestroy, OnInit } from '@angular/core';
import 'rxjs/add/operator/takeUntil';
import { Subject } from 'rxjs/Subject';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/Rx';

import { MyThingService } from '../my-thing.service';

@Component({
   selector: 'my-thing',
   templateUrl: './my-thing.component.html'
})
export class MyThingComponent implements OnDestroy, OnInit {
   private subscriptions: Array<Subscription> = [];

  constructor(
     private myThingService: MyThingService,
   ) { }

  ngOnInit() {
    const newSubs = this.myThingService.getThings()
        .subscribe(things => console.log(things));
    this.subscriptions.push(newSubs);
  }

  ngOnDestroy() {
    for (const subs of this.subscriptions) {
      subs.unsubscribe();
   }
 }
}

Для обработки подписки я использую класс "Unsubscriber".

Вот класс Unsubscriber.

export class Unsubscriber implements OnDestroy {
  private subscriptions: Subscription[] = [];

  addSubscription(subscription: Subscription | Subscription[]) {
    if (Array.isArray(subscription)) {
      this.subscriptions.push(...subscription);
    } else {
      this.subscriptions.push(subscription);
    }
  }

  unsubscribe() {
    this.subscriptions
      .filter(subscription => subscription)
      .forEach(subscription => {
        subscription.unsubscribe();
      });
  }

  ngOnDestroy() {
    this.unsubscribe();
  }
}

И Вы можете использовать этот класс в любом компоненте / Сервис / Эффект и т. Д.

Пример:

class SampleComponent extends Unsubscriber {
    constructor () {
        super();
    }

    this.addSubscription(subscription);
}

После ответа @seangwright я написал абстрактный класс, который обрабатывает "бесконечные" подписки наблюдаемых в компонентах:

import { OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs/Subscription';
import { Subject } from 'rxjs/Subject';
import { Observable } from 'rxjs/Observable';
import { PartialObserver } from 'rxjs/Observer';

export abstract class InfiniteSubscriberComponent implements OnDestroy {
  private onDestroySource: Subject<any> = new Subject();

  constructor() {}

  subscribe(observable: Observable<any>): Subscription;

  subscribe(
    observable: Observable<any>,
    observer: PartialObserver<any>
  ): Subscription;

  subscribe(
    observable: Observable<any>,
    next?: (value: any) => void,
    error?: (error: any) => void,
    complete?: () => void
  ): Subscription;

  subscribe(observable: Observable<any>, ...subscribeArgs): Subscription {
    return observable
      .takeUntil(this.onDestroySource)
      .subscribe(...subscribeArgs);
  }

  ngOnDestroy() {
    this.onDestroySource.next();
    this.onDestroySource.complete();
  }
}

Чтобы использовать его, просто расширьте его в своем угловом компоненте и вызовите subscribe() метод следующим образом:

this.subscribe(someObservable, data => doSomething());

Он также принимает ошибку и завершает обратные вызовы, как обычно, объект-наблюдатель или вообще не обратные вызовы. Не забудьте позвонить super.ngOnDestroy() если вы также реализуете этот метод в дочернем компоненте.

Найдите здесь дополнительную ссылку Бена Леша: RxJS: Не отписывайтесь.

Мне нравятся последние два ответа, но у меня возникла проблема, если на подкласс ссылались "this" в ngOnDestroy,

Я изменил это, чтобы быть этим, и похоже, что это решило эту проблему.

export abstract class BaseComponent implements OnDestroy {
    protected componentDestroyed$: Subject<boolean>;
    constructor() {
        this.componentDestroyed$ = new Subject<boolean>();
        let f = this.ngOnDestroy;
        this.ngOnDestroy = function()  {
            // without this I was getting an error if the subclass had
            // this.blah() in ngOnDestroy
            f.bind(this)();
            this.componentDestroyed$.next(true);
            this.componentDestroyed$.complete();
        };
    }
    /// placeholder of ngOnDestroy. no need to do super() call of extended class.
    ngOnDestroy() {}
}

Пакет SubSink, простое и последовательное решение для отказа от подписки

Поскольку никто об этом не упомянул, я хочу порекомендовать пакет Subsink, созданный Уордом Беллом: https://github.com/wardbell/subsink.

Я использовал его в проекте, где мы, несколько разработчиков, использовали его. Очень помогает последовательный подход, который работает в любой ситуации.

В приложении SPA в функции ngOnDestroy (angular lifeCycle) Для каждой подписки необходимо отменить ее. преимущество => чтобы государство не стало слишком тяжелым.

например: в компоненте 1:

import {UserService} from './user.service';

private user = {name: 'test', id: 1}

constructor(public userService: UserService) {
    this.userService.onUserChange.next(this.user);
}

в сервисе:

import {BehaviorSubject} from 'rxjs/BehaviorSubject';

public onUserChange: BehaviorSubject<any> = new BehaviorSubject({});

в компоненте 2:

import {Subscription} from 'rxjs/Subscription';
import {UserService} from './user.service';

private onUserChange: Subscription;

constructor(public userService: UserService) {
    this.onUserChange = this.userService.onUserChange.subscribe(user => {
        console.log(user);
    });
}

public ngOnDestroy(): void {
    // note: Here you have to be sure to unsubscribe to the subscribe item!
    this.onUserChange.unsubscribe();
}

Еще одно короткое дополнение к вышеупомянутым ситуациям:

  • Всегда отменяйте подписку, если новые значения в подписанном потоке больше не требуются или не имеют значения, это приведет к уменьшению количества триггеров и увеличению производительности в нескольких случаях. Такие примеры, как компоненты, в которых подписанные данные / событие больше не существуют или требуется новая подписка на весь новый поток (обновление и т. Д.), Являются хорошим примером для отказа от подписки.

Вот мой подход к этой проблеме, упрощая свою жизнь. Я выбрал ручной способ отмены подписки при уничтожении компонента.

Для этого я создал класс с именем Subscriptor, который в основном содержит статические члены, а именно: - Подписки на частные переменные - которые содержат все предоставленные подписки - Установщик подписки - который отправляет каждую новую подписку в массив подписок - Метод отмены подписки - который отменяет подписку каждую подписки, которые содержит массив подписок, если он определен, и очистить массив подписок

subscriptor.ts

      import { Subscription } from "rxjs";

export class Subscriptor {
    private static subscriptions: Subscription[] = [];

    static set subscription(subscription: Subscription) {
        Subscriptor.subscriptions.push(subscription);
    }

    static unsubscribe() {
        Subscriptor.subscriptions.forEach(subscription => subscription ? subscription.unsubscribe() : 0);
        Subscriptor.subscriptions = [];
    }
}

Использование внутри компонента следующее:

Если вы хотите подписаться на любую услугу, просто поместите подписку в установщик подписчика.

      ngOnInit(): void {
    Subscriptor.subscription = this.userService.getAll().subscribe(users => this.users = users);
    Subscriptor.subscription = this.categoryService.getAll().subscribe(categories => this.categories = categories);
    Subscriptor.subscription = this.postService.getAll().subscribe(posts => this.posts = posts);
}

Если вы хотите отказаться от подписки на какую-либо услугу, просто вызовите метод отмены подписки Subscriptor.

      ngOnDestroy(): void {
    Subscriptor.unsubscribe();
}

УтилизироватьСумка

Идея была вдохновлена ​​DisposeBag от RxSwift, поэтому я решил разработать аналогичную, но простую структуру.

DisposeBag — это структура данных, содержащая ссылку на все открытые подписки. Это упрощает удаление подписки в наших компонентах, предоставляя нам API для отслеживания состояния открытых подписок.

Преимущества

Очень простой API, делает ваш код простым и маленьким. Предоставляет API для отслеживания состояния открытых подписок (позволяет отображать неопределенный индикатор выполнения). Никаких инъекций/пакетов зависимостей.

Применение

В компоненте:

      @Component({
  selector: 'some-component',
  templateUrl: './some-component.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush
})
export class SomeComponent implements OnInit, OnDestroy {

  public bag = new DisposeBag()
  
  constructor(private _change: ChangeDetectorRef) {
  }

  ngOnInit(): void {

    // an observable that takes some time to finish such as an api call.
    const aSimpleObservable = of(0).pipe(delay(5000))

    // this identifier allows us to track the progress for this specific subscription (very useful in template)
    this.bag.subscribe("submission", aSimpleObservable, () => { 
      this._change.markForCheck() // trigger UI change
     })
  }

  ngOnDestroy(): void {
    // never forget to add this line.
    this.bag.destroy()
  }
}

В шаблоне:

      
<!-- will be shown as long as the submission subscription is open -->
<span *ngIf="bag.inProgress('submission')">submission in progress</span>

<!-- will be shown as long as there's an open subscription in the bag  -->
<span *ngIf="bag.hasInProgress">some subscriptions are still in progress</span>

Реализация

      import { Observable, Observer, Subject, Subscription, takeUntil } from "rxjs";


/**
 * This class facilitates the disposal of the subscription in our components.
 * instead of creating _unsubscribeAll and lots of boilerplates to create different variables for Subscriptions; 
 * you can just easily use subscribe(someStringIdentifier, observable, observer). then you can use bag.inProgress() with
 * the same someStringIdentifier on you html or elsewhere to determine the state of the ongoing subscription.
 *
 *  don't forget to add onDestroy() { this.bag.destroy() }
 * 
 *  Author: Hamidreza Vakilian (hvakilian1@gmail.com)
 * @export
 * @class DisposeBag
 */
export class DisposeBag {
    private _unsubscribeAll: Subject<any> = new Subject<any>();

    private subscriptions = new Map<string, Subscription>()


    /**
     * this method automatically adds takeUntil to your observable, adds it to a private map.
     * this method enables inProgress to work. don't forget to add onDestroy() { this.bag.destroy() }
     *
     * @template T
     * @param {string} id
     * @param {Observable<T>} obs
     * @param {Partial<Observer<T>>} observer
     * @return {*}  {Subscription}
     * @memberof DisposeBag
     */
    public subscribe<T>(id: string, obs: Observable<T>, observer: Partial<Observer<T>> | ((value: T) => void)): Subscription {
        if (id.isEmpty()) {
            throw new Error('disposable.subscribe is called with invalid id')
        }
        if (!obs) {
            throw new Error('disposable.subscribe is called with an invalid observable')
        }

        /* handle the observer */
        let subs: Subscription
        if (typeof observer === 'function') {
            subs = obs.pipe(takeUntil(this._unsubscribeAll)).subscribe(observer)
        } else if (typeof observer === 'object') {
            subs = obs.pipe(takeUntil(this._unsubscribeAll)).subscribe(observer)
        } else {
            throw new Error('disposable.subscribe is called with an invalid observer')
        }

        /* unsubscribe from the last possible subscription if in progress. */
        let possibleSubs = this.subscriptions.get(id)
        if (possibleSubs && !possibleSubs.closed) {
            console.info(`Disposebag: a subscription with id=${id} was disposed and replaced.`)
            possibleSubs.unsubscribe()
        }

        /* store the reference in the map */
        this.subscriptions.set(id, subs)

        return subs
    }


    /**
     * Returns true if any of the registered subscriptions is in progress.
     *
     * @readonly
     * @type {boolean}
     * @memberof DisposeBag
     */
    public get hasInProgress(): boolean {
        return Array.from(this.subscriptions.values()).reduce(
            (prev, current: Subscription) => { 
                return prev || !current.closed }
            , false)
    }

    /**
     * call this from your template or elsewhere to determine the state of each subscription.
     *
     * @param {string} id
     * @return {*} 
     * @memberof DisposeBag
     */
    public inProgress(id: string) {
        let possibleSubs = this.subscriptions.get(id)
        if (possibleSubs) {
            return !possibleSubs.closed
        } else {
            return false
        }
    }


    /**
     * Never forget to call this method in your onDestroy() method of your components.
     *
     * @memberof DisposeBag
     */
    public destroy() {
        this._unsubscribeAll.next(null);
        this._unsubscribeAll.complete();
    }
}

Есть 2 способа отказаться от подписки, наблюдаемой в Angular.

Один из способов: всегда лучше отказаться от подписки на observable$, на который мы подписались, используя .subscribe в компоненте.ts. Хук жизненного цикла, который срабатывает, когда мы покидаем компонент, — это ngOnDestroy(), поэтому отпишитесь от всех наших наблюдаемых внутри ngOnDestroy(), например:observable$?.unsubscribe();

второй способ: подпишитесь на наблюдаемые, используя асинхронный канал в шаблоне, это автоматически подпишется и откажется от подписки на наблюдаемые после их завершения, например:<div *ngIf="observable$ | async"> </div>

Вы можете использовать последнюю версию Subscription класс, чтобы отказаться от подписки на Observable с не таким запутанным кодом.

Мы можем сделать это с normal variable но это будет override the last subscription при каждой новой подписке, так что избегайте этого, и этот подход очень полезен, когда вы имеете дело с большим количеством Obseravables и типом Obeservables, например BehavoiurSubject а также Subject

Подписка

Представляет одноразовый ресурс, например выполнение Observable. Подписка имеет один важный метод, отказ от подписки, который не принимает аргументов и просто удаляет ресурс, удерживаемый подпиской.

вы можете использовать это двумя способами,

  • вы можете напрямую отправить подписку в массив подписок

     subscriptions:Subscription[] = [];
    
     ngOnInit(): void {
    
       this.subscription.push(this.dataService.getMessageTracker().subscribe((param: any) => {
                //...  
       }));
    
       this.subscription.push(this.dataService.getFileTracker().subscribe((param: any) => {
            //...
        }));
     }
    
     ngOnDestroy(){
        // prevent memory leak when component destroyed
        this.subscriptions.forEach(s => s.unsubscribe());
      }
    
  • с помощью add() из Subscription

    subscriptions = new Subscription();
    
    this.subscriptions.add(subscribeOne);
    this.subscriptions.add(subscribeTwo);
    
    ngOnDestroy() {
      this.subscriptions.unsubscribe();
    }
    

А Subscription может удерживать дочерние подписки и безопасно отказываться от них всех. Этот метод обрабатывает возможные ошибки (например, если какие-либо дочерние подписки являются нулевыми).

Надеюсь это поможет..:)

В моем случае я использую вариант решения, предложенного @seanwright:
https://github.com/NetanelBasal/ngx-take-until-destroy

Этот файл используется в проекте ngx-rocket / starter-kit . Вы можете получить к нему доступ здесь до-destroy.ts

Компонент будет выглядеть так

      /**
 * RxJS operator that unsubscribe from observables on destory.
 * Code forked from https://github.com/NetanelBasal/ngx-take-until-destroy
 *
 * IMPORTANT: Add the `untilDestroyed` operator as the last one to
 * prevent leaks with intermediate observables in the
 * operator chain.
 *
 * @param instance The parent Angular component or object instance.
 * @param destroyMethodName The method to hook on (default: 'ngOnDestroy').
 */
import { untilDestroyed } from '../../core/until-destroyed';

@Component({
  selector: 'app-example',
  templateUrl: './example.component.html'
})
export class ExampleComponent implements OnInit, OnDestroy {

  ngOnInit() {
    interval(1000)
        .pipe(untilDestroyed(this))
        .subscribe(val => console.log(val));

    // ...
  }


  // This method must be present, even if empty.
  ngOnDestroy() {
    // To protect you, an error will be thrown if it doesn't exist.
  }
}

Здесь много отличных ответов ...

Позвольте мне добавить еще одну альтернативу:

      import { interval    } from "rxjs";
import { takeUntil   } from "rxjs/operators";
import { Component   } from "@angular/core";
import { Destroyable } from "@bespunky/angular-zen/core";

@Component({
    selector: 'app-no-leak-demo',
    template: '👍 Destroyable component rendered. Unload me and watch me cleanup...'
})
export class NoLeakComponent extends Destroyable
{
    constructor()
    {
        super();

        this.subscribeToInterval();
    }

    private subscribeToInterval(): void
    {
        const value    = interval(1000);
        const observer = {
            next    : value => console.log(`👍 Destroyable: ${value}`),
            complete: ()    => console.log('👍 Observable completed.')
        };

        // ==== Comment one and uncomment the other to see the difference ====
        
        // Subscribe using the inherited subscribe method
         this.subscribe(value, observer);

        // ... or pipe-in the inherited destroyed subject
        //value.pipe(takeUntil(this.destroyed)).subscribe(observer);
    }
}

Живой пример

Что тут происходит

Компонент / услуга расширяется Destroyable (который поступает из библиотеки под названием <tcode id="4150255"></tcode>).

Теперь класс может просто использовать this.subscribe() или же takeUntil(this.destroyed) без дополнительного стандартного кода.

Для установки библиотеки используйте:
> npm install @bespunky/angular-zen

--- Обновление решения Angular 9 и Rxjs 6

  1. С помощью unsubscribe в ngDestroy жизненный цикл Angular Component
class SampleComponent implements OnInit, OnDestroy {
  private subscriptions: Subscription;
  private sampleObservable$: Observable<any>;

  constructor () {}

  ngOnInit(){
    this.subscriptions = this.sampleObservable$.subscribe( ... );
  }

  ngOnDestroy() {
    this.subscriptions.unsubscribe();
  }
}
  1. С помощью takeUntil в Rxjs
class SampleComponent implements OnInit, OnDestroy {
  private unsubscribe$: new Subject<void>;
  private sampleObservable$: Observable<any>;

  constructor () {}

  ngOnInit(){
    this.subscriptions = this.sampleObservable$
    .pipe(takeUntil(this.unsubscribe$))
    .subscribe( ... );
  }

  ngOnDestroy() {
    this.unsubscribe$.next();
    this.unsubscribe$.complete();
  }
}
  1. для какого-то действия, которое вы вызываете в ngOnInit это происходит только один раз, когда компонент init.
class SampleComponent implements OnInit {

  private sampleObservable$: Observable<any>;

  constructor () {}

  ngOnInit(){
    this.subscriptions = this.sampleObservable$
    .pipe(take(1))
    .subscribe( ... );
  }
}

У нас также есть asyncтруба. Но это одно использование в шаблоне (не в компоненте Angular).

Другие вопросы по тегам