Avançando TestScheduler de RxJava para uma chamada de bloqueio para atrasar ()

votos
0

Usando RxJava2 Eu tenho uma classe que preenche a reativo para o mundo não-reativo. Nesta classe, existe um método substituído, que retorna algum valor após uma potencialmente longa de processamento . Este processamento é incorporado para o mundo reactivo usando Single.create(emitter -> ...).

O tempo de processamento é crucial para mim, como há uma diferença em relação à lógica de negócios, se um segundo assinante aparece enquanto o processamento está em execução ou após ter sido concluída (ou canceladas).

Agora estou em testar esta classe - portanto, eu preciso do longo tempo de processamento para ser apenas virtual . RxJava de TestSchedulerpara o resgate!

Para criar um teste de implementação para que bridging da classe base, que emula o atraso, eu preciso de um bloqueio (!) Convidar o TestScheduler, onde o avanço no tempo é acionado em um segmento separado. (Caso contrário, o avanço no tempo nunca seria acionado.)

Meu problema é que eu preciso do avanço a tempo de ser adiada pelo menos até a 'computação caro' está bloqueando - caso contrário, o tempo de trocar de roupa antes de o atraso da operadora fica ativo e, portanto, ele espera até que outro avanço que nunca vai acontecer.

Eu posso resolver isso chamando Thread.sleep(100)antes da chamada para avançar no tempo - mas que parece um pouco feio para mim ... Quanto tempo para esperar? Para vários testes, que o tempo acrescenta-se, mas não por causa de problemas de tempo ... ugh.

Alguma idéia de como testar esta situação de uma forma esperamos mais limpa? Obrigado!

import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;

import java.util.concurrent.TimeUnit;

public class TestSchedulerBlocking {

    public static void main(String[] args) throws Exception {

        TestScheduler testScheduler = new TestScheduler();

        // the task - in reality, this is a task with a overridden method returning some value, where I need to delay the return in a test-implementation.
        Single<String> single = Single.create(emitter -> {

            System.out.println(emitting on  + Thread.currentThread() +  ...);

            // This is the core of my problem: I need some way to test a (synchronous) delay before the next line executes
            // (e.g. method returns, or, in another case, an exception is thrown).
            // Thread.sleep(<delay>) would be straight forward, but I want to use TestScheduler's magic time-advancing in my tests...
            // Using the usual non-blocking methods of RX, everything works fine for testing using the testScheduler, but here it doesn't help.
            // Here I need to synchronize it somehow, that the advancing in time is done *after* this call is blocking.
            // I tried a CountDownLatch in doOnSubscribe, but apparently, that's executed too early for me - I'd need doAfterSubscribe or so...
            // Final word on architecture: I'm dealing with the bridging of the reactive to the non-reactive world, so I can't just change the architecture.
            Completable.complete()
                       .delay(3, TimeUnit.SECONDS, testScheduler)
                       .doOnSubscribe(__ -> System.out.println(onSubcribe: completable))
                       .blockingAwait();

            System.out.println(<< blocking released >>);

            // this has to be delayed! Might be, for example, a return or an exception in real world.
            emitter.onSuccess(FooBar!);

        });


        System.out.println(running the task ...);
        TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println(onSubcribe: single))
                                                  .subscribeOn(Schedulers.newThread())
                                                  .test();

        // Without this sleep, the advancing in testScheduler's time is done *before* the doIt() is executed,
        // and therefore the blockingAwait() blocks until infinity...
        System.out.println(---SLEEPING---);
        Thread.sleep(100);

        // virtually advance the blocking delay 
        System.out.println(---advancing time---);
        testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

        // wait for the task to complete
        System.out.println(await...);
        testObserver.await();

        // assertions on task results, etc.
        System.out.println(finished. now do some testing... values (expected [FooBar!]):  + testObserver.values());

    }

}

[Nota: Eu fiz esta pergunta de uma maneira mais complexa e detalhada sobre ontem - mas como era muito quente no meu escritório e tinha várias falhas e erros. Por isso eu deletei e agora está mais condensado para o problema real.]

Publicado 19/09/2018 em 13:31
fonte usuário
Em outras línguas...                            


1 respostas

votos
0

Se eu entender o seu problema corretamente, você tem um serviço de longa duração e você quer ser capaz de testar as assinaturas de serviços que serão afetados por seu serviço de longa duração. Tenho abordado este da seguinte maneira.

Defina o seu serviço como você tem, usando um TestSchedulerpara controlar o seu avanço no tempo. Em seguida, defina seus estímulos de teste usando o mesmo programador de teste.

Este emula a entidade bloqueio. Não use "bloco".

Single<String> single = Observable.timer( 3, TimeUnit.SECONDS, scheduler )
                          .doOnNext( _l -> {} )
                          .map( _l -> "FooBar!" )
                          .take( 1 )
                          .toSingle();

// set up asynchronous operations
Observable.timer( 1_500, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction1() );
Observable.timer( 1_900, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction2() );
Observable.timer( 3_100, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction3() );
System.out.println("running the task ...");
TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
                                            .subscribeOn(Schedulers.newThread())
                                            .test();

// virtually advance the blocking delay 
System.out.println("---advancing time---");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);

Você deve ver que performTestAction1()ocorre, em seguida, performTestAction2()em seguida, os singleacabamentos, então performTestAction3(). Tudo isso é impulsionado pelo programador teste.

Respondeu 20/09/2018 em 15:00
fonte usuário

Cookies help us deliver our services. By using our services, you agree to our use of cookies. Learn more