Table of Contents

Programación reactiva

La programación reactiva es un paradigma de programación por el que se ejecutan una serie de eventos sobre un conjunto de datos siguiendo el patrón de diseño Observer. De esa manera, son los propios objetos quienes notifican de los cambios producidos sobre si mismos para que se ejecuten, de forma asíncrona, los eventos asociados a éstos.

Ya en el propio JDK podemos encontrar una API para programación reactiva (clase Flow) que es el reemplazo natural para las clases Observer y Observable que antes nos permitían implementar el patrón de diseño Observer (pero que ya han sido desaconsejadas).

Estos flujos de datos sobre los que se trabajan, ahora conocidos como Flows, podríamos compararlos con los Stream que aparecen en Java 8. Pero los primeros presentan algunas diferencias con los segundos que los hacen especialmente apropiados para su uso en programación reactiva. Con los Stream podemos procesar flujos de datos (incluso de forma paralela) y con un Flow podemos hacerlo de forma asíncrona. Es por eso que muchas veces nos referiremos a Flow como una API de Streams Reactivos.

Aunque Java dispone de una API para la programación reactiva, nos centraremos en la utilización de la librería RxJava ya que su uso está bastante extendido actualmente.


Introducción a la programación reactiva

Patrón Observer

El patrón Observer es un patrón de diseño de software por el cual un objeto, llamada sujeto es observado por otros objetos llamados observadores a los cuales notifica cualquier cambio que sobre él se produce, normalmente invocando a alguno de sus métodos. El API de Java incluye las clases necesarias para implementar este patrón, en el paquete java.beans, con la clase PropertyChangeSupport y la interface PropertyChangeListener para definir al objeto observado y a los objetos observadores, respectivamente.

En el siguiente ejemplo se puede ver la implementación del patrón Observer. Tenemos dos clases, Product y Customer. La primera es el sujeto y la segunda es el observador de forma que cada vez que el producto baje de precio (método decreasePrice(float)) el cliente será notificado (mediante la llamada a firePropertyChange() que invoca el método propertyChange() en el cliente).

/**
 * Clase Product
 * Es la clase Observable. Los cambios que se produzcan serán
 * observados inmediatamente por las clases Observer que se registren
 * desde ésta
 */
public class Product {
 
    private String name;
    private String description;
    private float price;
    private int stock;
 
    private PropertyChangeSupport support;
 
    public Product(String name, String description, float price) {
        this.name = name;
        this.description = description;
        this.price = price;
        stock = 0;
        support = new PropertyChangeSupport(this);
    }
 
    public void addObserver(PropertyChangeListener observer) {
        support.addPropertyChangeListener(observer);
    }
 
    public void removeObserver(PropertyChangeListener observer) {
        support.removePropertyChangeListener(observer);
    }
 
    public void decreasePrice(float value) {
        support.firePropertyChange("price", price, (price - value));
        price -= value;
    }
}
/**
 * Clase Customer
 * En este caso es la clase que observa (Observer)
 * Observa los cambios que se produzcan en otra clase, a la que se conoce como 
 * Observable
 */
public class Customer implements PropertyChangeListener {
 
    @Override
    public void propertyChange(PropertyChangeEvent event) {
        float newPrice = (float) event.getNewValue();
        float oldPrice = (float) event.getOldValue();
        System.out.println("El precio ha bajado " + (oldPrice - newPrice) + " €");
    }
}
public class Application {
    public static void main(String[] args) {
        Product product = new Product("Donuts", "Donuts de chocolate", 10);
        Customer customer = new Customer();
        product.addObserver(customer);
 
        product.decreasePrice(2);
    }
}

En el siguiente video se puede ver la implementación de este patrón usando la clase Observable y la interface Observer de Java. Hay que tener en cuenta que ambas fueron desaconsejadas a partir de Java 9 por lo que es un ejemplo que habría que comenzar a olvidar. Java recomienda, entre otras, la implementación que acabamos de ver en el ejemplo anterior.


ObservableList de JavaFX

Otro ejemplo de patrón Observer lo tenemos en la clase ObservableList de la librería JavaFX. Es una clase que funciona como colección de objetos y que además soporta Listeners que puede hacer de observadores ante cualquier cambio que ocurra en la lista (añadir, modificar o eliminar un elemento).

Para ver cómo funciona necesitamos añadir JavaFX como dependencia en nuestro pom.xml. También incluiremos lombok para simplificar un poco el desarrollo de las clases del modelo de datos.

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.18</version>
</dependency>
<dependency>
    <groupId>org.openjfx</groupId>
    <artifactId>javafx-controls</artifactId>
    <version>14</version>
</dependency>

Vamos a definir dos clases. Una formará parte de la colección observable (Movie) y la otra (Cinema) será quien observará los cambios sobre la colección.

@Data
@Builder
@ToString
@EqualsAndHashCode
public class Movie {
 
    private String title;
    @EqualsAndHashCode.Exclude
    private String summary;
    @EqualsAndHashCode.Exclude
    private int duration;
}

La clase Cinema será la observadora y por eso implementa la interfaces ListChangeListener

@Data
@Builder
@ToString
@EqualsAndHashCode
public class Cinema implements ListChangeListener<Movie> {
 
    private String name;
    @EqualsAndHashCode.Exclude
    private String address;
 
    @Override
    public void onChanged(Change<? extends Movie> change) {
        while (change.next()) {
            if (change.wasRemoved()) {
                System.out.println("Se ha eliminado una pelicula: " + change.getRemoved().get(0));
            } else if (change.wasAdded()) {
                System.out.println("Se estrena una nueva pelicula: " + change.getAddedSubList().get(0));
            }
        }
    }
}

En la clase principal se ve cómo creamos la colección ObservableList, añadimos al observador con el método addLister y realizamos algunos cambios. Al ejecutarlo, la clase Cinema observará dichos cambios ejecutándose el bloque correspondiente al cambio producido en el método onChange de esa clase.

public class Application {
    public static void main(String[] args) {
        Movie movie = Movie.builder()
                .title("Rocky")
                .summary("Pelicula de boxeo")
                .duration(120)
                .build();
        Cinema cinema = Cinema.builder()
                .name("Cinema1")
                .build();
        ObservableList<Movie> movies = FXCollections.observableArrayList(movie);
        movies.addListener(cinema);
 
        movies.remove(0);
        movies.add(Movie.builder().title("Terminator 2").build());
    }
}

La salida por pantalla que muestra este ejemplo será algo parecido a la siguiente:

. . .
Se ha eliminado una pelicula: Movie(title=Rocky, summary=Pelicula de boxeo, duration=120)
Se estrena una nueva pelicula: Movie(title=Terminator 2, summary=null, duration=0)

Streams

Los Streams son clases que permiten encapsular colecciones o arrays de forma que podamos ejecutar una serie de operaciones sobre ellos utilizando expresiones lambda.

Simplifican la operación con los elementos puesto que podemos encandenar varias lambdas y recuperar el resultado final en otra colección distinta de la original.

Se puede crear de diferentes formas:

List<Animal> animals = new ArrayList<>();
Stream<Animal> streamAnimals = animales.stream();
Stream<String> names = Stream.of("Diana", "Maria", "Trinidad");
String[] names = {"Diana", "Maria", "Trinidad"};
Stream<String> streamNames = Stream.of(nombres);

Existen 2 tipos de operaciones sobre los Streams:

Operaciones intermedias

Son las operaciones que siempre acaban devolviendo el stream modificado

Operaciones finales

Son las operaciones que devuelven como resultado algo diferente que el propio Stream

Operaciones asíncronas en Java: Future y CompletableFuture

La primera aproximación a la posibilidad de ejecución de tareas asíncronas en Java fue la clase Future, que aparece con Java 5. Permite la ejecución de una tarea de forma asíncrona (aunque luego la operación que permite recoger el resultado de esa tarea es bloqueante).

También existe, desde Java 8, CompletableFuture que permite la ejecución de tareas asíncronas de una forma más completa.

Future

Future es una clase que permite crear objetos que pueden ser asignados a un hilo para ejecutarse en segundo plano. Además, estas tareas pueden devolver un resultado al terminar y lanzar excepciones si es oportuno.

La ejecución de la tarea asignada al Future se hace de forma asíncrona, de forma que el programa continúa mientras ésta se ejecuta hasta que recuperemos el valor que devuelve invocando al método get()

Por un lado se define la tarea:

public class TareaCallable implements Callable<Boolean> {
    @Override
    public Boolean call() throws Exception {
        // Hace algo 
    } 
}

Y por otro lado el Future que la lanzará y permitirá recoger el resultado:

ExecutorService executor = Executors.newFixedThreadPool(2);
...
Future<Boolean> respuesta = executor.submit(new TareaCallable()); 
// El programa sigue funcionando
// y la respuesta de la tarea estará en 'respuesta' cuando ésta esté lista
Boolean resultado = respuesta.get();
// Podemos forzar la espera (y bloqueo) de la respuesta
// El método get() hace que el hilo espere a que termine la tarea 
executor.shutdown();

También es posible cancelar la tarea una vez lanzada:

ExecutorService executor = Executors.newFixedThreadPool(2);
...
Future<Boolean> respuesta = executor.submit(new TareaCallable()); 
while (futuro.isDone()) {
    // Podemos ir haciendo algo mientras no termina la tarea
    if (tiempoPasado > XXXX)
    // Podemos cancelar la tarea pasado un tiempo respuesta.cancel(true);
}
if (respuesta.isCancelled()) {
    // La tarea ha terminado porque ha sido cancelada, podemos hacer algo al respecto
} else {
    // La tarea ha terminado correctamente, podemos recoger su resultado y hacer algo
    Boolean resultado = respuesta.get(); 
    ...
} 
executor.shutdown();

CompletableFuture

Proporciona una API más completa que la que hay para la clase Future.

CompletableFuture.runAsync(() -> doSomething())
    .whenComplete((string, throwable) -> doSomethingWhenFinishFuture());
private String doSomethingAndReturnResult() {
    return "ok";
}
 
CompletableFuture.supplyAsync(() -> doSomethingAndReturnResult())
                .thenAccept(System.out::println)
                .whenComplete((nothing, error) -> System.out.println("Fin"));

RxJava

RxJava es una librería para programación asíncrona de eventos y secuencias observables de elementos.

Tomando como punto de partida el patrón Observer que hemos visto anteriormente, permite gestionar secuencias de eventos y operaciones sobre éstos de forma que el programador pueda abstraerse de temas como la gestión de los hilos a bajo nivel, la sincronización de observador y observador y algunas otras consideraciones.

Intro To RxJava es una guía muy interesante sobre las características que ofrece RxJava con ejemplos muy sencillos. Es algo antigua pero todavía sirve.

Introducción a RxJava

Hay que tener en cuenta que la librería de programación reactiva RxJava se fundamenta en dos tipos de datos principalmente:

Cuando un Observer comienza a consumir los items emitidos por un Observable se dice que se ha suscrito a él. Asi, las acciones que implementemos asociadas a la suscripción van orientadas a consumir los elementos que el Observable emite:

Puesto que utilizaremos la versión 3 de RxJava, tendremos que incluir la dependencia correspondiente en nuestro fichero pom.xml

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.0.10</version>
</dependency>

Ejemplo de cómo crear una secuencia de valores y consumirla como colección Observable. En el momento de suscribirnos al observable, se implementa qué hacer para cada una de las acciones asociadas: doOnNext, doOnError y doOnComplete. En este caso nos limitamos a mostrar los elementos uno a uno:

// Una secuencia de valores
Observable<String> values = Observable.just("Una", "Dos", "Tres");
values.subscribe(
    value -> System.out.println("Value: " + value),
    error -> System.out.println("Error: " + error.getMessage()),
    () -> System.out.println("The end!")
);

Mismo ejemplo que el anterior pero esta vez invocamos a los métodos de forma más clara:

Observable.just("Una", "Dos", "Tres")
         .doOnComplete(() -> System.out.println("The end!"))
         .doOnError(error -> System.out.println("Error: " + error.getMessage()))
         .subscribe(value -> System.out.println("Value: " + value));

En este caso tenemos dos Observer suscritos al mismo Observable. Cada uno para lo que considere con los items que consuma:

// Dos suscriptores se suscriben a la misma secuencia
Observable<String> values = Observable.just("Una", "Dos", "Tres")
         .doOnComplete(() -> System.out.println("The end!"))
         .doOnError(error -> System.out.println("Error: " + error.getMessage()));
values.subscribe(value -> System.out.println("Value (observador 1): " + value));
values.subscribe(value -> System.out.println("Value (observador 2): " + value));

En el siguiente ejemplo se puede ver como los items de un Observable se crean, por defecto, en el momento en que éste es creado. Se puede comprobar que ambos Observer consumen el mismo elemento:

Observable<Long> moreValues = Observable.just(System.currentTimeMillis());
moreValues.subscribe(System.out::println);
try {
    Thread.sleep(1000);
} catch (InterruptedException ie) {
    ie.printStackTrace();
}
moreValues.subscribe(System.out::println);

Por el contrario, en el siguiente ejemplo, haciendo uso del método defer() hacemos que la secuencia de elementos se genere en el momento en el que cada Observer comienza a consumirlos:

Observable<Long> moreMoreValues = Observable.defer(() -> Observable.just(System.currentTimeMillis()));
moreMoreValues.subscribe(System.out::println);
try {
    Thread.sleep(1000);
} catch (InterruptedException ie) {
    ie.printStackTrace();
}
moreMoreValues.subscribe(System.out::println);

En este ejemplo podemos ver cómo crear un Observable a partir de una colección (y, por tanto, también de un array):

List<Book> books = new ArrayList<>();
books.add(new Book("Secuestrado", "Libro de aventuras", 500, "Aventuras", LocalDate.now()));
books.add(new Book("Tom Sawyer", "Libro", 450, "Aventuras", LocalDate.now()));
Observable<Book> bookObservable = Observable.fromArray(books.toArray(new Book[0]));
bookObservable.subscribe(System.out::println);

Tal y como hacemos con los Streams, vemos con el siguiente ejemplo que podemos establecer filtros para suscribirnos solamente a aquellos elementos que nos interesen:

bookObservable.filter(book -> book.getPageCount() > 450)
     .subscribe(System.out::println);

Y vemos en el siguiente como también podemos realizar otro tipo de operaciones propias de los Streams:

bookObservable.map(Book::getTitle)
     .subscribe(System.out::println);

Y también podemos concatenar los secuencias de elementos (dos Observable) para suscribirnos al resultado final:

Observable<Integer> range1 = Observable.range(1, 10);
Observable<Integer> range2 = Observable.range(11, 10);
Observable.concat(range1, range2)
     .subscribe(System.out::println);

En este último ejemplo, nos suscribimos utilizando otro hilo y, por tanto, el consumo de los elementos del Observable se realizará en segundo plano (RxJava no es concurrente por defecto):

bookObservable.map(Book::getTitle)
     .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
     .subscribe(System.out::println);
System.out.println("¿Esto cuándo se ejecuta?");

Programación reactiva con RxJava

Una vez que hemos visto una introducción a la programación reactiva con RxJava, vamos a ver un ejemplo real en el que invocamos a una API remota para solicitar una serie de datos y cómo podemos consumir éstos de forma reactiva utilizando esta librería. Aprovecharemos también y veremos como funciona Retrofit que es una librería que permite mapear directamente el JSON de respuesta de la API a nuestro modelo de clases Java.

Así, solicitaremos la lista de ciudades a la API REST Countries, ésta nos devolverá una lista de ciudades (ya mapeada como clases Java gracias a Retrofit) y nosotros procesaremos dicha lista como una secuencia con RxJava. De esa manera, podremos procesar ciudad a ciudad haciendo uso de programación reactiva.

Proyecto de ejemplo: Aplicación que consume una API con RetroFit. Sin RxJava

El objetivo de este primer ejemplo es conocer cómo funciona la librería sin hacer uso de la programación reactiva.

Incluimos las dependencias en el pom.xml de nuestro proyecto:

. . .
<dependencies>
    <dependency>
        <groupId>com.squareup.retrofit2</groupId>
        <artifactId>retrofit</artifactId>
        <!-- Para evitar el warning que da la 2.9.0 se puede usar la 2.7.0 -->
        <version>2.7.0</version>
    </dependency>
    <dependency>
        <groupId>com.squareup.retrofit2</groupId>
        <artifactId>converter-gson</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
. . .

Definimos una clase con aquellos atributos que queremos mapear de entre todos los que la API devuelve. Hay que tener en cuenta que solamente se mapearán aquellos campos que coincidan en nombre con los atributos y que se hayan definido en la clase. El resto se omitirán:

/**
 * Modelo de objeto que devuelve la API (https://restcountries.eu/)
 * @author Santiago Faci
 * @version curso 2020-2021
 */
public class Country {
 
    private String name;
    private String capital;
    private String region;
    private String subregion;
 
    @Override
    public String toString() {
        return name + " [" + capital +  "]";
    }
}

Y generamos la interfaz con aquellas operaciones de la API en las que estamos interesados. En este caso nos interesa el endpoint /rest/v2/all que devuelve la lista completa de paises y el endpoint /rest/v2/name/{name} que devuelve información detallada sobre un determinado país:

/**
 * Interfaz de la API (https://restcountries.eu/)
 * @author Santiago Faci
 * @version curso 2020-2021
 */
public interface CountriesApiService {
 
    @GET("/rest/v2/all")
    Call<List<Country>> getAllCountries();
 
    // Cuidado! Aunque la respuesta en un solo país viene en forma de array
    @GET("/rest/v2/name/{name}")
    Call<List<Country>> getCountry(@Path("name") String name);
}

En la clase CountriesService inicializamos la librería Retrofit en el constructor e implementamos las operaciones que hemos definido en la interface CountriesApiService:

/**
 * Service con las operaciones disponibles
 * @author Santiago Faci
 * @version curso 2020-2021
 */
public class CountriesService {
 
    private CountriesApiService api;
 
    public CountriesService() {
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(URL)
                .addConverterFactory(GsonConverterFactory.create())
                .build();
 
        api = retrofit.create(CountriesApiService.class);
    }
 
    public List<Country> getAllCountries() {
        Call<List<Country>> allCountriesCall = api.getAllCountries();
        try {
            Response<List<Country>> response = allCountriesCall.execute();
            return response.body();
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
 
        return null;
    }
 
    // TODO Implementar la operación para obtener la información de un país
}

En la clase Constants definiremos las constantes de aplicación que necesitemos. Por el momento nos basta con la URL base de la API.

/**
 * Constantes
 * @author Santiago Faci
 * @version curso 2020-2021
 */
public class Constants {
 
    public final static String URL = "https://restcountries.eu/";
}

Y por último, a modo de ejemplo, invocamos a la operación implementada para observar cómo nos devuelve la información mapeada en una colección Java:

/**
 * Clase principal
 * @author Santiago Faci
 * @version curso 2020-2021
 */
public class Application {
    public static void main(String[] args) {
        CountriesService countriesService = new CountriesService();
        List<Country> countries = countriesService.getAllCountries();
 
        System.out.println(countries.toString());
    }
}

Proyecto de ejemplo: Aplicación que consume una API con Retrofit. Con RxJava

En este caso, se trata de desarrollar el mismo ejemplo que en el caso anterior pero utilizando programación reactiva con RxJava. Para ello, crearemos un nuevo proyecto:

Para comenzar, incluiremos las librerías RxJava y Retrofit en nuestro fichero pom.xml.

. . .
<dependencies>
    <dependency>
        <groupId>io.reactivex.rxjava3</groupId>
        <artifactId>rxjava</artifactId>
        <version>3.0.10</version>
    </dependency>
    <dependency>
        <groupId>com.squareup.retrofit2</groupId>
        <artifactId>retrofit</artifactId>
        <version>2.7.0</version>
    </dependency>
    <dependency>
        <groupId>com.squareup.retrofit2</groupId>
        <artifactId>converter-gson</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.squareup.retrofit2</groupId>
        <artifactId>adapter-rxjava</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
. . .

Definiremos de forma similar la clase Country que será nuestro modelo de datos Java:

public class Country {
   . . .
}

Las operaciones que definamos en el interface CountriesApiService ahora ya “saben” devolver Observable en lugar de una colección Java tal cual:

/**
 * Interfaz de la API para emplear con RetroFit (https://restcountries.eu)
 * @author Santiago Faci
 * @version Curso 2020-2021
 */
public interface CountriesApiService {
 
    @GET("/rest/v2/all")
    Observable<List<Country>> getAllCountries();
 
    // Cuidado! Aunque la respuesta en un solo país viene en forma de array
    @GET("/rest/v2/name/{name}")
    Observable<List<Country>> getCountry(@Path("name") String name);
}

De forma muy similar al primer caso, implementamos ambas operaciones devolviendo como resultado, en ambos casos, un Observable. En el caso en el que obtenemos la información de un solo país, ésta viene en forma de array (de un solo elemento) por lo que también tenemos que recogerlo como tal:

/**
 * Service para consulta de la API
 * @author Santiago Faci
 * @version Curso 2020-2021
 */
public class CountriesService {
 
    private CountriesApiService api;
 
    public CountriesService() {
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(URL)
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .build();
        api = retrofit.create(CountriesApiService.class);
    }
 
    public Observable<List<Country>> getAllCountries() {
        return api.getAllCountries();
    }
 
    public Observable<List<Country>> getCountry(String name) {
        return api.getCountry(name);
    }
}

Y, finalmente, cuando la ponemos en marcha, suscribimos un Observer al Observable que devuelven ambas operaciones de forma que podemos ir procesando elemento a elemento directamente. Se puede ver especialmente claro en el caso de la operación que devuelve la lista completa de países:

public class Constants {
   . . .
}
/**
 * Clase principal
 * @author Santiago Faci
 * @version curso 2020-2021
 */
public class Application {
    public static void main(String[] args) {
        final CountriesService countriesService = new CountriesService();
 
        System.out.println("Comenzando descarga . . .");
 
        countriesService.getAllCountries()
                .flatMap(Observable::from)
                .doOnCompleted(() -> System.out.println("Listado de países descargado"))
                .doOnError(throwable -> System.out.println(throwable.getMessage()))
                .subscribeOn(Schedulers.from(Executors.newCachedThreadPool()))
                .subscribe(System.out::println);
 
        countriesService.getCountry("spain")
                .flatMap(Observable::from)
                .doOnCompleted(() -> System.out.println("Cargada información de spain"))
                .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
                .subscribe(System.out::println);
 
        System.out.println("Fin?");
    }
}

WebFlux

Introducción a WebFlux

Programación reactiva con WebFlux

@Data
@NoArgsConstructor
@Document(value = "products")
public class Product {
 
    @Id
    private String id;
    @Field
    private String name;
    @Field
    private String description;
    @Field
    private String category;
    @Field
    private float price;
    @Field(name = "creation_date")
    private LocalDateTime creationDate;
}
@Repository
public interface ProductRepository extends ReactiveMongoRepository<Product, Long> {
 
    Flux<Product> findAll();
    Mono<Product> findByName(String name);
}
@Service
public class ProductServiceImpl implements ProductService {
 
    @Autowired
    private ProductRepository productRepository;
 
    @Override
    public Flux<Product> findAllProducts() {
        return productRepository.findAll();
    }
 
    @Override
    public Mono<Product> findProductByName(String name) {
        return productRepository.findByName(name);
    }
 
    @Override
    public Mono<Product> findProduct(long id) {
        return productRepository.findById(id);
    }
}
@RestController
public class ProductController {
 
    private final Logger logger = LoggerFactory.getLogger(ProductController.class);
 
    @Autowired
    private ProductService productService;
 
    @GetMapping(value = "/products", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Product> getProducts() {
        return productService.findAllProducts();
    }
 
    @GetMapping("/products/{id}")
    public Mono<Product> getProduct(@PathParam("id") long id) {
        return productService.findProduct(id);
    }
}
@SpringBootApplication
@EnableReactiveMongoRepositories
public class MyshopApplication {
 
	public static void main(String[] args) {
		SpringApplication.run(MyshopApplication.class, args);
	}
}
# Puerto donde escucha el servidor una vez se inicie
server.port=8081
 
# Configuración MongoDB
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=myshop-webflux
@Data
@NoArgsConstructor
@ToString
public class Product {
 
    private String id;
    private String name;
    private String description;
    private String category;
    private float price;
    private LocalDateTime creationDate;
}
WebClient webClient = WebClient.create("http://localhost:8081");
        Flux<Product> productsFlux = webClient.get()
                .uri("/products")
                .retrieve()
                .bodyToFlux(Product.class);
 
        productsFlux.doOnError((System.out::println))
                .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool()))
                .doOnComplete(() -> System.out.println("Terminado"))
                .subscribe((product) -> {
                    // Simulamos una operación costosa en tiempo
                    try {
                        System.out.println("Haciendo algo con " + product.getName() + " . . .");
                        Thread.sleep(5000);
                    } catch (InterruptedException ie) {
                        ie.printStackTrace();
                    }
 
                    System.out.println("Consumidor 1: " + product.getName());
                });
productsFlux.doOnError((System.out::println))
                .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool()))
                .doOnComplete(() -> System.out.println("Terminado"))
                .subscribe((product) -> System.out.println("Consumidor 2: " + product.getName()));
System.out.println(productsFlux.count().block());

Ejercicios

Patrón Observer

  1. Realiza una aplicación que simule la persecución de dos aviones. Considerando los parámetros altura, velocidad y dirección, el avión perseguido puede trazar la ruta que quiera y cambiar su velocidad y altura y el avión perseguidor deberá modificar esos parámetros en la misma medida en que lo haga el primero. Puedes solicitar por consola cambiar los parámetros del avión perseguido o bien preparar un recorrido ya definido y lanzar la aplicación para ver como lo recorren ambos objetos (mostrando en cada instante los parámetros de cada uno)

  2. Crea una aplicación en la que un ListView muestre el contenido de la lista, de forma que el ListView se actualice cada vez que se añadan elementos a la lista

Streams

  1. Define una colección de ciudades (almacenando nombre, provincia, habitantes y extensión para cada una de ellas) y realiza las siguientes operaciones utilizando Streams:
    1. ¿De cuántas provincias diferentes son las ciudades?
    2. ¿Cuántas ciudades hay?
    3. Calcula el número total de habitantes para una provincia determinada (introducida por el usuario, por ejemplo)
    4. Obtén una colección con los nombres de todas las ciudades
    5. Obtén una colección con los nombres de todas las provincias (sin repetir)
    6. ¿Todas las ciudades son de más de 50.000 habitantes?
    7. ¿Alguna ciudad de una provincia determinada (introducida por el usuario) tiene más de 10.000 habitantes?

Future/CompletableFuture

  1. Realiza una aplicación que descargue un fichero de Internet y, una vez esté descargado, lo abra para que el usuario pueda ver su contenido (podéis hacerlo simplemente con un fichero de texto o algo legible como tal)
  2. Realiza una aplicación que zipee el contenido de un directorio y, cuando termine, lo copie a otra carpeta indicada por el usuario (podéis ver aqui como trabajar con ficheros zip en Java)

RxJava

  1. Realiza una aplicación JavaFX que muestre al usuario el listado de paises usando la API REST Countries
  2. Amplia la aplicación anterior para ver también información detallada sobre cada país
  3. Añade, a la aplicación anterior, opciones de filtrado sin utilizar las que proporciona la propia API
  4. Utiliza la API Open Food Facts para realizar una aplicación JavaFX que muestre la información de un producto a partir de su código de barras (Desde la web disponen de un buscador de productos para ver ejemplos de datos que almacenan)

Proyectos de ejemplo

Todos los proyectos de ejemplo de esta parte están en el repositorio java-reactiva de GitHub.

Los proyectos que se vayan haciendo en clase estarán disponibles en el repositorio datos-ejercicios, también en GitHub.

Para manejaros con Git recordad que tenéis una serie de videotutoriales en La Wiki de Entornos de Desarrollo


© 2021 Santiago Faci