Reactive Extensions Сергей Тепляков STeplyakov@luxoft.com.

Презентация:



Advertisements
Похожие презентации
ИТЕРАТОРЫ И LINQ Лекция 1. Интерфейс IEnumerable и IEnumerator Любая коллекция реализует интерфейс IEnumerable. public interface IEnumerable : IEnumerable.
Advertisements

ДЕЛЕГАТЫ Лекция 7 1. Зачем нужны делегаты 2 И данные, и код располагаются в памяти компьютера по определенным адресам. Передача адресов данных в C# происходит.
Язык программирования C# Дмитрий Сошников
Перегрузка операторов x = a + b результат 1-й операнд2-й операнд оператор По количеству операндов операторы делятся на: унарные (один операнд) бинарные.
Высокоуровневые методы информатики и программирования Лекция 9 Делегаты.
©Павловская Т.А. (СПбГУ ИТМО) Курс «С#. Программирование на языке высокого уровня» Павловская Т.А.
C# 5.0 Взгляд в будущее Язык формирует наш способ мышления и определяет то, о чем мы можем мыслить. Б. Л. Ворф Специально для TulaDev.NET.
Делегаты Делегат эти объект, который безопасно инкапсулирует метод, его действие схоже с указателем функции в C и C++. Делегаты используются для передачи.
СОБЫТИЯ Лекция 1. Взаимодействие объектов 2 Взаимодействие между объектами A и B можно организовать двумя способами. 1.Объект A вызывает метод объекта.
Обобщения ( generics) Обобщения – это классы, структуры, интерфейсы и методы, в которых некоторые типы сами являются параметрами. Эти типы перечисляются.
Гайдар Магдануров Microsoft.
Источники данных LINQ РУБД. LINQ (Language Integrated Query, язык интегрированных запросов ) это технология, которая позволяет разработчикам формировать.
Гайдар Магдануров Microsoft.
Михаил Черномордиков Developer Evangelist, Microsoft Россия
Msdevcon.ru#msdevcon. Работа с асинхронными операциями в Win RT Иван Бодягин ABBYY.
В С# предусмотрены средства для создания пользовательских классов-контейнеров, к внутренним элементам которых можно обращаться при помощи того же оператора.
Высокоуровневые методы информатики и программирования Лекция 10 События.
Многопоточное программирование ( часть 1) Лекция 10.
Лекция 1 Классификация С++. Парадигмы программирования Императивная Функциональная Декларативная (логическая) Инструкция 1 Инструкция 2 Инструкция 3 Инструкция.
ФИЛОСОФИЯ.NET Любому современному программисту, который желает идти в ногу с последними веяниями, каждые несколько лет приходится переучиваться.
Транксрипт:

Reactive Extensions Сергей Тепляков

О Вашем инструкторе Сергей Тепляков Visual C# MVP, RSDN Team member SergeyTeplyakov.blogspot.com 1-2

Цели курса… Философию библиотеки Reactive Extensions Основы библиотеки TPL Использование библиотеки TPL Новые возможности C# 5 Слушатели изучат: 1-3

Необходимая подготовка Быть знакомы с основами языка C# и платформы.Net Обладать базовыми знаниями многопоточности Быть знакомы с LINQ (Language Integrated Query) Слушатели должны: 1-4

Roadmap Введение в реактивные расширения Дуализм интерфейсов Основы Rx Observable sequences Events и Observables Observables и асинхронные операции Concurrency Новости из Редмонта 1-5

Интерактивная и реактивная модель

Реактивное программирование Парадигма программирования, ориентированная на потоки данных и распространение изменений. Это означает, что должна существовать возможность легко выражать статические и динамические потоки данных, а также то, что выполняемая модель должна автоматически распространять изменения сквозь поток данных. /wiki/Реактивное_программирование

Реактивная модель "Принцип Голливуда" – "Не звоните нам, мы сами вам позвоним" (Hollywood Principle - Don't call us, we'll call you)

Rx - это 1. набор классов, представляющих собой асинхронный поток данных 2. набор операторов, предназначенных для манипуляции этими данными 3. набор классов для управления многопоточностью Rx = Observables + LINQ + Schedulers

Зачем все это нужно? Social media Stock tickers RSS feeds GPS Server management UI events

Интерфейс IEnumerable Pull-based последовательности представлены интерфейсом IEnumerable. Это - коллекции в памяти (списки, вектора и т.д.) бесконечные последовательности (генераторы) xml-данные результаты запроса к БД

Интерфейс IEnumerable public interface IEnumerable { IEnumerator GetEnumerator(); } public interface IEnumerator : IDisposable { bool MoveNext(); T Current { get; } void Reset(); } Ковариантность из.NET 4.0 Блокирующая операция

Упрощение интерфейса IEnumerator public interface IEnumerator : IDisposable { (T | void | Exception) GetNext(); } T – следующий элемент void – окончание последовательности Exception – ошибка

Интерфейс IObservable public interface IObservable { IDisposable Subscribe(IObserver observer); } public interface IObserver { void OnNext(T value); void OnError(Exception error); void OnCompleted(); }

Pull vs Push

Простые последовательности Observable.Empty (); Observable.Return(42); Observable.Throw (ex); Observable.Never (); OnComplete OnNext OnError new int[] {}; new int[] {42}; Итератор, генерирующий исключение Итератор, который не возвращает управление

Методы Range Observable.Range(0, 3); OnNext(0)OnNext(1)OnNext(2) Enumerable.Range(0, 3); yield 0yield 1yield 2

Циклы for var xs = Observable.Generate( 0, x => x x + 1, x => x); xs.Subscribe(x => { Console.WriteLine(x); }); var xs = new IEnumerable { for(int i = 0; i < 10; i++) yield return i; }; foreach(var x in xs) { Console.WriteLine(x); } Предположим, у нас есть синтаксис анонимных итераторов

Контракт «реактивных» последовательностей Grammar: OnNext* [OnCompleted | OnError] Методы наблюдателя вызываются последовательно

Упрощение работы с интерфейсом IObserver var xs = Observable.Range(0, 10); 1. Только OnNext: xs.Subscribe(/*OnNext(int)*/x => Console.WriteLine(x)); 2. OnNext и OnCompleted xs.Subscribe( /*OnNext(int)*/x => Console.WriteLine(x), /*OnComplete*/() => Console.WriteLine("Completed")); 3. OnNext и OnError xs.Subscribe( /*OnNext(int)*/x => Console.WriteLine(x), /*OnError(Exception)*/e => Console.WriteLine("Error: {0}", e)); 4. OnNext, OnError и OnCompleted xs.Subscribe( /*OnNext(int)*/x => Console.WriteLine(x), /*OnError(Exception)*/e => Console.WriteLine("Error: {0}", e), /*OnCompleted*/() => Console.WriteLine("Completed"));

Demo

События в.NET Объявление event Action E; Публикация E(42); Подписка E += x => Console.WriteLine(x);

Rx Объявление ISubject S = new Subject (); Публикация S.OnNext(42); Подписка S.Subscribe(x => Console.WriteLine(x));

class Program { event Action E; static void Main() { var p = new Program(); p.E += x => Console.WriteLine(x); p.E(1); p.E(2); p.E(3); } Events vs Observables class Program { ISubject S = new Subject (); static void Main() { var p = new Program(); p.S.Subscribe(x => Console.WriteLine(x)); p.S.OnNext(1); p.S.OnNext(2); p.S.OnNext(3); }

Отписка от событий Events static event Action E; //E += x => Console.WriteLine(x); Action action = x => Console.WriteLine(x); E += action; E -= action; Observables static ISubject S = new Subject (); var token = S.Subscribe(x => Console.WriteLine(x)); token.Dispose();

«События» первого класса Объект называют «объектом первого класса» когда он: может быть сохранен в переменной может быть передан в функцию как параметр может быть возвращен из функции как результат может быть создан во время выполнения программы внутренне самоидентифицируем (независим от именования)

«События» первого класса // Сохранение в переменной IObservable textChanged = …; // Передача в качестве параметра void ProcessRequests(IObservable input) {…} // Возвращение в качестве результата IObservable QueryServer() {…}

Возможности наблюдаемых последовательностей IObservable mouseMoves = from e in Observable.FromEventPattern (frm, "MouseMove") select e.EventArgs.Location; var filtered = mouseMoves.Where(p => p.X == p.Y); var subscription = filtered.Subscribe(p => Console.WriteLine("X, Y = {0}", p.X)); subscription.Dispose(); Преобразование события в поток объектов Point Фильтрация «событий» Обработка «событий» Отписка от «событий»

Demo

Асинхронные операции Classical Async Pattern FileStream fs = File.OpenRead("data.txt"); byte[] buffer = new byte[1024]; fs.BeginRead(buffer, 0, buffer.Length, ar => { int bytesRead = fs.EndRead(ar); // Обработка данных в массиве buffer }, null); Невозможность использования привычных языковых конструкций (using, try/finally etc) «Вывернутый» поток исполнения Сложность обработки ошибок, а также чтения и сопровождения кода

FromAsyncPattern static int LongRunningFunc(string s) { Thread.Sleep(TimeSpan.FromSeconds(5)); return s.Length; } Func longRunningFunc = LongRunningFunc; Func > funcHelper = Observable.FromAsyncPattern ( longRunningFunc.BeginInvoke, longRunningFunc.EndInvoke); IObservable xs = funcHelper("Hello, String"); xs.Subscribe(x => Console.WriteLine("Length is " + x)); FromAsyncPattern преобразует возвращаемое значение метода в IObservervable

Tasks vs FromAsyncPattern Задачи (Tasks) – это унифицированный способ представления Single value asynchrony в.Net Framework Observables – Multi value asynchrony Существует простой способ преобразования Tasks -> Observables

Task -> ToObservable static class FuncExtensions { internal static Task ToTask(this Func func, string s) { return Task.Factory.FromAsync( func.BeginInvoke, func.EndInvoke, s, null); } } Func longRunningFunc = LongRunningFunc; string s = "Hello, String"; IObservable xs = longRunningFunc.ToTask(s).ToObservable(); xs.Subscribe(x => Console.WriteLine("Length is " + x), () => Console.WriteLine("Task is finished")); } } Метод расширения можно написать один раз, а использовать повсюду, а не только с Rx Это более предпочтительный способ, поскольку метод FromAsyncPattern скоро будет устаревшим

Чем Rx не является Rx не заменяет существующей «асинхронности»:.NET все еще незаменимы Асинхронные методы все так же применимы в библиотеках Задачи представляют собой single value asynchrony Существуют и другие асинхронные источники, как SSIS, PowerShell, etc Но rx… Унифицирует работу Предоставляет возможность композиции Предоставляет обобщенные операторы Так что rx – это … Мост!

Demo

Управление многопоточность Управление многопоточностью осуществляется с помощью интерфейса IScheduler var xs = Observable.Range(0, 10, Scheduler.ThreadPool); xs.Subscribe(x => Console.WriteLine(x)); Будет исполняться в контексте планировщика Параметризация с помощью IScheduler доступна в большинстве операций

Синхронизация Обновление пользовательского интерфейса Label lbl = new Label(); Form frm = new Form() {Controls = {lbl}}; var xs = Observable.Return(42, Scheduler.ThreadPool); xs.Subscribe(x => { Thread.Sleep(1000); lbl.Text = "Result is " + x; }); Использование ObserveOn xs.ObserveOn(frm).Subscribe(x => { Thread.Sleep(1000); lbl.Text = "Result is " + x; }); IScheduler поддерживает разные контексты синхронизации

Demo

Что мы изучили? Введение в реактивные расширения Дуализм интерфейсов Основы Rx Observable sequences Events и Observables Observables и асинхронные операции Concurrency 1-40

Experimental vs Stable Две версии библиотеки Reactive Extensions: Stable Experimental Interactive Extensions

Где взять? NuGet Web - us/data/gg577610http://msdn.microsoft.com/en- us/data/gg Cant find? Use google

Дополнительные ссылки The Reactive Extensions (Rx)... (Data Developer Center) Reactive Extensions (MSDN) - us/library/hh242985(v=vs.103).aspxhttp://msdn.microsoft.com/en- us/library/hh242985(v=vs.103).aspx Rx Team Blog - Реактивные расширения и асинхронные операции -

Roslyn CTP is awailable!

Вопросы?

Горячие и холодные последовательности