====== Programación reactiva ===== La programación reactiva es un paradigma de programación por el que se ejecutan una serie de eventos sobre un conjunto de datos siguiendo el patrón de diseño Observer. De esa manera, son los propios objetos quienes notifican de los cambios producidos sobre si mismos para que se ejecuten, de forma asíncrona, los eventos asociados a éstos. Ya en el propio JDK podemos encontrar una API para programación reactiva (clase Flow) que es el reemplazo natural para las clases Observer y Observable que antes nos permitían implementar el patrón de diseño Observer (pero que ya han sido desaconsejadas). Estos flujos de datos sobre los que se trabajan, ahora conocidos como Flows, podríamos compararlos con los Stream que aparecen en Java 8. Pero los primeros presentan algunas diferencias con los segundos que los hacen especialmente apropiados para su uso en programación reactiva. Con los Stream podemos procesar flujos de datos (incluso de forma paralela) y con un Flow podemos hacerlo de forma asíncrona. Es por eso que muchas veces nos referiremos a Flow como una API de Streams Reactivos. Aunque Java dispone de una API para la programación reactiva, nos centraremos en la utilización de la librería [[https://github.com/ReactiveX/RxJava|RxJava]] ya que su uso está bastante extendido actualmente. ---- ===== Introducción a la programación reactiva ===== ==== Patrón Observer ==== El patrón //Observer// es un patrón de diseño de software por el cual un objeto, llamada sujeto es observado por otros objetos llamados observadores a los cuales notifica cualquier cambio que sobre él se produce, normalmente invocando a alguno de sus métodos. El API de Java incluye las clases necesarias para implementar este patrón, en el paquete ''java.beans'', con la clase //PropertyChangeSupport// y la interface //PropertyChangeListener// para definir al objeto observado y a los objetos observadores, respectivamente. En el siguiente ejemplo se puede ver la implementación del patrón //Observer//. Tenemos dos clases, //Product// y //Customer//. La primera es el sujeto y la segunda es el observador de forma que cada vez que el producto baje de precio (método ''decreasePrice(float)'') el cliente será notificado (mediante la llamada a ''firePropertyChange()'' que invoca el método ''propertyChange()'' en el cliente). /** * Clase Product * Es la clase Observable. Los cambios que se produzcan serán * observados inmediatamente por las clases Observer que se registren * desde ésta */ public class Product { private String name; private String description; private float price; private int stock; private PropertyChangeSupport support; public Product(String name, String description, float price) { this.name = name; this.description = description; this.price = price; stock = 0; support = new PropertyChangeSupport(this); } public void addObserver(PropertyChangeListener observer) { support.addPropertyChangeListener(observer); } public void removeObserver(PropertyChangeListener observer) { support.removePropertyChangeListener(observer); } public void decreasePrice(float value) { support.firePropertyChange("price", price, (price - value)); price -= value; } } /** * Clase Customer * En este caso es la clase que observa (Observer) * Observa los cambios que se produzcan en otra clase, a la que se conoce como * Observable */ public class Customer implements PropertyChangeListener { @Override public void propertyChange(PropertyChangeEvent event) { float newPrice = (float) event.getNewValue(); float oldPrice = (float) event.getOldValue(); System.out.println("El precio ha bajado " + (oldPrice - newPrice) + " €"); } } public class Application { public static void main(String[] args) { Product product = new Product("Donuts", "Donuts de chocolate", 10); Customer customer = new Customer(); product.addObserver(customer); product.decreasePrice(2); } } En el siguiente video se puede ver la implementación de este patrón usando la clase //Observable// y la interface //Observer// de Java. Hay que tener en cuenta que ambas fueron desaconsejadas a partir de Java 9 por lo que es un ejemplo que habría que comenzar a olvidar. Java recomienda, entre otras, la implementación que acabamos de ver en el ejemplo anterior. {{ youtube>saIIwBviGZw }} \\ ==== ObservableList de JavaFX ==== Otro ejemplo de patrón Observer lo tenemos en la clase ''ObservableList'' de la librería JavaFX. Es una clase que funciona como colección de objetos y que además soporta ''Listeners'' que puede hacer de observadores ante cualquier cambio que ocurra en la lista (añadir, modificar o eliminar un elemento). Para ver cómo funciona necesitamos añadir JavaFX como dependencia en nuestro ''pom.xml''. También incluiremos ''lombok'' para simplificar un poco el desarrollo de las clases del modelo de datos. org.projectlombok lombok 1.18.18 org.openjfx javafx-controls 14 Vamos a definir dos clases. Una formará parte de la colección observable (Movie) y la otra (Cinema) será quien observará los cambios sobre la colección. @Data @Builder @ToString @EqualsAndHashCode public class Movie { private String title; @EqualsAndHashCode.Exclude private String summary; @EqualsAndHashCode.Exclude private int duration; } La clase ''Cinema'' será la observadora y por eso implementa la interfaces ''ListChangeListener'' @Data @Builder @ToString @EqualsAndHashCode public class Cinema implements ListChangeListener { private String name; @EqualsAndHashCode.Exclude private String address; @Override public void onChanged(Change change) { while (change.next()) { if (change.wasRemoved()) { System.out.println("Se ha eliminado una pelicula: " + change.getRemoved().get(0)); } else if (change.wasAdded()) { System.out.println("Se estrena una nueva pelicula: " + change.getAddedSubList().get(0)); } } } } En la clase principal se ve cómo creamos la colección ''ObservableList'', añadimos al observador con el método ''addLister'' y realizamos algunos cambios. Al ejecutarlo, la clase ''Cinema'' observará dichos cambios ejecutándose el bloque correspondiente al cambio producido en el método ''onChange'' de esa clase. public class Application { public static void main(String[] args) { Movie movie = Movie.builder() .title("Rocky") .summary("Pelicula de boxeo") .duration(120) .build(); Cinema cinema = Cinema.builder() .name("Cinema1") .build(); ObservableList movies = FXCollections.observableArrayList(movie); movies.addListener(cinema); movies.remove(0); movies.add(Movie.builder().title("Terminator 2").build()); } } La salida por pantalla que muestra este ejemplo será algo parecido a la siguiente: . . . Se ha eliminado una pelicula: Movie(title=Rocky, summary=Pelicula de boxeo, duration=120) Se estrena una nueva pelicula: Movie(title=Terminator 2, summary=null, duration=0) ==== Streams ==== Los Streams son clases que permiten encapsular colecciones o arrays de forma que podamos ejecutar una serie de operaciones sobre ellos utilizando expresiones lambda. Simplifican la operación con los elementos puesto que podemos encandenar varias lambdas y recuperar el resultado final en otra colección distinta de la original. Se puede crear de diferentes formas: * A partir de colecciones Java: List animals = new ArrayList<>(); Stream streamAnimals = animales.stream(); * A partir de literales u objetos: Stream names = Stream.of("Diana", "Maria", "Trinidad"); * A partir de un array: String[] names = {"Diana", "Maria", "Trinidad"}; Stream streamNames = Stream.of(nombres); Existen 2 tipos de operaciones sobre los Streams: === Operaciones intermedias === Son las operaciones que siempre acaban devolviendo el stream modificado * **distinct()**: Devuelve un Stream con los elementos no repetidos * **filter(Predicate predicado)**: Devuelve un Stream con los elementos que pasan el filtro * **limit(long tamanoMaximo)**: Devuelve un Stream con el número de elementos que se pasa como parámetro * **map(Function mapeador)**: Devuelve un Stream con la versión mapeada de cada elemento * **peek(Consumer accion)**: Devuelve el Stream tras ejecutar la acción que se pasa como parámetro (Existe por motivos de depueración, para poder ver cómo están dispuestos los elementos en el Stream en un momento dado) * **skip(long n)**: Elimina los n primeros elementos y devuelve el Stream resultante * **sorted(Comparator comparador)**: Devuelve el Stream ordenador por el Comparator que se pasa como parámetro * **unordered()**: Devuelve un Stream desordenado === Operaciones finales === Son las operaciones que devuelven como resultado algo diferente que el propio Stream * **boolean allMatch(Predicate predicado)**: Comprueba si todos los elementos del Stream cumplen el predicado * **boolean anyMatch(Predicate predicado)**: Comprueba si algún elemento del Stream cumple el predicado * **boolean noneMatch(Predicate predicado)**: Comprueba si ningún elemento del Stream cumple el predicado * **Optional findAny()**: Devuelve un elemento cualquier del Stream * **Optional findFirst()**: Devuelve el primer elemento del Stream * **R collect(Collector colector)**: Devuelve el Stream como un tipo colección * **long count()**: Devuelve el número de elementos del Stream * **void forEach(Consumer accion)**: Ejecuta una operación sobre cada elemento del Stream * **void forEachOrdered(Consumer accion)**: Ejecuta una operación sobre cada elemento del Stream ordenado * **Optional max(Comparator comparador)**: Devuelve el mayor de los elementos según el Comparador * **Optional min(Comparator comparador)**: Develve el menor de los elementos según el Comparador * **Object[] toArray()**: Devuelve el Stream como un array de Objects * **Iterator iterator()**: Devuelve el Stream como un Iterator ==== Operaciones asíncronas en Java: Future y CompletableFuture ==== La primera aproximación a la posibilidad de ejecución de tareas asíncronas en Java fue la clase //Future//, que aparece con Java 5. Permite la ejecución de una tarea de forma asíncrona (aunque luego la operación que permite recoger el resultado de esa tarea es bloqueante). También existe, desde Java 8, //CompletableFuture// que permite la ejecución de tareas asíncronas de una forma más completa. === Future === Future es una clase que permite crear objetos que pueden ser asignados a un hilo para ejecutarse en segundo plano. Además, estas tareas pueden devolver un resultado al terminar y lanzar excepciones si es oportuno. La ejecución de la tarea asignada al //Future// se hace de forma asíncrona, de forma que el programa continúa mientras ésta se ejecuta hasta que recuperemos el valor que devuelve invocando al método //get()// Por un lado se define la tarea: public class TareaCallable implements Callable { @Override public Boolean call() throws Exception { // Hace algo } } Y por otro lado el //Future// que la lanzará y permitirá recoger el resultado: ExecutorService executor = Executors.newFixedThreadPool(2); ... Future respuesta = executor.submit(new TareaCallable()); // El programa sigue funcionando // y la respuesta de la tarea estará en 'respuesta' cuando ésta esté lista Boolean resultado = respuesta.get(); // Podemos forzar la espera (y bloqueo) de la respuesta // El método get() hace que el hilo espere a que termine la tarea executor.shutdown(); También es posible cancelar la tarea una vez lanzada: * La clase //Future// dispone de un método //isDone()// que indica si la tarea ha terminado * El método //cancel(boolean)// permite cancelarla y //isCancelled()// permite comprobar si terminó por haber sido cancelada (puede haberlo hecho de forma natural) * El método //get()// también admite un timeout para definir el tiempo de espera limite que la aplicación está dispuesta a esperar por la respuesta del //Future//. Si se sobrepasa se lanza una excepción //TimeoutException// ExecutorService executor = Executors.newFixedThreadPool(2); ... Future respuesta = executor.submit(new TareaCallable()); while (futuro.isDone()) { // Podemos ir haciendo algo mientras no termina la tarea if (tiempoPasado > XXXX) // Podemos cancelar la tarea pasado un tiempo respuesta.cancel(true); } if (respuesta.isCancelled()) { // La tarea ha terminado porque ha sido cancelada, podemos hacer algo al respecto } else { // La tarea ha terminado correctamente, podemos recoger su resultado y hacer algo Boolean resultado = respuesta.get(); ... } executor.shutdown(); === CompletableFuture === Proporciona una API más completa que la que hay para la clase //Future//. * Lanzar una tarea en segundo plano de forma asíncrona y ejecutar un método al finalizar ésta (a través de la llamada a un método callback) CompletableFuture.runAsync(() -> doSomething()) .whenComplete((string, throwable) -> doSomethingWhenFinishFuture()); * Lanzar una tarea y ejecutar algo cuando ésta termina: private String doSomethingAndReturnResult() { return "ok"; } CompletableFuture.supplyAsync(() -> doSomethingAndReturnResult()) .thenAccept(System.out::println) .whenComplete((nothing, error) -> System.out.println("Fin")); ---- ===== RxJava ===== [[https://github.com/ReactiveX/RxJava|RxJava]] es una librería para programación asíncrona de eventos y secuencias observables de elementos. Tomando como punto de partida el [[https://psp.codeandcoke.com/apuntes:reactiva#patron_observer|patrón Observer]] que hemos visto anteriormente, permite gestionar secuencias de eventos y operaciones sobre éstos de forma que el programador pueda abstraerse de temas como la gestión de los hilos a bajo nivel, la sincronización de observador y observador y algunas otras consideraciones. [[https://github.com/Froussios/Intro-To-RxJava|Intro To RxJava]] es una guía muy interesante sobre las características que ofrece RxJava con ejemplos muy sencillos. Es algo antigua pero todavía sirve. ==== Introducción a RxJava ==== Hay que tener en cuenta que la librería de programación reactiva RxJava se fundamenta en dos tipos de datos principalmente: * **Observable**: Es una colección de datos que emite elementos que son consumidos por los Observadores * **Observer**: Es un objeto que se suscribe a un Observable para consumir los elementos que éste emite Cuando un ''Observer'' comienza a consumir los items emitidos por un ''Observable'' se dice que se ha suscrito a él. Asi, las acciones que implementemos asociadas a la suscripción van orientadas a consumir los elementos que el ''Observable'' emite: * **doOnNext**: Es el paso que permite consumir un item determinado. También es la acción que se asocia directamente a la llamada del método ''subscribe'' * **doOnError**: Qué hacer en caso de error * **doOnComplete**: Qué hacer cuando se terminan los items del ''Observable'' Puesto que utilizaremos la versión 3 de RxJava, tendremos que incluir la dependencia correspondiente en nuestro fichero ''pom.xml'' io.reactivex.rxjava3 rxjava 3.0.10 Ejemplo de cómo crear una secuencia de valores y consumirla como colección ''Observable''. En el momento de suscribirnos al observable, se implementa qué hacer para cada una de las acciones asociadas: ''doOnNext'', ''doOnError'' y ''doOnComplete''. En este caso nos limitamos a mostrar los elementos uno a uno: // Una secuencia de valores Observable values = Observable.just("Una", "Dos", "Tres"); values.subscribe( value -> System.out.println("Value: " + value), error -> System.out.println("Error: " + error.getMessage()), () -> System.out.println("The end!") ); Mismo ejemplo que el anterior pero esta vez invocamos a los métodos de forma más clara: Observable.just("Una", "Dos", "Tres") .doOnComplete(() -> System.out.println("The end!")) .doOnError(error -> System.out.println("Error: " + error.getMessage())) .subscribe(value -> System.out.println("Value: " + value)); En este caso tenemos dos ''Observer'' suscritos al mismo ''Observable''. Cada uno para lo que considere con los items que consuma: // Dos suscriptores se suscriben a la misma secuencia Observable values = Observable.just("Una", "Dos", "Tres") .doOnComplete(() -> System.out.println("The end!")) .doOnError(error -> System.out.println("Error: " + error.getMessage())); values.subscribe(value -> System.out.println("Value (observador 1): " + value)); values.subscribe(value -> System.out.println("Value (observador 2): " + value)); En el siguiente ejemplo se puede ver como los items de un ''Observable'' se crean, por defecto, en el momento en que éste es creado. Se puede comprobar que ambos ''Observer'' consumen el mismo elemento: Observable moreValues = Observable.just(System.currentTimeMillis()); moreValues.subscribe(System.out::println); try { Thread.sleep(1000); } catch (InterruptedException ie) { ie.printStackTrace(); } moreValues.subscribe(System.out::println); Por el contrario, en el siguiente ejemplo, haciendo uso del método ''defer()'' hacemos que la secuencia de elementos se genere en el momento en el que cada ''Observer'' comienza a consumirlos: Observable moreMoreValues = Observable.defer(() -> Observable.just(System.currentTimeMillis())); moreMoreValues.subscribe(System.out::println); try { Thread.sleep(1000); } catch (InterruptedException ie) { ie.printStackTrace(); } moreMoreValues.subscribe(System.out::println); En este ejemplo podemos ver cómo crear un Observable a partir de una colección (y, por tanto, también de un array): List books = new ArrayList<>(); books.add(new Book("Secuestrado", "Libro de aventuras", 500, "Aventuras", LocalDate.now())); books.add(new Book("Tom Sawyer", "Libro", 450, "Aventuras", LocalDate.now())); Observable bookObservable = Observable.fromArray(books.toArray(new Book[0])); bookObservable.subscribe(System.out::println); Tal y como hacemos con los Streams, vemos con el siguiente ejemplo que podemos establecer filtros para suscribirnos solamente a aquellos elementos que nos interesen: bookObservable.filter(book -> book.getPageCount() > 450) .subscribe(System.out::println); Y vemos en el siguiente como también podemos realizar otro tipo de operaciones propias de los Streams: bookObservable.map(Book::getTitle) .subscribe(System.out::println); Y también podemos concatenar los secuencias de elementos (dos ''Observable'') para suscribirnos al resultado final: Observable range1 = Observable.range(1, 10); Observable range2 = Observable.range(11, 10); Observable.concat(range1, range2) .subscribe(System.out::println); En este último ejemplo, nos suscribimos utilizando otro hilo y, por tanto, el consumo de los elementos del Observable se realizará en segundo plano (RxJava no es concurrente por defecto): bookObservable.map(Book::getTitle) .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .subscribe(System.out::println); System.out.println("¿Esto cuándo se ejecuta?"); ==== Programación reactiva con RxJava ==== Una vez que hemos visto una introducción a la programación reactiva con RxJava, vamos a ver un ejemplo real en el que invocamos a una API remota para solicitar una serie de datos y cómo podemos consumir éstos de forma reactiva utilizando esta librería. Aprovecharemos también y veremos como funciona [[https://square.github.io/retrofit/|Retrofit]] que es una librería que permite mapear directamente el JSON de respuesta de la API a nuestro modelo de clases Java. Así, solicitaremos la lista de ciudades a la API [[https://restcountries.eu/|REST Countries]], ésta nos devolverá una lista de ciudades (ya mapeada como clases Java gracias a Retrofit) y nosotros procesaremos dicha lista como una secuencia con RxJava. De esa manera, podremos procesar ciudad a ciudad haciendo uso de programación reactiva. === Proyecto de ejemplo: Aplicación que consume una API con RetroFit. Sin RxJava === El objetivo de este primer ejemplo es conocer cómo funciona la librería sin hacer uso de la programación reactiva. Incluimos las dependencias en el ''pom.xml'' de nuestro proyecto: . . . com.squareup.retrofit2 retrofit 2.7.0 com.squareup.retrofit2 converter-gson 2.3.0 . . . Definimos una clase con aquellos atributos que queremos mapear de entre todos los que la API devuelve. Hay que tener en cuenta que solamente se mapearán aquellos campos que coincidan en nombre con los atributos y que se hayan definido en la clase. El resto se omitirán: /** * Modelo de objeto que devuelve la API (https://restcountries.eu/) * @author Santiago Faci * @version curso 2020-2021 */ public class Country { private String name; private String capital; private String region; private String subregion; @Override public String toString() { return name + " [" + capital + "]"; } } Y generamos la interfaz con aquellas operaciones de la API en las que estamos interesados. En este caso nos interesa el endpoint ''/rest/v2/all'' que devuelve la lista completa de paises y el endpoint ''/rest/v2/name/{name}'' que devuelve información detallada sobre un determinado país: /** * Interfaz de la API (https://restcountries.eu/) * @author Santiago Faci * @version curso 2020-2021 */ public interface CountriesApiService { @GET("/rest/v2/all") Call> getAllCountries(); // Cuidado! Aunque la respuesta en un solo país viene en forma de array @GET("/rest/v2/name/{name}") Call> getCountry(@Path("name") String name); } En la clase ''CountriesService'' inicializamos la librería Retrofit en el constructor e implementamos las operaciones que hemos definido en la interface ''CountriesApiService'': /** * Service con las operaciones disponibles * @author Santiago Faci * @version curso 2020-2021 */ public class CountriesService { private CountriesApiService api; public CountriesService() { Retrofit retrofit = new Retrofit.Builder() .baseUrl(URL) .addConverterFactory(GsonConverterFactory.create()) .build(); api = retrofit.create(CountriesApiService.class); } public List getAllCountries() { Call> allCountriesCall = api.getAllCountries(); try { Response> response = allCountriesCall.execute(); return response.body(); } catch (IOException ioe) { ioe.printStackTrace(); } return null; } // TODO Implementar la operación para obtener la información de un país } En la clase ''Constants'' definiremos las constantes de aplicación que necesitemos. Por el momento nos basta con la URL base de la API. /** * Constantes * @author Santiago Faci * @version curso 2020-2021 */ public class Constants { public final static String URL = "https://restcountries.eu/"; } Y por último, a modo de ejemplo, invocamos a la operación implementada para observar cómo nos devuelve la información mapeada en una colección Java: /** * Clase principal * @author Santiago Faci * @version curso 2020-2021 */ public class Application { public static void main(String[] args) { CountriesService countriesService = new CountriesService(); List countries = countriesService.getAllCountries(); System.out.println(countries.toString()); } } === Proyecto de ejemplo: Aplicación que consume una API con Retrofit. Con RxJava === En este caso, se trata de desarrollar el mismo ejemplo que en el caso anterior pero utilizando programación reactiva con RxJava. Para ello, crearemos un nuevo proyecto: Para comenzar, incluiremos las librerías RxJava y Retrofit en nuestro fichero ''pom.xml''. . . . io.reactivex.rxjava3 rxjava 3.0.10 com.squareup.retrofit2 retrofit 2.7.0 com.squareup.retrofit2 converter-gson 2.3.0 com.squareup.retrofit2 adapter-rxjava 2.3.0 . . . Definiremos de forma similar la clase ''Country'' que será nuestro modelo de datos Java: public class Country { . . . } Las operaciones que definamos en el interface ''CountriesApiService'' ahora ya "saben" devolver ''Observable'' en lugar de una colección Java tal cual: /** * Interfaz de la API para emplear con RetroFit (https://restcountries.eu) * @author Santiago Faci * @version Curso 2020-2021 */ public interface CountriesApiService { @GET("/rest/v2/all") Observable> getAllCountries(); // Cuidado! Aunque la respuesta en un solo país viene en forma de array @GET("/rest/v2/name/{name}") Observable> getCountry(@Path("name") String name); } De forma muy similar al primer caso, implementamos ambas operaciones devolviendo como resultado, en ambos casos, un Observable. En el caso en el que obtenemos la información de un solo país, ésta viene en forma de array (de un solo elemento) por lo que también tenemos que recogerlo como tal: /** * Service para consulta de la API * @author Santiago Faci * @version Curso 2020-2021 */ public class CountriesService { private CountriesApiService api; public CountriesService() { Retrofit retrofit = new Retrofit.Builder() .baseUrl(URL) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); api = retrofit.create(CountriesApiService.class); } public Observable> getAllCountries() { return api.getAllCountries(); } public Observable> getCountry(String name) { return api.getCountry(name); } } Y, finalmente, cuando la ponemos en marcha, suscribimos un ''Observer'' al ''Observable'' que devuelven ambas operaciones de forma que podemos ir procesando elemento a elemento directamente. Se puede ver especialmente claro en el caso de la operación que devuelve la lista completa de países: public class Constants { . . . } /** * Clase principal * @author Santiago Faci * @version curso 2020-2021 */ public class Application { public static void main(String[] args) { final CountriesService countriesService = new CountriesService(); System.out.println("Comenzando descarga . . ."); countriesService.getAllCountries() .flatMap(Observable::from) .doOnCompleted(() -> System.out.println("Listado de países descargado")) .doOnError(throwable -> System.out.println(throwable.getMessage())) .subscribeOn(Schedulers.from(Executors.newCachedThreadPool())) .subscribe(System.out::println); countriesService.getCountry("spain") .flatMap(Observable::from) .doOnCompleted(() -> System.out.println("Cargada información de spain")) .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .subscribe(System.out::println); System.out.println("Fin?"); } } ---- ===== WebFlux ===== ==== Introducción a WebFlux ==== ==== Programación reactiva con WebFlux ==== @Data @NoArgsConstructor @Document(value = "products") public class Product { @Id private String id; @Field private String name; @Field private String description; @Field private String category; @Field private float price; @Field(name = "creation_date") private LocalDateTime creationDate; } @Repository public interface ProductRepository extends ReactiveMongoRepository { Flux findAll(); Mono findByName(String name); } @Service public class ProductServiceImpl implements ProductService { @Autowired private ProductRepository productRepository; @Override public Flux findAllProducts() { return productRepository.findAll(); } @Override public Mono findProductByName(String name) { return productRepository.findByName(name); } @Override public Mono findProduct(long id) { return productRepository.findById(id); } } @RestController public class ProductController { private final Logger logger = LoggerFactory.getLogger(ProductController.class); @Autowired private ProductService productService; @GetMapping(value = "/products", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux getProducts() { return productService.findAllProducts(); } @GetMapping("/products/{id}") public Mono getProduct(@PathParam("id") long id) { return productService.findProduct(id); } } @SpringBootApplication @EnableReactiveMongoRepositories public class MyshopApplication { public static void main(String[] args) { SpringApplication.run(MyshopApplication.class, args); } } # Puerto donde escucha el servidor una vez se inicie server.port=8081 # Configuración MongoDB spring.data.mongodb.host=localhost spring.data.mongodb.port=27017 spring.data.mongodb.database=myshop-webflux @Data @NoArgsConstructor @ToString public class Product { private String id; private String name; private String description; private String category; private float price; private LocalDateTime creationDate; } WebClient webClient = WebClient.create("http://localhost:8081"); Flux productsFlux = webClient.get() .uri("/products") .retrieve() .bodyToFlux(Product.class); productsFlux.doOnError((System.out::println)) .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool())) .doOnComplete(() -> System.out.println("Terminado")) .subscribe((product) -> { // Simulamos una operación costosa en tiempo try { System.out.println("Haciendo algo con " + product.getName() + " . . ."); Thread.sleep(5000); } catch (InterruptedException ie) { ie.printStackTrace(); } System.out.println("Consumidor 1: " + product.getName()); }); productsFlux.doOnError((System.out::println)) .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool())) .doOnComplete(() -> System.out.println("Terminado")) .subscribe((product) -> System.out.println("Consumidor 2: " + product.getName())); System.out.println(productsFlux.count().block()); ---- ===== Ejercicios ===== {{ ejercicio.png?75}} === Patrón Observer === - Realiza una aplicación que simule la persecución de dos aviones. Considerando los parámetros altura, velocidad y dirección, el avión perseguido puede trazar la ruta que quiera y cambiar su velocidad y altura y el avión perseguidor deberá modificar esos parámetros en la misma medida en que lo haga el primero. Puedes solicitar por consola cambiar los parámetros del avión perseguido o bien preparar un recorrido ya definido y lanzar la aplicación para ver como lo recorren ambos objetos (mostrando en cada instante los parámetros de cada uno)\\ \\ - Crea una aplicación en la que un ListView muestre el contenido de la lista, de forma que el ListView se actualice cada vez que se añadan elementos a la lista === Streams === - Define una colección de ciudades (almacenando nombre, provincia, habitantes y extensión para cada una de ellas) y realiza las siguientes operaciones utilizando Streams: - ¿De cuántas provincias diferentes son las ciudades? - ¿Cuántas ciudades hay? - Calcula el número total de habitantes para una provincia determinada (introducida por el usuario, por ejemplo) - Obtén una colección con los nombres de todas las ciudades - Obtén una colección con los nombres de todas las provincias (sin repetir) - ¿Todas las ciudades son de más de 50.000 habitantes? - ¿Alguna ciudad de una provincia determinada (introducida por el usuario) tiene más de 10.000 habitantes? === Future/CompletableFuture === - Realiza una aplicación que descargue un fichero de Internet y, una vez esté descargado, lo abra para que el usuario pueda ver su contenido (podéis hacerlo simplemente con un fichero de texto o algo legible como tal) - Realiza una aplicación que zipee el contenido de un directorio y, cuando termine, lo copie a otra carpeta indicada por el usuario (podéis ver [[https://www.baeldung.com/java-compress-and-uncompress|aqui]] como trabajar con ficheros zip en Java) === RxJava === - Realiza una aplicación JavaFX que muestre al usuario el listado de paises usando la API [[https://restcountries.eu|REST Countries]] - Amplia la aplicación anterior para ver también información detallada sobre cada país - Añade, a la aplicación anterior, opciones de filtrado sin utilizar las que proporciona la propia API - Utiliza la [[https://world.openfoodfacts.org/data|API Open Food Facts]] para realizar una aplicación JavaFX que muestre la información de un producto a partir de su código de barras (Desde la web disponen de un [[https://world.openfoodfacts.org/cgi/search.pl|buscador de productos]] para ver ejemplos de datos que almacenan) ---- ===== Proyectos de ejemplo ===== Todos los proyectos de ejemplo de esta parte están en el [[http://www.github.com/codeandcoke/java-reactiva|repositorio java-reactiva]] de GitHub. Los proyectos que se vayan haciendo en clase estarán disponibles en el [[http://www.github.com/codeandcoke/datos-ejercicios|repositorio datos-ejercicios]], también en GitHub. Para manejaros con Git recordad que tenéis una serie de videotutoriales en [[https://entornos-desarrollo.codeandcoke.com/apuntes:git|La Wiki de Entornos de Desarrollo]] ---- (c) 2021 Santiago Faci