diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeDelayErrorTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeDelayErrorTest.java index 7f8d01b471..d95e4474a4 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeDelayErrorTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeDelayErrorTest.java @@ -25,7 +25,6 @@ import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.*; -import io.reactivex.rxjava4.functions.LongConsumer; import io.reactivex.rxjava4.internal.subscriptions.BooleanSubscription; import io.reactivex.rxjava4.processors.PublishProcessor; import io.reactivex.rxjava4.subscribers.*; @@ -224,18 +223,13 @@ public void mergeFlowableOfFlowables() { final Flowable f1 = Flowable.unsafeCreate(new TestSynchronousFlowable()); final Flowable f2 = Flowable.unsafeCreate(new TestSynchronousFlowable()); - Flowable> flowableOfFlowables = Flowable.unsafeCreate(new Publisher>() { - - @Override - public void subscribe(Subscriber> subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - // simulate what would happen in a Flowable - subscriber.onNext(f1); - subscriber.onNext(f2); - subscriber.onComplete(); - } - - }); + Flowable> flowableOfFlowables = Flowable.unsafeCreate(subscriber -> { + subscriber.onSubscribe(new BooleanSubscription()); + // simulate what would happen in a Flowable + subscriber.onNext(f1); + subscriber.onNext(f2); + subscriber.onComplete(); + }); Flowable m = Flowable.mergeDelayError(flowableOfFlowables); m.subscribe(stringSubscriber); @@ -298,7 +292,7 @@ public void synchronousError() { final Flowable> f1 = Flowable.error(new RuntimeException("unit test")); final CountDownLatch latch = new CountDownLatch(1); - Flowable.mergeDelayError(f1).subscribe(new DefaultSubscriber() { + Flowable.mergeDelayError(f1).subscribe(new DefaultSubscriber() /* NFI */ { @Override public void onComplete() { fail("Expected onError path"); @@ -338,15 +332,10 @@ private static class TestASynchronousFlowable implements Publisher { @Override public void subscribe(final Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); - t = new Thread(new Runnable() { - - @Override - public void run() { - subscriber.onNext("hello"); - subscriber.onComplete(); - } - - }); + t = new Thread(() -> { + subscriber.onNext("hello"); + subscriber.onComplete(); + }); t.start(); } } @@ -393,29 +382,24 @@ private static class TestAsyncErrorFlowable implements Publisher { @Override public void subscribe(final Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); - t = new Thread(new Runnable() { - - @Override - public void run() { - for (String s : valuesToReturn) { - if (s == null) { - System.out.println("throwing exception"); - try { - Thread.sleep(100); - } catch (Throwable e) { - - } - subscriber.onError(new NullPointerException()); - return; - } else { - subscriber.onNext(s); - } - } - System.out.println("subscription complete"); - subscriber.onComplete(); - } - - }); + t = new Thread(() -> { + for (String s : valuesToReturn) { + if (s == null) { + System.out.println("throwing exception"); + try { + Thread.sleep(100); + } catch (Throwable e) { + + } + subscriber.onError(new NullPointerException()); + return; + } else { + subscriber.onNext(s); + } + } + System.out.println("subscription complete"); + subscriber.onComplete(); + }); t.start(); } } @@ -459,15 +443,12 @@ public void errorInParentFlowableDelayed() throws Exception { for (int i = 0; i < 50; i++) { final TestASynchronous1sDelayedFlowable f1 = new TestASynchronous1sDelayedFlowable(); final TestASynchronous1sDelayedFlowable f2 = new TestASynchronous1sDelayedFlowable(); - Flowable> parentFlowable = Flowable.unsafeCreate(new Publisher>() { - @Override - public void subscribe(Subscriber> op) { - op.onSubscribe(new BooleanSubscription()); - op.onNext(Flowable.unsafeCreate(f1)); - op.onNext(Flowable.unsafeCreate(f2)); - op.onError(new NullPointerException("throwing exception in parent")); - } - }); + Flowable> parentFlowable = Flowable.unsafeCreate(op -> { + op.onSubscribe(new BooleanSubscription()); + op.onNext(Flowable.unsafeCreate(f1)); + op.onNext(Flowable.unsafeCreate(f2)); + op.onError(new NullPointerException("throwing exception in parent")); + }); stringSubscriber = TestHelper.mockSubscriber(); @@ -490,20 +471,15 @@ private static class TestASynchronous1sDelayedFlowable implements Publisher subscriber) { subscriber.onSubscribe(new BooleanSubscription()); - t = new Thread(new Runnable() { - - @Override - public void run() { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - subscriber.onError(e); - } - subscriber.onNext("hello"); - subscriber.onComplete(); - } - - }); + t = new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + subscriber.onError(e); + } + subscriber.onNext("hello"); + subscriber.onComplete(); + }); t.start(); } } @@ -514,12 +490,7 @@ public void delayErrorMaxConcurrent() { Flowable source = Flowable.mergeDelayError(Flowable.just( Flowable.just(1).hide(), Flowable.error(new TestException())) - .doOnRequest(new LongConsumer() { - @Override - public void accept(long t1) { - requests.add(t1); - } - }), 1); + .doOnRequest(t1 -> requests.add(t1)), 1); TestSubscriberEx ts = new TestSubscriberEx<>(); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeMaxConcurrentTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeMaxConcurrentTest.java index 76dea4c497..103599a15c 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeMaxConcurrentTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeMaxConcurrentTest.java @@ -92,25 +92,20 @@ private static class SubscriptionCheckObservable implements Publisher { @Override public void subscribe(final Subscriber t1) { t1.onSubscribe(new BooleanSubscription()); - new Thread(new Runnable() { - - @Override - public void run() { - if (subscriptionCount.incrementAndGet() > maxConcurrent) { - failed = true; - } - t1.onNext("one"); - t1.onNext("two"); - t1.onNext("three"); - t1.onNext("four"); - t1.onNext("five"); - // We could not decrement subscriptionCount in the unsubscribe method - // as "unsubscribe" is not guaranteed to be called before the next "subscribe". - subscriptionCount.decrementAndGet(); - t1.onComplete(); - } - - }).start(); + new Thread(() -> { + if (subscriptionCount.incrementAndGet() > maxConcurrent) { + failed = true; + } + t1.onNext("one"); + t1.onNext("two"); + t1.onNext("three"); + t1.onNext("four"); + t1.onNext("five"); + // We could not decrement subscriptionCount in the unsubscribe method + // as "unsubscribe" is not guaranteed to be called before the next "subscribe". + subscriptionCount.decrementAndGet(); + t1.onComplete(); + }).start(); } } @@ -261,7 +256,7 @@ public void backpressureHonored() throws Exception { final CountDownLatch cdl = new CountDownLatch(5); - TestSubscriber ts = new TestSubscriber(0L) { + TestSubscriber ts = new TestSubscriber(0L) /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeTest.java index 4949d05c30..3a699f8d7e 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeTest.java @@ -81,18 +81,13 @@ public void mergeFlowableOfFlowables() { final Flowable f1 = Flowable.unsafeCreate(new TestSynchronousFlowable()); final Flowable f2 = Flowable.unsafeCreate(new TestSynchronousFlowable()); - Flowable> flowableOfFlowables = Flowable.unsafeCreate(new Publisher>() { - - @Override - public void subscribe(Subscriber> subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - // simulate what would happen in a Flowable - subscriber.onNext(f1); - subscriber.onNext(f2); - subscriber.onComplete(); - } - - }); + Flowable> flowableOfFlowables = Flowable.unsafeCreate(subscriber -> { + subscriber.onSubscribe(new BooleanSubscription()); + // simulate what would happen in a Flowable + subscriber.onNext(f1); + subscriber.onNext(f2); + subscriber.onComplete(); + }); Flowable m = Flowable.merge(flowableOfFlowables); m.subscribe(stringSubscriber); @@ -136,59 +131,46 @@ public void unSubscribeFlowableOfFlowables() throws InterruptedException { final AtomicBoolean unsubscribed = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); - Flowable> source = Flowable.unsafeCreate(new Publisher>() { - - @Override - public void subscribe(final Subscriber> subscriber) { - // verbose on purpose so I can track the inside of it - final Subscription s = new Subscription() { - - @Override - public void request(long n) { - - } + Flowable> source = Flowable.unsafeCreate(subscriber -> { + // verbose on purpose so I can track the inside of it + final Subscription s = new Subscription() /* NFI */ { - @Override - public void cancel() { - System.out.println("*** unsubscribed"); - unsubscribed.set(true); - } + @Override + public void request(long n) { - }; - subscriber.onSubscribe(s); + } - new Thread(new Runnable() { + @Override + public void cancel() { + System.out.println("*** unsubscribed"); + unsubscribed.set(true); + } - @Override - public void run() { + }; + subscriber.onSubscribe(s); - while (!unsubscribed.get()) { - subscriber.onNext(Flowable.just(1L, 2L)); - } - System.out.println("Done looping after unsubscribe: " + unsubscribed.get()); - subscriber.onComplete(); + new Thread(() -> { - // mark that the thread is finished - latch.countDown(); - } - }).start(); - } + while (!unsubscribed.get()) { + subscriber.onNext(Flowable.just(1L, 2L)); + } + System.out.println("Done looping after unsubscribe: " + unsubscribed.get()); + subscriber.onComplete(); - }); + // mark that the thread is finished + latch.countDown(); + }).start(); + }); final AtomicInteger count = new AtomicInteger(); - Flowable.merge(source).take(6).blockingForEach(new Consumer() { - - @Override - public void accept(Long v) { - System.out.println("Value: " + v); - int c = count.incrementAndGet(); - if (c > 6) { - fail("Should be only 6"); - } + Flowable.merge(source).take(6).blockingForEach(v -> { + System.out.println("Value: " + v); + int c = count.incrementAndGet(); + if (c > 6) { + fail("Should be only 6"); + } - } - }); + }); latch.await(1000, TimeUnit.MILLISECONDS); @@ -237,7 +219,7 @@ public void synchronizationOfMultipleSequences() throws Throwable { final AtomicReference error = new AtomicReference<>(); Flowable m = Flowable.merge(Flowable.unsafeCreate(f1), Flowable.unsafeCreate(f2)); - m.subscribe(new DefaultSubscriber() { + m.subscribe(new DefaultSubscriber() /* NFI */ { @Override public void onComplete() { @@ -374,22 +356,17 @@ private static class TestASynchronousFlowable implements Publisher { @Override public void subscribe(final Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); - t = new Thread(new Runnable() { - - @Override - public void run() { - onNextBeingSent.countDown(); - try { - subscriber.onNext("hello"); - // I can't use a countDownLatch to prove we are actually sending 'onNext' - // since it will block if synchronized and I'll deadlock - subscriber.onComplete(); - } catch (Exception e) { - subscriber.onError(e); - } - } - - }, "TestASynchronousFlowable"); + t = new Thread(() -> { + onNextBeingSent.countDown(); + try { + subscriber.onNext("hello"); + // I can't use a countDownLatch to prove we are actually sending 'onNext' + // since it will block if synchronized and I'll deadlock + subscriber.onComplete(); + } catch (Exception e) { + subscriber.onError(e); + } + }, "TestASynchronousFlowable"); t.start(); } } @@ -494,49 +471,43 @@ public void earlyUnsubscribe() { } private Flowable createFlowableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(final Scheduler scheduler, final AtomicBoolean unsubscribed) { - return Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(final Subscriber child) { - Flowable.interval(1, TimeUnit.SECONDS, scheduler) - .take(5) - .subscribe(new FlowableSubscriber() { - @Override - public void onSubscribe(final Subscription s) { - child.onSubscribe(new Subscription() { - @Override - public void request(long n) { - s.request(n); - } - - @Override - public void cancel() { - unsubscribed.set(true); - s.cancel(); - } - }); - } - - @Override - public void onNext(Long t) { - child.onNext(t); - } - - @Override - public void onError(Throwable t) { - unsubscribed.set(true); - child.onError(t); - } - - @Override - public void onComplete() { - unsubscribed.set(true); - child.onComplete(); - } - - }); - } - }); + return Flowable.unsafeCreate(child -> Flowable.interval(1, TimeUnit.SECONDS, scheduler) + .take(5) + .subscribe(new FlowableSubscriber() /* NFI */ { + @Override + public void onSubscribe(final Subscription s) { + child.onSubscribe(new Subscription() /* NFI */ { + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + unsubscribed.set(true); + s.cancel(); + } + }); + } + + @Override + public void onNext(Long t) { + child.onNext(t); + } + + @Override + public void onError(Throwable t) { + unsubscribed.set(true); + child.onError(t); + } + + @Override + public void onComplete() { + unsubscribed.set(true); + child.onComplete(); + } + + })); } @Test @@ -561,40 +532,31 @@ public void concurrency() { @Test public void concurrencyWithSleeping() { - Flowable f = Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(final Subscriber s) { - Worker inner = Schedulers.newThread().createWorker(); - final AsyncSubscription as = new AsyncSubscription(); - as.setSubscription(new BooleanSubscription()); - as.setResource(inner); - - s.onSubscribe(as); - - inner.schedule(new Runnable() { - - @Override - public void run() { - try { - for (int i = 0; i < 100; i++) { - s.onNext(1); - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } catch (Exception e) { - s.onError(e); - } - as.dispose(); - s.onComplete(); - } - - }); - } - }); + Flowable f = Flowable.unsafeCreate(s -> { + Worker inner = Schedulers.newThread().createWorker(); + final AsyncSubscription as = new AsyncSubscription(); + as.setSubscription(new BooleanSubscription()); + as.setResource(inner); + + s.onSubscribe(as); + + inner.schedule(() -> { + try { + for (int i = 0; i < 100; i++) { + s.onNext(1); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } catch (Exception e) { + s.onError(e); + } + as.dispose(); + s.onComplete(); + }); + }); for (int i = 0; i < 10; i++) { Flowable merge = Flowable.merge(f, f, f); @@ -611,37 +573,28 @@ public void run() { @Test public void concurrencyWithBrokenOnCompleteContract() { - Flowable f = Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(final Subscriber s) { - Worker inner = Schedulers.newThread().createWorker(); - final AsyncSubscription as = new AsyncSubscription(); - as.setSubscription(new BooleanSubscription()); - as.setResource(inner); - - s.onSubscribe(as); - - inner.schedule(new Runnable() { - - @Override - public void run() { - try { - for (int i = 0; i < 10000; i++) { - s.onNext(i); - } - } catch (Exception e) { - s.onError(e); - } - as.dispose(); - s.onComplete(); - s.onComplete(); - s.onComplete(); - } - - }); - } - }); + Flowable f = Flowable.unsafeCreate(s -> { + Worker inner = Schedulers.newThread().createWorker(); + final AsyncSubscription as = new AsyncSubscription(); + as.setSubscription(new BooleanSubscription()); + as.setResource(inner); + + s.onSubscribe(as); + + inner.schedule(() -> { + try { + for (int i = 0; i < 10000; i++) { + s.onNext(i); + } + } catch (Exception e) { + s.onError(e); + } + as.dispose(); + s.onComplete(); + s.onComplete(); + s.onComplete(); + }); + }); for (int i = 0; i < 10; i++) { Flowable merge = Flowable.merge(f.onBackpressureBuffer(), f.onBackpressureBuffer(), f.onBackpressureBuffer()); @@ -664,7 +617,7 @@ public void backpressureUpstream() throws InterruptedException { final AtomicInteger generated2 = new AtomicInteger(); Flowable f2 = createInfiniteFlowable(generated2).subscribeOn(Schedulers.computation()); - TestSubscriberEx testSubscriber = new TestSubscriberEx() { + TestSubscriberEx testSubscriber = new TestSubscriberEx() /* NFI */ { @Override public void onNext(Integer t) { System.err.println("testSubscriber received => " + t + " on thread " + Thread.currentThread()); @@ -701,7 +654,7 @@ public void backpressureUpstream2() throws InterruptedException { final AtomicInteger generated1 = new AtomicInteger(); Flowable f1 = createInfiniteFlowable(generated1).subscribeOn(Schedulers.computation()); - TestSubscriberEx testSubscriber = new TestSubscriberEx() { + TestSubscriberEx testSubscriber = new TestSubscriberEx() /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -738,7 +691,7 @@ public void backpressureDownstreamWithConcurrentStreams() throws InterruptedExce final AtomicInteger generated2 = new AtomicInteger(); Flowable f2 = createInfiniteFlowable(generated2).subscribeOn(Schedulers.computation()); - TestSubscriberEx testSubscriber = new TestSubscriberEx() { + TestSubscriberEx testSubscriber = new TestSubscriberEx() /* NFI */ { @Override public void onNext(Integer t) { if (t < 100) { @@ -772,16 +725,9 @@ public void onNext(Integer t) { public void backpressureBothUpstreamAndDownstreamWithSynchronousScalarFlowables() throws InterruptedException { final AtomicInteger generated1 = new AtomicInteger(); Flowable> f1 = createInfiniteFlowable(generated1) - .map(new Function>() { - - @Override - public Flowable apply(Integer t1) { - return Flowable.just(t1); - } - - }); + .map((Function>) Flowable::just); - TestSubscriberEx testSubscriber = new TestSubscriberEx() { + TestSubscriberEx testSubscriber = new TestSubscriberEx() /* NFI */ { @Override public void onNext(Integer t) { if (t < 100) { @@ -825,16 +771,9 @@ public void onNext(Integer t) { @Test public void backpressureBothUpstreamAndDownstreamWithRegularFlowables() throws InterruptedException { final AtomicInteger generated1 = new AtomicInteger(); - Flowable> f1 = createInfiniteFlowable(generated1).map(new Function>() { + Flowable> f1 = createInfiniteFlowable(generated1).map(_ -> Flowable.just(1, 2, 3)); - @Override - public Flowable apply(Integer t1) { - return Flowable.just(1, 2, 3); - } - - }); - - TestSubscriberEx testSubscriber = new TestSubscriberEx() { + TestSubscriberEx testSubscriber = new TestSubscriberEx() /* NFI */ { int i; @Override @@ -923,14 +862,7 @@ public void merge100AsyncStreamOf1() { private Flowable mergeNAsyncStreamsOfN(final int outerSize, final int innerSize) { Flowable> os = Flowable.range(1, outerSize) - .map(new Function>() { - - @Override - public Flowable apply(Integer i) { - return Flowable.range(1, innerSize).subscribeOn(Schedulers.computation()); - } - - }); + .map(_ -> Flowable.range(1, innerSize).subscribeOn(Schedulers.computation())); return Flowable.merge(os); } @@ -981,39 +913,27 @@ public void merge1000000SyncStreamOf1() { private Flowable mergeNSyncStreamsOfN(final int outerSize, final int innerSize) { Flowable> os = Flowable.range(1, outerSize) - .map(new Function>() { - - @Override - public Flowable apply(Integer i) { - return Flowable.range(1, innerSize); - } - - }); + .map(_ -> Flowable.range(1, innerSize)); return Flowable.merge(os); } private Flowable createInfiniteFlowable(final AtomicInteger generated) { - Flowable flowable = Flowable.fromIterable(new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { - - @Override - public void remove() { - } - - @Override - public Integer next() { - return generated.getAndIncrement(); - } - - @Override - public boolean hasNext() { - return true; - } - }; - } - }); + Flowable flowable = Flowable.fromIterable(() -> new Iterator() /* NFI */ { + + @Override + public void remove() { + } + + @Override + public Integer next() { + return generated.getAndIncrement(); + } + + @Override + public boolean hasNext() { + return true; + } + }); return flowable; } @@ -1021,30 +941,18 @@ public boolean hasNext() { public void mergeManyAsyncSingle() { TestSubscriber ts = new TestSubscriber<>(); Flowable> os = Flowable.range(1, 10000) - .map(new Function>() { - - @Override - public Flowable apply(final Integer i) { - return Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(Subscriber s) { - s.onSubscribe(new BooleanSubscription()); - if (i < 500) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - s.onNext(i); - s.onComplete(); - } - - }).subscribeOn(Schedulers.computation()).cache(); - } - - }); + .map(i -> Flowable.unsafeCreate(s -> { + s.onSubscribe(new BooleanSubscription()); + if (i < 500) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + s.onNext(i); + s.onComplete(); + }).subscribeOn(Schedulers.computation()).cache()); Flowable.merge(os).subscribe(ts); ts.awaitDone(10, TimeUnit.SECONDS); ts.assertNoErrors(); @@ -1159,46 +1067,21 @@ public void mergeKeepsRequesting() throws InterruptedException { Flowable.range(1, 2) // produce many integers per second - .flatMap(new Function>() { - @Override - public Flowable apply(final Integer number) { - return Flowable.range(1, Integer.MAX_VALUE) - .doOnRequest(new LongConsumer() { - - @Override - public void accept(long n) { - messages.add(">>>>>>>> A requested[" + number + "]: " + n); - } - - }) - // pause a bit - .doOnNext(pauseForMs(3)) - // buffer on backpressure - .onBackpressureBuffer() - // do in parallel - .subscribeOn(Schedulers.computation()) - .doOnRequest(new LongConsumer() { - - @Override - public void accept(long n) { - messages.add(">>>>>>>> B requested[" + number + "]: " + n); - } - - }); - } - - }) + .flatMap((Function>) number -> Flowable.range(1, Integer.MAX_VALUE) + .doOnRequest(n -> messages.add(">>>>>>>> A requested[" + number + "]: " + n)) + // pause a bit + .doOnNext(pauseForMs(3)) + // buffer on backpressure + .onBackpressureBuffer() + // do in parallel + .subscribeOn(Schedulers.computation()) + .doOnRequest(n -> messages.add(">>>>>>>> B requested[" + number + "]: " + n))) // take a number bigger than 2* Flowable.bufferSize() (used by OperatorMerge) .take(Flowable.bufferSize() * 2 + 1) // log count .doOnNext(printCount()) // release latch - .doOnComplete(new Action() { - @Override - public void run() { - latch.countDown(); - } - }).subscribe(); + .doOnComplete(() -> latch.countDown()).subscribe(); boolean a = latch.await(10, TimeUnit.SECONDS); if (!a) { for (String s : messages) { @@ -1216,7 +1099,7 @@ public void mergeRequestOverflow() throws InterruptedException { .mergeWith(Flowable.fromIterable(Arrays.asList(3, 4))); final int expectedCount = 4; final CountDownLatch latch = new CountDownLatch(expectedCount); - f.subscribeOn(Schedulers.computation()).subscribe(new DefaultSubscriber() { + f.subscribeOn(Schedulers.computation()).subscribe(new DefaultSubscriber() /* NFI */ { @Override public void onStart() { @@ -1243,7 +1126,7 @@ public void onNext(Integer t) { } private static Consumer printCount() { - return new Consumer() { + return new Consumer() /* NFI */ { long count; @Override @@ -1255,31 +1138,18 @@ public void accept(Integer t1) { } private static Consumer pauseForMs(final long time) { - return new Consumer() { - @Override - public void accept(Integer s) { - try { - Thread.sleep(time); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - }; + return _ -> { + try { + Thread.sleep(time); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; } - Function> toScalar = new Function>() { - @Override - public Flowable apply(Integer v) { - return Flowable.just(v); - } - }; + Function> toScalar = Flowable::just; - Function> toHiddenScalar = new Function>() { - @Override - public Flowable apply(Integer t) { - return Flowable.just(t).hide(); - } - }; + Function> toHiddenScalar = t -> Flowable.just(t).hide(); ; void runMerge(Function> func, TestSubscriberEx ts) { @@ -1312,7 +1182,7 @@ public void fastMergeHiddenScalar() { @Test public void slowMergeFullScalar() { for (final int req : new int[] { 16, 32, 64, 128, 256 }) { - TestSubscriberEx ts = new TestSubscriberEx(req) { + TestSubscriberEx ts = new TestSubscriberEx(req) /* NFI */ { int remaining = req; @Override @@ -1331,7 +1201,7 @@ public void onNext(Integer t) { @Test public void slowMergeHiddenScalar() { for (final int req : new int[] { 16, 32, 64, 128, 256 }) { - TestSubscriberEx ts = new TestSubscriberEx(req) { + TestSubscriberEx ts = new TestSubscriberEx(req) /* NFI */ { int remaining = req; @Override public void onNext(Integer t) { diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithCompletableTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithCompletableTest.java index 0c569d3460..6daad9091b 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithCompletableTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithCompletableTest.java @@ -19,7 +19,6 @@ import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.TestException; -import io.reactivex.rxjava4.functions.Action; import io.reactivex.rxjava4.processors.PublishProcessor; import io.reactivex.rxjava4.subjects.CompletableSubject; import io.reactivex.rxjava4.subscribers.TestSubscriber; @@ -32,12 +31,7 @@ public void normal() { final TestSubscriber ts = new TestSubscriber<>(); Flowable.range(1, 5).mergeWith( - Completable.fromAction(new Action() { - @Override - public void run() throws Exception { - ts.onNext(100); - } - }) + Completable.fromAction(() -> ts.onNext(100)) ) .subscribe(ts); @@ -74,12 +68,7 @@ public void normalBackpressured() { final TestSubscriber ts = new TestSubscriber<>(0L); Flowable.range(1, 5).mergeWith( - Completable.fromAction(new Action() { - @Override - public void run() throws Exception { - ts.onNext(100); - } - }) + Completable.fromAction(() -> ts.onNext(100)) ) .subscribe(ts); @@ -117,20 +106,12 @@ public void completeRace() { TestSubscriber ts = pp.mergeWith(cs).test(); - Runnable r1 = new Runnable() { - @Override - public void run() { - pp.onNext(1); - pp.onComplete(); - } - }; - - Runnable r2 = new Runnable() { - @Override - public void run() { - cs.onComplete(); - } - }; + Runnable r1 = () -> { + pp.onNext(1); + pp.onComplete(); + }; + + Runnable r2 = () -> cs.onComplete(); TestHelper.race(r1, r2); @@ -176,11 +157,6 @@ public void cancelMainOnOtherError() { @Test public void undeliverableUponCancel() { - TestHelper.checkUndeliverableUponCancel(new FlowableConverter>() { - @Override - public Flowable apply(Flowable upstream) { - return upstream.mergeWith(Completable.complete().hide()); - } - }); + TestHelper.checkUndeliverableUponCancel((FlowableConverter>) upstream -> upstream.mergeWith(Completable.complete().hide())); } } diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithMaybeTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithMaybeTest.java index 6ae50e8f23..e77f82c3ec 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithMaybeTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithMaybeTest.java @@ -24,7 +24,6 @@ import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.TestException; -import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.internal.subscriptions.BooleanSubscription; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.processors.PublishProcessor; @@ -132,20 +131,12 @@ public void completeRace() { TestSubscriber ts = pp.mergeWith(cs).test(); - Runnable r1 = new Runnable() { - @Override - public void run() { - pp.onNext(1); - pp.onComplete(); - } - }; + Runnable r1 = () -> { + pp.onNext(1); + pp.onComplete(); + }; - Runnable r2 = new Runnable() { - @Override - public void run() { - cs.onSuccess(1); - } - }; + Runnable r2 = () -> cs.onSuccess(1); TestHelper.race(r1, r2); @@ -158,7 +149,7 @@ public void onNextSlowPath() { final PublishProcessor pp = PublishProcessor.create(); final MaybeSubject cs = MaybeSubject.create(); - TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber() { + TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber() /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -182,7 +173,7 @@ public void onSuccessSlowPath() { final PublishProcessor pp = PublishProcessor.create(); final MaybeSubject cs = MaybeSubject.create(); - TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber() { + TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber() /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -205,7 +196,7 @@ public void onSuccessSlowPathBackpressured() { final PublishProcessor pp = PublishProcessor.create(); final MaybeSubject cs = MaybeSubject.create(); - TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber(1) { + TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber(1) /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -232,19 +223,9 @@ public void onSuccessFastPathBackpressuredRace() { final TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<>(0)); - Runnable r1 = new Runnable() { - @Override - public void run() { - cs.onSuccess(1); - } - }; + Runnable r1 = () -> cs.onSuccess(1); - Runnable r2 = new Runnable() { - @Override - public void run() { - ts.request(2); - } - }; + Runnable r2 = () -> ts.request(2); TestHelper.race(r1, r2); @@ -260,7 +241,7 @@ public void onErrorMainOverflow() { List errors = TestHelper.trackPluginErrors(); try { final AtomicReference> subscriber = new AtomicReference<>(); - TestSubscriber ts = new Flowable() { + TestSubscriber ts = new Flowable() /* NFI */ { @Override protected void subscribeActual(Subscriber s) { s.onSubscribe(new BooleanSubscription()); @@ -307,19 +288,9 @@ public void onNextRequestRace() { pp.onNext(0); - Runnable r1 = new Runnable() { - @Override - public void run() { - pp.onNext(1); - } - }; + Runnable r1 = () -> pp.onNext(1); - Runnable r2 = new Runnable() { - @Override - public void run() { - ts.request(3); - } - }; + Runnable r2 = () -> ts.request(3); TestHelper.race(r1, r2); @@ -333,13 +304,7 @@ public void run() { @Test public void doubleOnSubscribeMain() { TestHelper.checkDoubleOnSubscribeFlowable( - new Function, Publisher>() { - @Override - public Publisher apply(Flowable f) - throws Exception { - return f.mergeWith(Maybe.just(1)); - } - } + f -> f.mergeWith(Maybe.just(1)) ); } @@ -358,7 +323,7 @@ public void drainExactRequestCancel() { TestSubscriber ts = pp.mergeWith(cs) .take(2) - .subscribeWith(new TestSubscriber(2) { + .subscribeWith(new TestSubscriber(2) /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -382,7 +347,7 @@ public void drainRequestWhenLimitReached() { final MaybeSubject cs = MaybeSubject.create(); TestSubscriber ts = pp.mergeWith(cs) - .subscribeWith(new TestSubscriber() { + .subscribeWith(new TestSubscriber() /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -441,12 +406,8 @@ public void cancelMainOnOtherError() { @Test public void undeliverableUponCancel() { - TestHelper.checkUndeliverableUponCancel(new FlowableConverter>() { - @Override - public Flowable apply(Flowable upstream) { - return upstream.mergeWith(Maybe.just(1).hide()); - } - }); + TestHelper.checkUndeliverableUponCancel((FlowableConverter>) upstream -> + upstream.mergeWith(Maybe.just(1).hide())); } @Test diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithSingleTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithSingleTest.java index 2f383f5723..e6763d17ed 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithSingleTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableMergeWithSingleTest.java @@ -24,7 +24,6 @@ import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.TestException; -import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.internal.subscriptions.BooleanSubscription; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.processors.PublishProcessor; @@ -128,20 +127,12 @@ public void completeRace() { TestSubscriber ts = pp.mergeWith(cs).test(); - Runnable r1 = new Runnable() { - @Override - public void run() { - pp.onNext(1); - pp.onComplete(); - } - }; + Runnable r1 = () -> { + pp.onNext(1); + pp.onComplete(); + }; - Runnable r2 = new Runnable() { - @Override - public void run() { - cs.onSuccess(1); - } - }; + Runnable r2 = () -> cs.onSuccess(1); TestHelper.race(r1, r2); @@ -154,7 +145,7 @@ public void onNextSlowPath() { final PublishProcessor pp = PublishProcessor.create(); final SingleSubject cs = SingleSubject.create(); - TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber() { + TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber() /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -178,7 +169,7 @@ public void onSuccessSlowPath() { final PublishProcessor pp = PublishProcessor.create(); final SingleSubject cs = SingleSubject.create(); - TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber() { + TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber() /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -201,7 +192,7 @@ public void onSuccessSlowPathBackpressured() { final PublishProcessor pp = PublishProcessor.create(); final SingleSubject cs = SingleSubject.create(); - TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber(1) { + TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber(1) /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -228,19 +219,9 @@ public void onSuccessFastPathBackpressuredRace() { final TestSubscriber ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<>(0)); - Runnable r1 = new Runnable() { - @Override - public void run() { - cs.onSuccess(1); - } - }; + Runnable r1 = () -> cs.onSuccess(1); - Runnable r2 = new Runnable() { - @Override - public void run() { - ts.request(2); - } - }; + Runnable r2 = () -> ts.request(2); TestHelper.race(r1, r2); @@ -256,7 +237,7 @@ public void onErrorMainOverflow() { List errors = TestHelper.trackPluginErrors(); try { final AtomicReference> subscriber = new AtomicReference<>(); - TestSubscriber ts = new Flowable() { + TestSubscriber ts = new Flowable() /* NFI */ { @Override protected void subscribeActual(Subscriber s) { s.onSubscribe(new BooleanSubscription()); @@ -303,19 +284,9 @@ public void onNextRequestRace() { pp.onNext(0); - Runnable r1 = new Runnable() { - @Override - public void run() { - pp.onNext(1); - } - }; + Runnable r1 = () -> pp.onNext(1); - Runnable r2 = new Runnable() { - @Override - public void run() { - ts.request(3); - } - }; + Runnable r2 = () -> ts.request(3); TestHelper.race(r1, r2); @@ -329,13 +300,7 @@ public void run() { @Test public void doubleOnSubscribeMain() { TestHelper.checkDoubleOnSubscribeFlowable( - new Function, Publisher>() { - @Override - public Publisher apply(Flowable f) - throws Exception { - return f.mergeWith(Single.just(1)); - } - } + f -> f.mergeWith(Single.just(1)) ); } @@ -354,7 +319,7 @@ public void drainExactRequestCancel() { TestSubscriber ts = pp.mergeWith(cs) .take(2) - .subscribeWith(new TestSubscriber(2) { + .subscribeWith(new TestSubscriber(2) /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -378,7 +343,7 @@ public void drainRequestWhenLimitReached() { final SingleSubject cs = SingleSubject.create(); TestSubscriber ts = pp.mergeWith(cs) - .subscribeWith(new TestSubscriber() { + .subscribeWith(new TestSubscriber() /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -437,12 +402,7 @@ public void cancelMainOnOtherError() { @Test public void undeliverableUponCancel() { - TestHelper.checkUndeliverableUponCancel(new FlowableConverter>() { - @Override - public Flowable apply(Flowable upstream) { - return upstream.mergeWith(Single.just(1).hide()); - } - }); + TestHelper.checkUndeliverableUponCancel((FlowableConverter>) upstream -> upstream.mergeWith(Single.just(1).hide())); } @Test diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableObserveOnTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableObserveOnTest.java index cd01831fa2..28d681fefa 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableObserveOnTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableObserveOnTest.java @@ -99,36 +99,19 @@ public void threadName() throws InterruptedException { final CountDownLatch completedLatch = new CountDownLatch(1); // assert subscribe is on main thread - obs = obs.doOnNext(new Consumer() { - - @Override - public void accept(String s) { - String threadName = Thread.currentThread().getName(); - System.out.println("Source ThreadName: " + threadName + " Expected => " + parentThreadName); - assertEquals(parentThreadName, threadName); - } - - }); + obs = obs.doOnNext(_ -> { + String threadName = Thread.currentThread().getName(); + System.out.println("Source ThreadName: " + threadName + " Expected => " + parentThreadName); + assertEquals(parentThreadName, threadName); + }); // assert observe is on new thread - obs.observeOn(Schedulers.newThread()).doOnNext(new Consumer() { - - @Override - public void accept(String t1) { - String threadName = Thread.currentThread().getName(); - boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler"); - System.out.println("ObserveOn ThreadName: " + threadName + " Correct => " + correctThreadName); - assertTrue(correctThreadName); - } - - }).doAfterTerminate(new Action() { - - @Override - public void run() { - completedLatch.countDown(); - - } - }).subscribe(subscriber); + obs.observeOn(Schedulers.newThread()).doOnNext(_ -> { + String threadName = Thread.currentThread().getName(); + boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler"); + System.out.println("ObserveOn ThreadName: " + threadName + " Correct => " + correctThreadName); + assertTrue(correctThreadName); + }).doAfterTerminate(() -> completedLatch.countDown()).subscribe(subscriber); if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) { fail("timed out waiting"); @@ -214,25 +197,13 @@ public void observeOnWithNewThreadScheduler() { final AtomicInteger count = new AtomicInteger(); final int _multiple = 99; - Flowable.range(1, 100000).map(new Function() { - - @Override - public Integer apply(Integer t1) { - return t1 * _multiple; - } - - }).observeOn(Schedulers.newThread()) - .blockingForEach(new Consumer() { - - @Override - public void accept(Integer t1) { - assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); - // FIXME toBlocking methods run on the current thread - String name = Thread.currentThread().getName(); - assertFalse("Wrong thread name: " + name, name.startsWith("Rx")); - } - - }); + Flowable.range(1, 100000).map(t1 -> t1 * _multiple).observeOn(Schedulers.newThread()) + .blockingForEach(t1 -> { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + // FIXME toBlocking methods run on the current thread + String name = Thread.currentThread().getName(); + assertFalse("Wrong thread name: " + name, name.startsWith("Rx")); + }); } @@ -244,25 +215,13 @@ public void observeOnWithThreadPoolScheduler() { final AtomicInteger count = new AtomicInteger(); final int _multiple = 99; - Flowable.range(1, 100000).map(new Function() { - - @Override - public Integer apply(Integer t1) { - return t1 * _multiple; - } - - }).observeOn(Schedulers.computation()) - .blockingForEach(new Consumer() { - - @Override - public void accept(Integer t1) { - assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); - // FIXME toBlocking methods run on the caller's thread - String name = Thread.currentThread().getName(); - assertFalse("Wrong thread name: " + name, name.startsWith("Rx")); - } - - }); + Flowable.range(1, 100000).map(t1 -> t1 * _multiple).observeOn(Schedulers.computation()) + .blockingForEach(t1 -> { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + // FIXME toBlocking methods run on the caller's thread + String name = Thread.currentThread().getName(); + assertFalse("Wrong thread name: " + name, name.startsWith("Rx")); + }); } /** @@ -279,33 +238,23 @@ public void observeOnOrderingConcurrency() { final AtomicInteger count = new AtomicInteger(); final int _multiple = 99; - Flowable.range(1, 10000).map(new Function() { - - @Override - public Integer apply(Integer t1) { - if (randomIntFrom0to100() > 98) { - try { - Thread.sleep(2); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - return t1 * _multiple; - } - - }).observeOn(Schedulers.computation()) - .blockingForEach(new Consumer() { - - @Override - public void accept(Integer t1) { - assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + Flowable.range(1, 10000).map(t1 -> { + if (randomIntFrom0to100() > 98) { + try { + Thread.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return t1 * _multiple; + }).observeOn(Schedulers.computation()) + .blockingForEach(t1 -> { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); // assertTrue(name.startsWith("RxComputationThreadPool")); - // FIXME toBlocking now runs its methods on the caller thread - String name = Thread.currentThread().getName(); - assertFalse("Wrong thread name: " + name, name.startsWith("Rx")); - } - - }); + // FIXME toBlocking now runs its methods on the caller thread + String name = Thread.currentThread().getName(); + assertFalse("Wrong thread name: " + name, name.startsWith("Rx")); + }); } @Test @@ -315,7 +264,8 @@ public void nonBlockingOuterWhileBlockingOnNext() throws InterruptedException { final CountDownLatch nextLatch = new CountDownLatch(1); final AtomicLong completeTime = new AtomicLong(); // use subscribeOn to make async, observeOn to move - Flowable.range(1, 2).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(new DefaultSubscriber() { + Flowable.range(1, 2).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()) + .subscribe(new DefaultSubscriber() /* NFI */ { @Override public void onComplete() { @@ -405,29 +355,24 @@ public void afterUnsubscribeCalledThenObserverOnNextNeverCalled() { @Test public void backpressureWithTakeAfter() { final AtomicInteger generated = new AtomicInteger(); - Flowable flowable = Flowable.fromIterable(new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { + Flowable flowable = Flowable.fromIterable(() -> new Iterator() /* NFI */ { - @Override - public void remove() { - } + @Override + public void remove() { + } - @Override - public Integer next() { - return generated.getAndIncrement(); - } + @Override + public Integer next() { + return generated.getAndIncrement(); + } - @Override - public boolean hasNext() { - return true; - } - }; - } - }); + @Override + public boolean hasNext() { + return true; + } + }); - TestSubscriber testSubscriber = new TestSubscriber() { + TestSubscriber testSubscriber = new TestSubscriber() /* NFI */ { @Override public void onNext(Integer t) { System.err.println("c t = " + t + " thread " + Thread.currentThread()); @@ -455,29 +400,24 @@ public void onNext(Integer t) { public void backpressureWithTakeAfterAndMultipleBatches() { int numForBatches = Flowable.bufferSize() * 3 + 1; // should be 4 batches == ((3*n)+1) items final AtomicInteger generated = new AtomicInteger(); - Flowable flowable = Flowable.fromIterable(new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { + Flowable flowable = Flowable.fromIterable(() -> new Iterator() /* NFI */ { - @Override - public void remove() { - } + @Override + public void remove() { + } - @Override - public Integer next() { - return generated.getAndIncrement(); - } + @Override + public Integer next() { + return generated.getAndIncrement(); + } - @Override - public boolean hasNext() { - return true; - } - }; - } - }); + @Override + public boolean hasNext() { + return true; + } + }); - TestSubscriber testSubscriber = new TestSubscriber() { + TestSubscriber testSubscriber = new TestSubscriber() /* NFI */ { @Override public void onNext(Integer t) { // System.err.println("c t = " + t + " thread " + Thread.currentThread()); @@ -499,27 +439,22 @@ public void onNext(Integer t) { @Test public void backpressureWithTakeBefore() { final AtomicInteger generated = new AtomicInteger(); - Flowable flowable = Flowable.fromIterable(new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { + Flowable flowable = Flowable.fromIterable(() -> new Iterator() /* NFI */ { - @Override - public void remove() { - } + @Override + public void remove() { + } - @Override - public Integer next() { - return generated.getAndIncrement(); - } + @Override + public Integer next() { + return generated.getAndIncrement(); + } - @Override - public boolean hasNext() { - return true; - } - }; - } - }); + @Override + public boolean hasNext() { + return true; + } + }); TestSubscriber testSubscriber = new TestSubscriber<>(); flowable @@ -535,21 +470,16 @@ public boolean hasNext() { @Test public void queueFullEmitsError() { final CountDownLatch latch = new CountDownLatch(1); - Flowable flowable = Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - for (int i = 0; i < Flowable.bufferSize() + 10; i++) { - subscriber.onNext(i); - } - latch.countDown(); - subscriber.onComplete(); - } + Flowable flowable = Flowable.unsafeCreate(subscriber -> { + subscriber.onSubscribe(new BooleanSubscription()); + for (int i = 0; i < Flowable.bufferSize() + 10; i++) { + subscriber.onNext(i); + } + latch.countDown(); + subscriber.onComplete(); + }); - }); - - TestSubscriberEx testSubscriber = new TestSubscriberEx<>(new DefaultSubscriber() { + TestSubscriberEx testSubscriber = new TestSubscriberEx<>(new DefaultSubscriber() /* NFI */ { @Override public void onComplete() { @@ -604,7 +534,7 @@ public void onErrorCutsAheadOfOnNext() { final PublishProcessor processor = PublishProcessor.create(); final AtomicLong counter = new AtomicLong(); - TestSubscriberEx ts = new TestSubscriberEx<>(new DefaultSubscriber() { + TestSubscriberEx ts = new TestSubscriberEx<>(new DefaultSubscriber() /* NFI */ { @Override public void onComplete() { @@ -654,19 +584,14 @@ public void hotOperatorBackpressure() { TestSubscriberEx ts = new TestSubscriberEx<>(); Flowable.interval(0, 1, TimeUnit.MICROSECONDS) .observeOn(Schedulers.computation()) - .map(new Function() { - - @Override - public String apply(Long t1) { - System.out.println(t1); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } - return t1 + " slow value"; - } - - }).subscribe(ts); + .map(t1 -> { + System.out.println(t1); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + return t1 + " slow value"; + }).subscribe(ts); ts.awaitDone(5, TimeUnit.SECONDS); System.out.println("Errors: " + ts.errors()); @@ -677,38 +602,22 @@ public String apply(Long t1) { @Test public void errorPropagatesWhenNoOutstandingRequests() { Flowable timer = Flowable.interval(0, 1, TimeUnit.MICROSECONDS) - .doOnEach(new Consumer>() { - - @Override - public void accept(Notification n) { + .doOnEach(_ -> { // System.out.println("BEFORE " + n); - } - - }) + }) .observeOn(Schedulers.newThread()) - .doOnEach(new Consumer>() { - - @Override - public void accept(Notification n) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } + .doOnEach(_ -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } // System.out.println("AFTER " + n); - } - - }); + }); TestSubscriberEx ts = new TestSubscriberEx<>(); - Flowable.combineLatest(timer, Flowable. never(), new BiFunction() { - - @Override - public Long apply(Long t1, Integer t2) { - return t1; - } - - }).take(Flowable.bufferSize() * 2).subscribe(ts); + Flowable.combineLatest(timer, Flowable. never(), (t1, _) -> t1) + .take(Flowable.bufferSize() * 2).subscribe(ts); ts.awaitDone(5, TimeUnit.SECONDS); assertEquals(1, ts.errors().size()); @@ -721,7 +630,7 @@ public void requestOverflow() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger count = new AtomicInteger(); Flowable.range(1, 100).observeOn(Schedulers.computation()) - .subscribe(new DefaultSubscriber() { + .subscribe(new DefaultSubscriber() /* NFI */ { boolean first = true; @@ -761,15 +670,9 @@ public void noMoreRequestsAfterUnsubscribe() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final List requests = Collections.synchronizedList(new ArrayList<>()); Flowable.range(1, 1000000) - .doOnRequest(new LongConsumer() { - - @Override - public void accept(long n) { - requests.add(n); - } - }) + .doOnRequest(n -> requests.add(n)) .observeOn(Schedulers.cached()) - .subscribe(new DefaultSubscriber() { + .subscribe(new DefaultSubscriber() /* NFI */ { @Override public void onStart() { @@ -878,12 +781,7 @@ public void fixedReplenishPattern() { final List requests = new ArrayList<>(); Flowable.range(1, 100) - .doOnRequest(new LongConsumer() { - @Override - public void accept(long v) { - requests.add(v); - } - }) + .doOnRequest(v -> requests.add(v)) .observeOn(test, false, 16).subscribe(ts); test.advanceTimeBy(1, TimeUnit.SECONDS); @@ -925,12 +823,7 @@ public void synchronousRebatching() { TestSubscriber ts = new TestSubscriber<>(); Flowable.range(1, 50) - .doOnRequest(new LongConsumer() { - @Override - public void accept(long r) { - requests.add(r); - } - }) + .doOnRequest(r -> requests.add(r)) .rebatchRequests(20) .subscribe(ts); @@ -955,14 +848,11 @@ public void rebatchRequestsArgumentCheck() { public void delayError() { Flowable.range(1, 5).concatWith(Flowable.error(new TestException())) .observeOn(Schedulers.computation(), true) - .doOnNext(new Consumer() { - @Override - public void accept(Integer v) throws Exception { - if (v == 1) { - Thread.sleep(100); - } - } - }) + .doOnNext(v -> { + if (v == 1) { + Thread.sleep(100); + } + }) .test() .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TestException.class, 1, 2, 3, 4, 5); @@ -972,12 +862,7 @@ public void accept(Integer v) throws Exception { public void conditionalConsumer() { Flowable.range(1, 5) .observeOn(Schedulers.single()) - .filter(new Predicate() { - @Override - public boolean test(Integer v) throws Exception { - return v % 2 == 0; - } - }) + .filter(v -> v % 2 == 0) .test() .awaitDone(5, TimeUnit.SECONDS) .assertResult(2, 4); @@ -1009,12 +894,7 @@ public void conditionalConsumerFused() { Flowable.range(1, 5) .observeOn(Schedulers.single()) - .filter(new Predicate() { - @Override - public boolean test(Integer v) throws Exception { - return v % 2 == 0; - } - }) + .filter(v -> v % 2 == 0) .subscribe(ts); ts @@ -1030,12 +910,7 @@ public void conditionalConsumerFusedReject() { Flowable.range(1, 5) .observeOn(Schedulers.single()) - .filter(new Predicate() { - @Override - public boolean test(Integer v) throws Exception { - return v % 2 == 0; - } - }) + .filter(v -> v % 2 == 0) .subscribe(ts); ts @@ -1076,12 +951,7 @@ public void conditionalConsumerFusedAsync() { up .observeOn(Schedulers.single()) - .filter(new Predicate() { - @Override - public boolean test(Integer v) throws Exception { - return v % 2 == 0; - } - }) + .filter(v -> v % 2 == 0) .subscribe(ts); up.onNext(1); @@ -1104,12 +974,7 @@ public void conditionalConsumerHidden() { Flowable.range(1, 5).hide() .observeOn(Schedulers.single()) - .filter(new Predicate() { - @Override - public boolean test(Integer v) throws Exception { - return v % 2 == 0; - } - }) + .filter(v -> v % 2 == 0) .subscribe(ts); ts @@ -1126,12 +991,7 @@ public void conditionalConsumerBarrier() { Flowable.range(1, 5) .map(Functions.identity()) .observeOn(Schedulers.single()) - .filter(new Predicate() { - @Override - public boolean test(Integer v) throws Exception { - return v % 2 == 0; - } - }) + .filter(v -> v % 2 == 0) .subscribe(ts); ts @@ -1148,22 +1008,12 @@ public void dispose() { @Test public void doubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) throws Exception { - return f.observeOn(new TestScheduler()); - } - }); + TestHelper.checkDoubleOnSubscribeFlowable((Function, Flowable>) f -> f.observeOn(new TestScheduler())); } @Test public void doubleOnSubscribeConditional() { - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) throws Exception { - return f.observeOn(new TestScheduler()).compose(TestHelper.conditional()); - } - }); + TestHelper.checkDoubleOnSubscribeFlowable((Function, Flowable>) f -> f.observeOn(new TestScheduler()).compose(TestHelper.conditional())); } @Test @@ -1171,7 +1021,7 @@ public void badSource() { List errors = TestHelper.trackPluginErrors(); try { TestScheduler scheduler = new TestScheduler(); - TestSubscriber ts = new Flowable() { + TestSubscriber ts = new Flowable() /* NFI */ { @Override protected void subscribeActual(Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); @@ -1315,7 +1165,7 @@ public void outputFusedCancelReentrant() throws Exception { final CountDownLatch cdl = new CountDownLatch(1); up.observeOn(Schedulers.single()) - .subscribe(new FlowableSubscriber() { + .subscribe(new FlowableSubscriber() /* NFI */ { Subscription upstream; int count; @Override @@ -1351,7 +1201,7 @@ public void onComplete() { @Test public void nonFusedPollThrows() { - new Flowable() { + new Flowable() /* NFI */ { @Override protected void subscribeActual(Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); @@ -1361,7 +1211,7 @@ protected void subscribeActual(Subscriber subscriber) { oo.sourceMode = QueueFuseable.SYNC; oo.requested.lazySet(1); - oo.queue = new SimpleQueue() { + oo.queue = new SimpleQueue() /* NFI */ { @Override public boolean offer(Integer value) { @@ -1402,7 +1252,7 @@ public void clear() { @Test public void conditionalNonFusedPollThrows() { - new Flowable() { + new Flowable() /* NFI */ { @Override protected void subscribeActual(Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); @@ -1412,7 +1262,7 @@ protected void subscribeActual(Subscriber subscriber) { oo.sourceMode = QueueFuseable.SYNC; oo.requested.lazySet(1); - oo.queue = new SimpleQueue() { + oo.queue = new SimpleQueue() /* NFI */ { @Override public boolean offer(Integer value) { @@ -1454,7 +1304,7 @@ public void clear() { @Test public void asycFusedPollThrows() { - new Flowable() { + new Flowable() /* NFI */ { @Override protected void subscribeActual(Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); @@ -1464,7 +1314,7 @@ protected void subscribeActual(Subscriber subscriber) { oo.sourceMode = QueueFuseable.ASYNC; oo.requested.lazySet(1); - oo.queue = new SimpleQueue() { + oo.queue = new SimpleQueue() /* NFI */ { @Override public boolean offer(Integer value) { @@ -1505,7 +1355,7 @@ public void clear() { @Test public void conditionalAsyncFusedPollThrows() { - new Flowable() { + new Flowable() /* NFI */ { @Override protected void subscribeActual(Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); @@ -1515,7 +1365,7 @@ protected void subscribeActual(Subscriber subscriber) { oo.sourceMode = QueueFuseable.ASYNC; oo.requested.lazySet(1); - oo.queue = new SimpleQueue() { + oo.queue = new SimpleQueue() /* NFI */ { @Override public boolean offer(Integer value) { @@ -1567,12 +1417,7 @@ public void trampolineScheduler() { public void conditionalNormal() { Flowable.range(1, 1000).hide() .observeOn(Schedulers.single()) - .filter(new Predicate() { - @Override - public boolean test(Integer v) throws Exception { - return v % 2 == 0; - } - }) + .filter(v -> v % 2 == 0) .take(250) .to(TestHelper.testConsumer()) .awaitDone(5, TimeUnit.SECONDS) @@ -1584,7 +1429,7 @@ public boolean test(Integer v) throws Exception { @Test public void syncFusedCancelAfterRequest() { - final TestSubscriber ts = new TestSubscriber(2L) { + final TestSubscriber ts = new TestSubscriber(2L) /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -1619,7 +1464,7 @@ public void syncFusedCancelAfterRequest2() { @Test public void syncFusedCancelAfterRequestConditional() { - final TestSubscriber ts = new TestSubscriber(2L) { + final TestSubscriber ts = new TestSubscriber(2L) /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); @@ -1759,19 +1604,9 @@ public void backFusedCancelConditional() { .filter(Functions.alwaysTrue()) .subscribe(ts); - Runnable r1 = new Runnable() { - @Override - public void run() { - ts.cancel(); - } - }; + Runnable r1 = () -> ts.cancel(); - Runnable r2 = new Runnable() { - @Override - public void run() { - scheduler.triggerActions(); - } - }; + Runnable r2 = () -> scheduler.triggerActions(); TestHelper.race(r1, r2); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureBufferStrategyTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureBufferStrategyTest.java index 6372b8cbea..375a1a1541 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureBufferStrategyTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureBufferStrategyTest.java @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import static java.util.concurrent.Flow.*; import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.*; @@ -40,12 +39,7 @@ public class FlowableOnBackpressureBufferStrategyTest extends RxJavaTest { public void backpressureWithBufferDropOldest() throws InterruptedException { int bufferSize = 3; final AtomicInteger droppedCount = new AtomicInteger(0); - Action incrementOnDrop = new Action() { - @Override - public void run() throws Exception { - droppedCount.incrementAndGet(); - } - }; + Action incrementOnDrop = () -> droppedCount.incrementAndGet(); TestSubscriber ts = createTestSubscriber(); Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_OLDEST)) .subscribe(ts); @@ -61,7 +55,7 @@ public void run() throws Exception { } private TestSubscriber createTestSubscriber() { - return new TestSubscriber<>(new DefaultSubscriber() { + return new TestSubscriber<>(new DefaultSubscriber() /* NFI */ { @Override protected void onStart() { @@ -86,12 +80,7 @@ public void onNext(Long t) { public void backpressureWithBufferDropLatest() throws InterruptedException { int bufferSize = 3; final AtomicInteger droppedCount = new AtomicInteger(0); - Action incrementOnDrop = new Action() { - @Override - public void run() throws Exception { - droppedCount.incrementAndGet(); - } - }; + Action incrementOnDrop = () -> droppedCount.incrementAndGet(); TestSubscriber ts = createTestSubscriber(); Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_LATEST)) .subscribe(ts); @@ -106,20 +95,17 @@ public void run() throws Exception { assertEquals(droppedCount.get(), 500 - bufferSize); } - private static final Flowable send500ValuesAndComplete = Flowable.unsafeCreate(new Publisher() { - @Override - public void subscribe(Subscriber s) { - BooleanSubscription bs = new BooleanSubscription(); - s.onSubscribe(bs); - long i = 0; - while (!bs.isCancelled() && i < 500) { - s.onNext(i++); - } - if (!bs.isCancelled()) { - s.onComplete(); - } - } - }); + private static final Flowable send500ValuesAndComplete = Flowable.unsafeCreate(s -> { + BooleanSubscription bs = new BooleanSubscription(); + s.onSubscribe(bs); + long i = 0; + while (!bs.isCancelled() && i < 500) { + s.onNext(i++); + } + if (!bs.isCancelled()) { + s.onComplete(); + } + }); @Test(expected = IllegalArgumentException.class) public void backpressureBufferNegativeCapacity() throws InterruptedException { @@ -156,33 +142,20 @@ public void overflowError() { @Test public void badSource() { - TestHelper.checkBadSourceFlowable(new Function, Object>() { - @Override - public Object apply(Flowable f) throws Exception { - return f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR); - } - }, false, 1, 1, 1); + TestHelper.checkBadSourceFlowable(f -> f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR), false, 1, 1, 1); } @Test public void doubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) throws Exception { - return f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR); - } - }); + TestHelper.checkDoubleOnSubscribeFlowable((Function, Flowable>) f -> f.onBackpressureBuffer(8, Functions.EMPTY_ACTION, BackpressureOverflowStrategy.ERROR)); } @Test public void overflowCrashes() { Flowable.range(1, 20) - .onBackpressureBuffer(8, new Action() { - @Override - public void run() throws Exception { - throw new TestException(); - } - }, BackpressureOverflowStrategy.DROP_OLDEST) + .onBackpressureBuffer(8, () -> { + throw new TestException(); + }, BackpressureOverflowStrategy.DROP_OLDEST) .test(0L) .assertFailure(TestException.class); } diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureBufferTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureBufferTest.java index 657ad75133..4376496fb4 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureBufferTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureBufferTest.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; -import static java.util.concurrent.Flow.*; import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.*; @@ -55,7 +54,7 @@ public void noBackpressureSupport() { public void fixBackpressureWithBuffer() throws InterruptedException { final CountDownLatch l1 = new CountDownLatch(100); final CountDownLatch l2 = new CountDownLatch(150); - TestSubscriber ts = new TestSubscriber<>(new DefaultSubscriber() { + TestSubscriber ts = new TestSubscriber<>(new DefaultSubscriber() /* NFI */ { @Override protected void onStart() { @@ -112,7 +111,7 @@ public void fixBackpressureBufferZeroCapacity() throws InterruptedException { public void fixBackpressureBoundedBuffer() throws InterruptedException { final CountDownLatch l1 = new CountDownLatch(100); final CountDownLatch backpressureCallback = new CountDownLatch(1); - TestSubscriber ts = new TestSubscriber<>(new DefaultSubscriber() { + TestSubscriber ts = new TestSubscriber<>(new DefaultSubscriber() /* NFI */ { @Override protected void onStart() { @@ -135,12 +134,7 @@ public void onNext(Long t) { ts.request(100); infinite.subscribeOn(Schedulers.computation()) - .onBackpressureBuffer(500, new Action() { - @Override - public void run() { - backpressureCallback.countDown(); - } - }) + .onBackpressureBuffer(500, () -> backpressureCallback.countDown()) /*.take(1000)*/ .subscribe(ts); l1.await(); @@ -156,27 +150,18 @@ public void run() { assertEquals((long)ts.values().get(size - 1), size - 1); } - static final Flowable infinite = Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(Subscriber s) { - BooleanSubscription bs = new BooleanSubscription(); - s.onSubscribe(bs); - long i = 0; - while (!bs.isCancelled()) { - s.onNext(i++); - } - } - - }); - - private static final Action THROWS_NON_FATAL = new Action() { + static final Flowable infinite = Flowable.unsafeCreate(s -> { + BooleanSubscription bs = new BooleanSubscription(); + s.onSubscribe(bs); + long i = 0; + while (!bs.isCancelled()) { + s.onNext(i++); + } + }); - @Override - public void run() { - throw new RuntimeException(); - } - }; + private static final Action THROWS_NON_FATAL = () -> { + throw new RuntimeException(); + }; @Test public void nonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() { @@ -184,12 +169,7 @@ public void nonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() { TestSubscriber ts = TestSubscriber.create(0); infinite .subscribeOn(Schedulers.computation()) - .doOnError(new Consumer() { - @Override - public void accept(Throwable t) { - errorOccurred.set(true); - } - }) + .doOnError(_ -> errorOccurred.set(true)) .onBackpressureBuffer(1, THROWS_NON_FATAL) .subscribe(ts); ts.awaitDone(5, TimeUnit.SECONDS); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureDropTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureDropTest.java index 96dc48fe62..d96a9ffb14 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureDropTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureDropTest.java @@ -53,7 +53,7 @@ public void withObserveOn() throws InterruptedException { public void fixBackpressureWithBuffer() throws InterruptedException { final CountDownLatch l1 = new CountDownLatch(100); final CountDownLatch l2 = new CountDownLatch(150); - TestSubscriber ts = new TestSubscriber<>(new DefaultSubscriber() { + TestSubscriber ts = new TestSubscriber<>(new DefaultSubscriber() /* NFI */ { @Override protected void onStart() { @@ -95,7 +95,7 @@ public void onNext(Long t) { public void requestOverflow() throws InterruptedException { final AtomicInteger count = new AtomicInteger(); int n = 10; - range(n).onBackpressureDrop().subscribe(new DefaultSubscriber() { + range(n).onBackpressureDrop().subscribe(new DefaultSubscriber() /* NFI */ { @Override public void onStart() { @@ -120,45 +120,32 @@ public void onNext(Long t) { assertEquals(n, count.get()); } - static final Flowable infinite = Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(Subscriber s) { - BooleanSubscription bs = new BooleanSubscription(); - s.onSubscribe(bs); - long i = 0; - while (!bs.isCancelled()) { - s.onNext(i++); - } - } - - }); + static final Flowable infinite = Flowable.unsafeCreate(s -> { + BooleanSubscription bs = new BooleanSubscription(); + s.onSubscribe(bs); + long i = 0; + while (!bs.isCancelled()) { + s.onNext(i++); + } + }); private static Flowable range(final long n) { - return Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(Subscriber s) { - BooleanSubscription bs = new BooleanSubscription(); - s.onSubscribe(bs); - for (long i = 0; i < n; i++) { - if (bs.isCancelled()) { - break; - } - s.onNext(i); - } - s.onComplete(); - } - - }); + return Flowable.unsafeCreate(s -> { + BooleanSubscription bs = new BooleanSubscription(); + s.onSubscribe(bs); + for (long i = 0; i < n; i++) { + if (bs.isCancelled()) { + break; + } + s.onNext(i); + } + s.onComplete(); + }); } - private static final Consumer THROW_NON_FATAL = new Consumer() { - @Override - public void accept(Long n) { - throw new RuntimeException(); - } - }; + private static final Consumer THROW_NON_FATAL = _ -> { + throw new RuntimeException(); + }; @Test public void nonFatalExceptionFromOverflowActionIsNotReportedFromUpstreamOperator() { @@ -169,12 +156,7 @@ public void nonFatalExceptionFromOverflowActionIsNotReportedFromUpstreamOperator range(2) // if haven't caught exception in onBackpressureDrop operator then would incorrectly // be picked up by this call to doOnError - .doOnError(new Consumer() { - @Override - public void accept(Throwable t) { - errorOccurred.set(true); - } - }) + .doOnError(_ -> errorOccurred.set(true)) .onBackpressureDrop(THROW_NON_FATAL) .subscribe(ts); @@ -183,22 +165,12 @@ public void accept(Throwable t) { @Test public void badSource() { - TestHelper.checkBadSourceFlowable(new Function, Object>() { - @Override - public Object apply(Flowable f) throws Exception { - return f.onBackpressureDrop(); - } - }, false, 1, 1, 1); + TestHelper.checkBadSourceFlowable((Function, Object>) Flowable::onBackpressureDrop, false, 1, 1, 1); } @Test public void doubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { - @Override - public Publisher apply(Flowable f) throws Exception { - return f.onBackpressureDrop(); - } - }); + TestHelper.checkDoubleOnSubscribeFlowable((Function, Publisher>) Flowable::onBackpressureDrop); } @Test diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureErrorTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureErrorTest.java index c8300c52f5..17fbf28599 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureErrorTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureErrorTest.java @@ -16,11 +16,9 @@ import static org.junit.Assert.*; import org.junit.Test; -import static java.util.concurrent.Flow.*; import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.MissingBackpressureException; -import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.subjects.PublishSubject; import io.reactivex.rxjava4.subscribers.TestSubscriber; import io.reactivex.rxjava4.testsupport.TestHelper; @@ -39,22 +37,12 @@ public void badRequest() { @Test public void doubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { - @Override - public Publisher apply(Flowable f) throws Exception { - return new FlowableOnBackpressureError<>(f); - } - }); + TestHelper.checkDoubleOnSubscribeFlowable(f -> new FlowableOnBackpressureError<>(f)); } @Test public void badSource() { - TestHelper.checkBadSourceFlowable(new Function, Object>() { - @Override - public Object apply(Flowable f) throws Exception { - return new FlowableOnBackpressureError<>(f); - } - }, false, 1, 1, 1); + TestHelper.checkBadSourceFlowable(f -> new FlowableOnBackpressureError<>(f), false, 1, 1, 1); } @Test diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureLatestTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureLatestTest.java index c295d33848..f93175682c 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureLatestTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureLatestTest.java @@ -18,11 +18,9 @@ import org.junit.*; import org.mockito.InOrder; -import static java.util.concurrent.Flow.*; import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.exceptions.TestException; -import io.reactivex.rxjava4.functions.Function; import io.reactivex.rxjava4.processors.PublishProcessor; import io.reactivex.rxjava4.schedulers.Schedulers; import io.reactivex.rxjava4.subscribers.TestSubscriber; @@ -171,7 +169,7 @@ public void synchronousDrop() { @Test public void asynchronousDrop() { - TestSubscriberEx ts = new TestSubscriberEx(1L) { + TestSubscriberEx ts = new TestSubscriberEx(1L) /* NFI */ { final Random rnd = new Random(); @Override public void onNext(Integer t) { @@ -208,12 +206,7 @@ public void onNext(Integer t) { @Test public void doubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { - @Override - public Publisher apply(Flowable f) throws Exception { - return f.onBackpressureLatest(); - } - }); + TestHelper.checkDoubleOnSubscribeFlowable(Flowable::onBackpressureLatest); } @Test diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureReduceTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureReduceTest.java index 4e54beaf3e..682d2d0974 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureReduceTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureReduceTest.java @@ -135,7 +135,7 @@ public void reduceBackpressuredSync() { } private TestSubscriberEx createDelayedSubscriber() { - return new TestSubscriberEx(1L) { + return new TestSubscriberEx(1L) /* NFI */ { final Random rnd = new Random(); @Override diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureReduceWithTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureReduceWithTest.java index 63f347ce82..f8ffd4ac75 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureReduceWithTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnBackpressureReduceWithTest.java @@ -175,7 +175,7 @@ public void synchronousDrop() { } private TestSubscriberEx createDelayedSubscriber() { - return new TestSubscriberEx(1L) { + return new TestSubscriberEx(1L) /* NFI */ { final Random rnd = new Random(); @Override diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorResumeNextViaFlowableTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorResumeNextViaFlowableTest.java index 1a939211a6..fc00c0fc08 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorResumeNextViaFlowableTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorResumeNextViaFlowableTest.java @@ -71,16 +71,13 @@ public void mapResumeAsyncNext() { // Introduce map function that fails intermittently (Map does not prevent this when the observer is a // rx.operator incl onErrorResumeNextViaObservable) - w = w.map(new Function() { - @Override - public String apply(String s) { - if ("fail".equals(s)) { - throw new RuntimeException("Forced Failure"); - } - System.out.println("BadMapper:" + s); - return s; - } - }); + w = w.map(s -> { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + System.out.println("BadMapper:" + s); + return s; + }); Flowable flowable = w.onErrorResumeWith(resume); @@ -118,28 +115,23 @@ static final class TestObservable implements Publisher { public void subscribe(final Subscriber subscriber) { System.out.println("TestObservable subscribed to ..."); subscriber.onSubscribe(upstream); - t = new Thread(new Runnable() { - - @Override - public void run() { - try { - System.out.println("running TestObservable thread"); - for (String s : values) { - if ("fail".equals(s)) { - throw new RuntimeException("Forced Failure"); - } - System.out.println("TestObservable onNext: " + s); - subscriber.onNext(s); - } - System.out.println("TestObservable onComplete"); - subscriber.onComplete(); - } catch (Throwable e) { - System.out.println("TestObservable onError: " + e); - subscriber.onError(e); - } - } - - }); + t = new Thread(() -> { + try { + System.out.println("running TestObservable thread"); + for (String s : values) { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + System.out.println("TestObservable onNext: " + s); + subscriber.onNext(s); + } + System.out.println("TestObservable onComplete"); + subscriber.onComplete(); + } catch (Throwable e) { + System.out.println("TestObservable onError: " + e); + subscriber.onError(e); + } + }); System.out.println("starting TestObservable thread"); t.start(); System.out.println("done starting TestObservable thread"); @@ -152,7 +144,7 @@ public void backpressure() { Flowable.range(0, 100000) .onErrorResumeWith(Flowable.just(1)) .observeOn(Schedulers.computation()) - .map(new Function() { + .map(new Function() /* NFI */ { int c; @Override diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorResumeNextViaFunctionTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorResumeNextViaFunctionTest.java index e63e2dd416..255c8185b3 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorResumeNextViaFunctionTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorResumeNextViaFunctionTest.java @@ -40,27 +40,18 @@ public class FlowableOnErrorResumeNextViaFunctionTest extends RxJavaTest { @Test public void resumeNextWithSynchronousExecution() { final AtomicReference receivedException = new AtomicReference<>(); - Flowable w = Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - subscriber.onNext("one"); - subscriber.onError(new Throwable("injected failure")); - subscriber.onNext("two"); - subscriber.onNext("three"); - } - }); - - Function> resume = new Function>() { - - @Override - public Flowable apply(Throwable t1) { - receivedException.set(t1); - return Flowable.just("twoResume", "threeResume"); - } - - }; + Flowable w = Flowable.unsafeCreate(subscriber -> { + subscriber.onSubscribe(new BooleanSubscription()); + subscriber.onNext("one"); + subscriber.onError(new Throwable("injected failure")); + subscriber.onNext("two"); + subscriber.onNext("three"); + }); + + Function> resume = t1 -> { + receivedException.set(t1); + return Flowable.just("twoResume", "threeResume"); + }; Flowable flowable = w.onErrorResumeNext(resume); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -82,15 +73,10 @@ public void resumeNextWithAsyncExecution() { final AtomicReference receivedException = new AtomicReference<>(); Subscription s = mock(Subscription.class); TestFlowable w = new TestFlowable(s, "one"); - Function> resume = new Function>() { - - @Override - public Flowable apply(Throwable t1) { - receivedException.set(t1); - return Flowable.just("twoResume", "threeResume"); - } - - }; + Function> resume = t1 -> { + receivedException.set(t1); + return Flowable.just("twoResume", "threeResume"); + }; Flowable flowable = Flowable.unsafeCreate(w).onErrorResumeNext(resume); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -120,14 +106,9 @@ public Flowable apply(Throwable t1) { public void functionThrowsError() { Subscription s = mock(Subscription.class); TestFlowable w = new TestFlowable(s, "one"); - Function> resume = new Function>() { - - @Override - public Flowable apply(Throwable t1) { - throw new RuntimeException("exception from function"); - } - - }; + Function> resume = _ -> { + throw new RuntimeException("exception from function"); + }; Flowable flowable = Flowable.unsafeCreate(w).onErrorResumeNext(resume); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -154,25 +135,15 @@ public void mapResumeAsyncNext() { // Introduce map function that fails intermittently (Map does not prevent this when the observer is a // rx.operator incl onErrorResumeNextViaFlowable) - w = w.map(new Function() { - @Override - public String apply(String s) { - if ("fail".equals(s)) { - throw new RuntimeException("Forced Failure"); - } - System.out.println("BadMapper:" + s); - return s; - } - }); - - Flowable flowable = w.onErrorResumeNext(new Function>() { - - @Override - public Flowable apply(Throwable t1) { - return Flowable.just("twoResume", "threeResume").subscribeOn(Schedulers.computation()); - } - - }); + w = w.map(s -> { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + System.out.println("BadMapper:" + s); + return s; + }); + + Flowable flowable = w.onErrorResumeNext(_ -> Flowable.just("twoResume", "threeResume").subscribeOn(Schedulers.computation())); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -202,23 +173,18 @@ private static class TestFlowable implements Publisher { public void subscribe(final Subscriber subscriber) { System.out.println("TestFlowable subscribed to ..."); subscriber.onSubscribe(new BooleanSubscription()); - t = new Thread(new Runnable() { - - @Override - public void run() { - try { - System.out.println("running TestFlowable thread"); - for (String s : values) { - System.out.println("TestFlowable onNext: " + s); - subscriber.onNext(s); - } - throw new RuntimeException("Forced Failure"); - } catch (Throwable e) { - subscriber.onError(e); - } - } - - }); + t = new Thread(() -> { + try { + System.out.println("running TestFlowable thread"); + for (String s : values) { + System.out.println("TestFlowable onNext: " + s); + subscriber.onNext(s); + } + throw new RuntimeException("Forced Failure"); + } catch (Throwable e) { + subscriber.onError(e); + } + }); System.out.println("starting TestFlowable thread"); t.start(); System.out.println("done starting TestFlowable thread"); @@ -230,16 +196,9 @@ public void run() { public void backpressure() { TestSubscriber ts = new TestSubscriber<>(); Flowable.range(0, 100000) - .onErrorResumeNext(new Function>() { - - @Override - public Flowable apply(Throwable t1) { - return Flowable.just(1); - } - - }) + .onErrorResumeNext((Function>) _ -> Flowable.just(1)) .observeOn(Schedulers.computation()) - .map(new Function() { + .map(new Function() /* NFI */ { int c; @Override @@ -267,12 +226,7 @@ public void normalBackpressure() { PublishProcessor pp = PublishProcessor.create(); - pp.onErrorResumeNext(new Function>() { - @Override - public Flowable apply(Throwable v) { - return Flowable.range(3, 2); - } - }).subscribe(ts); + pp.onErrorResumeNext((Function>) _ -> Flowable.range(3, 2)).subscribe(ts); ts.request(2); @@ -293,13 +247,8 @@ public Flowable apply(Throwable v) { @Test public void badOtherSource() { - TestHelper.checkBadSourceFlowable(new Function, Object>() { - @Override - public Object apply(Flowable f) throws Exception { - return Flowable.error(new IOException()) - .onErrorResumeNext(Functions.justFunction(f)); - } - }, false, 1, 1, 1); + TestHelper.checkBadSourceFlowable(f -> Flowable.error(new IOException()) + .onErrorResumeNext(Functions.justFunction(f)), false, 1, 1, 1); } } diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorReturnTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorReturnTest.java index c98bbe0b17..e57ba46fe0 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorReturnTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableOnErrorReturnTest.java @@ -41,15 +41,10 @@ public void resumeNext() { Flowable w = Flowable.unsafeCreate(f); final AtomicReference capturedException = new AtomicReference<>(); - Flowable flowable = w.onErrorReturn(new Function() { - - @Override - public String apply(Throwable e) { - capturedException.set(e); - return "failure"; - } - - }); + Flowable flowable = w.onErrorReturn(e -> { + capturedException.set(e); + return "failure"; + }); Subscriber subscriber = TestHelper.mockSubscriber(); flowable.subscribe(subscriber); @@ -76,15 +71,10 @@ public void functionThrowsError() { Flowable w = Flowable.unsafeCreate(f); final AtomicReference capturedException = new AtomicReference<>(); - Flowable flowable = w.onErrorReturn(new Function() { - - @Override - public String apply(Throwable e) { - capturedException.set(e); - throw new RuntimeException("exception from function"); - } - - }); + Flowable flowable = w.onErrorReturn(e -> { + capturedException.set(e); + throw new RuntimeException("exception from function"); + }); Subscriber subscriber = TestHelper.mockSubscriber(); flowable.subscribe(subscriber); @@ -111,25 +101,15 @@ public void mapResumeAsyncNext() { // Introduce map function that fails intermittently (Map does not prevent this when the observer is a // rx.operator incl onErrorResumeNextViaFlowable) - w = w.map(new Function() { - @Override - public String apply(String s) { - if ("fail".equals(s)) { - throw new RuntimeException("Forced Failure"); - } - System.out.println("BadMapper:" + s); - return s; - } - }); - - Flowable flowable = w.onErrorReturn(new Function() { - - @Override - public String apply(Throwable t1) { - return "resume"; - } + w = w.map(s -> { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + System.out.println("BadMapper:" + s); + return s; + }); - }); + Flowable flowable = w.onErrorReturn(_ -> "resume"); Subscriber subscriber = TestHelper.mockSubscriber(); TestSubscriber ts = new TestSubscriber<>(subscriber, Long.MAX_VALUE); @@ -148,16 +128,9 @@ public String apply(Throwable t1) { public void backpressure() { TestSubscriber ts = new TestSubscriber<>(); Flowable.range(0, 100000) - .onErrorReturn(new Function() { - - @Override - public Integer apply(Throwable t1) { - return 1; - } - - }) + .onErrorReturn(_ -> 1) .observeOn(Schedulers.computation()) - .map(new Function() { + .map(new Function() /* NFI */ { int c; @Override @@ -192,23 +165,18 @@ private static class TestFlowable implements Publisher { public void subscribe(final Subscriber subscriber) { subscriber.onSubscribe(new BooleanSubscription()); System.out.println("TestFlowable subscribed to ..."); - t = new Thread(new Runnable() { - - @Override - public void run() { - try { - System.out.println("running TestFlowable thread"); - for (String s : values) { - System.out.println("TestFlowable onNext: " + s); - subscriber.onNext(s); - } - throw new RuntimeException("Forced Failure"); - } catch (Throwable e) { - subscriber.onError(e); - } - } - - }); + t = new Thread(() -> { + try { + System.out.println("running TestFlowable thread"); + for (String s : values) { + System.out.println("TestFlowable onNext: " + s); + subscriber.onNext(s); + } + throw new RuntimeException("Forced Failure"); + } catch (Throwable e) { + subscriber.onError(e); + } + }); System.out.println("starting TestFlowable thread"); t.start(); System.out.println("done starting TestFlowable thread"); @@ -221,12 +189,7 @@ public void normalBackpressure() { PublishProcessor pp = PublishProcessor.create(); - pp.onErrorReturn(new Function() { - @Override - public Integer apply(Throwable e) { - return 3; - } - }).subscribe(ts); + pp.onErrorReturn(_ -> 3).subscribe(ts); ts.request(2); @@ -260,17 +223,12 @@ public void dispose() { @Test public void doubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { - @Override - public Flowable apply(Flowable f) throws Exception { - return f.onErrorReturnItem(1); - } - }); + TestHelper.checkDoubleOnSubscribeFlowable(f -> f.onErrorReturnItem(1)); } @Test public void doubleOnError() { - new Flowable() { + new Flowable() /* NFI */ { @Override protected void subscribeActual(Subscriber s) { s.onSubscribe(new BooleanSubscription()); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublishFunctionTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublishFunctionTest.java index 35fa37377a..959d31ad89 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublishFunctionTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublishFunctionTest.java @@ -124,7 +124,7 @@ public void oneStartOnly() { final AtomicInteger startCount = new AtomicInteger(); - TestSubscriber ts = new TestSubscriber() { + TestSubscriber ts = new TestSubscriber() /* NFI */ { @Override public void onStart() { startCount.incrementAndGet(); @@ -250,7 +250,7 @@ public void badSource() { @Test public void frontOverflow() { - new Flowable() { + new Flowable() /* NFI */ { @Override protected void subscribeActual(Subscriber s) { s.onSubscribe(new BooleanSubscription()); @@ -312,7 +312,7 @@ public void oneByOne() { public void completeCancelRaceNoRequest() { final PublishProcessor pp = PublishProcessor.create(); - final TestSubscriber ts = new TestSubscriber(1L) { + final TestSubscriber ts = new TestSubscriber(1L) /* NFI */ { @Override public void onNext(Integer t) { super.onNext(t); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublishMulticastTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublishMulticastTest.java index 0bb5666b4f..f2bcc9e613 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublishMulticastTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublishMulticastTest.java @@ -53,7 +53,7 @@ public void asyncFusedInput() { public void fusionRejectedInput() { MulticastProcessor mp = new MulticastProcessor<>(128, true); - mp.onSubscribe(new QueueSubscription() { + mp.onSubscribe(new QueueSubscription() /* NFI */ { @Override public int requestFusion(int mode) { @@ -113,19 +113,9 @@ public void addRemoveRace() { assertTrue(mp.add(ms1)); - Runnable r1 = new Runnable() { - @Override - public void run() { - mp.remove(ms1); - } - }; - - Runnable r2 = new Runnable() { - @Override - public void run() { - mp.add(ms2); - } - }; + Runnable r1 = () -> mp.remove(ms1); + + Runnable r2 = () -> mp.add(ms2); TestHelper.race(r1, r2); }