sql >> Base de Datos >  >> RDS >> PostgreSQL

@Tailable(spring-data-reactive-mongodb) equivalente en spring-data-r2dbc

Estaba en el mismo problema, no estoy seguro de si encontró una solución o no, pero pude lograr algo similar haciendo lo siguiente. Primero, agregué un activador a mi tabla

CREATE TRIGGER trigger_name
    AFTER INSERT OR DELETE OR UPDATE 
    ON table_name
    FOR EACH ROW
    EXECUTE PROCEDURE trigger_function_name;

Esto establecerá un activador en la tabla cada vez que se actualice, elimine o inserte una fila. Luego llamará a la función de activación que configuré, que se parecía a esto:

CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS 
$BODY$
DECLARE
    payload JSON;
BEGIN
    payload = row_to_json(NEW);
    PERFORM pg_notify('notification_name', payload::text);
    RETURN NULL;
END;
$BODY$;

Esto me permitirá 'escuchar' cualquiera de estas actualizaciones de mi proyecto de arranque de primavera y enviará la fila completa como una carga útil. A continuación, en mi proyecto de arranque de primavera configuré una conexión a mi base de datos.

@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                .host("host")
                .database("db")
                .port(port)
                .username("username")
                .password("password")
                .schema("schema")
                .connectTimeout(Duration.ofMinutes(2))
                .build());
    }
}

Con eso, lo conecto automáticamente (inyección de dependencia) en el constructor en mi clase de servicio y lo envío a una clase r2dbc PostgressqlConnection así:

this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();

Ahora queremos 'escuchar' nuestra tabla y recibir una notificación cuando realicemos alguna actualización en nuestra tabla. Para hacer eso, configuramos un método de inicialización que se realiza después de la inyección de dependencia utilizando la anotación @PostContruct

@PostConstruct
private void postConstruct() {
    postgresqlConnection.createStatement("LISTEN notification_name").execute()
            .flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}

Note que escuchamos cualquier nombre que pongamos dentro del método pg_notify. También queremos configurar un método para cerrar la conexión cuando el bean esté a punto de ser tirado, así:

@PreDestroy
private void preDestroy() {
    postgresqlConnection.close().subscribe();
}

Ahora simplemente creo un método que devuelve un Flux de lo que sea que esté actualmente en mi tabla, y también lo fusiono con mis notificaciones, como dije antes, las notificaciones vienen como json, así que tuve que deserializarlo y decidí usar Mapeador de objetos. Entonces, se verá algo como esto:

private Flux<YourClass> getUpdatedRows() {
    return postgresqlConnection.getNotifications().map(notification -> {
        try {
            //deserialize json
            return objectMapper.readValue(notification.getParameter(), YourClass.class);
        } catch (IOException e) {
            //handle exception
        }
    });
}

public Flux<YourClass> getDocuments() {
    return documentRepository.findAll().share().concatWith(getUpdatedRows());
}

Espero que esto ayude. ¡Saludos!