If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.
Some ReactiveX Observable operators have variants that take a Scheduler as a parameter. These instruct the operator to do some or all of its work on a particular Scheduler.
You obtain a Scheduler from the factory methods described in the Schedulers class. The following table shows the varieties of Scheduler that are available to you by means of these methods in RxGroovy:
Scheduler | purpose |
---|---|
Schedulers.computation( ) | meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors |
Schedulers.from(executor) | uses the specified Executor as a Scheduler |
Schedulers.immediate( ) | schedules work to begin immediately in the current thread |
Schedulers.io( ) | meant for I/O-bound work such as asynchronous performance of blocking I/O; this scheduler is backed by a thread pool that grows as needed; for ordinary computational work, use Schedulers.computation( ) instead; by default, Schedulers.io( ) is a CachedThreadScheduler, which behaves like a new-thread scheduler that caches and reuses its threads |
Schedulers.newThread( ) | creates a new thread for each unit of work |
Schedulers.trampoline( ) | queues work to begin on the current thread after any already-queued work |
Some Observable operators in RxGroovy have alternate forms that allow you to set which Scheduler the operator will use for (at least some part of) its operation. Others do not operate on any particular Scheduler, or operate on a particular default Scheduler. Those that have a particular default Scheduler include:
The TestScheduler allows you to exercise fine-tuned manual control over how the Scheduler’s clock behaves. This can be useful for testing interactions that depend on precise arrangements of actions in time. This Scheduler has three additional methods:
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
You obtain a Scheduler from the factory methods described in the Schedulers class. The following table shows the varieties of Scheduler that are available to you by means of these methods in RxJava:
Scheduler | purpose |
---|---|
Schedulers.computation( ) | meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors |
Schedulers.from(executor) | uses the specified Executor as a Scheduler |
Schedulers.immediate( ) | schedules work to begin immediately in the current thread |
Schedulers.io( ) | meant for I/O-bound work such as asynchronous performance of blocking I/O; this scheduler is backed by a thread pool that grows as needed; for ordinary computational work, use Schedulers.computation( ) instead; by default, Schedulers.io( ) is a CachedThreadScheduler, which behaves like a new-thread scheduler that caches and reuses its threads |
Schedulers.newThread( ) | creates a new thread for each unit of work |
Schedulers.trampoline( ) | queues work to begin on the current thread after any already-queued work |
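The immediate and trampoline rows in the table above differ only in what happens when a running task schedules another task on the same thread. The following is a minimal sketch of that difference using only the JDK. It is not RxJava's implementation: the class `TrampolineSketch` and its methods are hypothetical names invented for illustration, and the model is single-threaded, whereas a real trampoline keeps one queue per thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** JDK-only model of two scheduling policies; hypothetical, not RxJava code. */
class TrampolineSketch {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private boolean draining = false;

    /** "immediate" policy: run the task right now, even in the middle of another task. */
    static void immediate(Runnable task) {
        task.run();
    }

    /** "trampoline" policy: enqueue the task; only the outermost call drains the queue, in FIFO order. */
    void trampoline(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // a task is already running; this one waits its turn
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    /** A task that schedules a nested task, under the immediate policy. */
    static List<String> runImmediate() {
        List<String> log = new ArrayList<>();
        immediate(() -> {
            log.add("outer start");
            immediate(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    /** The same task, under the trampoline policy. */
    static List<String> runTrampoline() {
        List<String> log = new ArrayList<>();
        TrampolineSketch t = new TrampolineSketch();
        t.trampoline(() -> {
            log.add("outer start");
            t.trampoline(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runImmediate());  // [outer start, inner, outer end]
        System.out.println(runTrampoline()); // [outer start, outer end, inner]
    }
}
```

Under the immediate policy the nested task interrupts the outer one. Under the trampoline policy it runs only after the outer task has finished, which keeps the call stack flat when tasks reschedule themselves.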
Some Observable operators in RxJava have alternate forms that allow you to set which Scheduler the operator will use for (at least some part of) its operation. Others do not operate on any particular Scheduler, or operate on a particular default Scheduler. Those that have a particular default Scheduler include:
Aside from passing these Schedulers in to RxJava Observable operators, you can also use them to schedule your own work on Subscriptions. The following example uses the schedule method of the Scheduler.Worker class to schedule work on the newThread Scheduler:
    Worker worker = Schedulers.newThread().createWorker();
    worker.schedule(new Action0() {
        @Override
        public void call() {
            yourWork();
        }
    });
    // some time later...
    worker.unsubscribe();
To schedule recursive calls, you can use schedule and then schedule(this) on the Worker object:
    final Worker worker = Schedulers.newThread().createWorker();
    worker.schedule(new Action0() {
        @Override
        public void call() {
            yourWork();
            // recurse until unsubscribed (schedule will do nothing if unsubscribed)
            worker.schedule(this);
        }
    });
    // some time later...
    worker.unsubscribe();
Objects of the Worker class implement the Subscription interface, with its isUnsubscribed and unsubscribe methods, so you can stop work when a subscription is cancelled, or you can cancel the subscription from within the scheduled task:
    final Worker worker = Schedulers.newThread().createWorker();
    Subscription mySubscription = worker.schedule(new Action0() {
        @Override
        public void call() {
            while (!worker.isUnsubscribed()) {
                status = yourWork();
                if (QUIT == status) {
                    worker.unsubscribe();
                }
            }
        }
    });
The Worker is also a Subscription and so you can (and should, eventually) call its unsubscribe method to signal that it can halt work and release resources:
    worker.unsubscribe();
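The pattern of a task that polls isUnsubscribed and cancels itself can be tried without RxJava on the classpath. The following is a rough JDK-only sketch: `CancellableLoopSketch` is a hypothetical stand-in for a Worker, offering only the two Subscription-style methods, and `runUntilQuit` is an invented helper rather than part of any library.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a Worker: just a thread-safe cancellation flag. */
class CancellableLoopSketch {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

    boolean isUnsubscribed() {
        return unsubscribed.get();
    }

    void unsubscribe() {
        unsubscribed.set(true);
    }

    /** Loops on a new thread until the task cancels itself; returns the number of passes. */
    static int runUntilQuit(int quitAfter) {
        CancellableLoopSketch handle = new CancellableLoopSketch();
        AtomicInteger passes = new AtomicInteger();
        Thread thread = new Thread(() -> {
            while (!handle.isUnsubscribed()) {
                // stands in for "status = yourWork(); if (QUIT == status) ..."
                if (passes.incrementAndGet() >= quitAfter) {
                    handle.unsubscribe();
                }
            }
        });
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            handle.unsubscribe(); // ask the loop to stop if we cannot wait for it
        }
        return passes.get();
    }

    public static void main(String[] args) {
        System.out.println(runUntilQuit(3)); // 3
    }
}
```

Calling `unsubscribe()` from any other thread would stop the loop at its next check in the same way, which is why the flag is an `AtomicBoolean` rather than a plain field.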
You can also use a version of schedule that delays your action until a certain timespan has passed. Like the other forms of schedule, it is a method of the Worker rather than of the Scheduler itself. The following example schedules someAction to be performed on a Worker obtained from someScheduler after 500ms have passed according to that Scheduler’s clock:
    Worker worker = someScheduler.createWorker();
    worker.schedule(someAction, 500, TimeUnit.MILLISECONDS);
Another Worker method allows you to schedule an action to take place at regular intervals. The following example schedules someAction to be performed on a Worker obtained from someScheduler after 500ms have passed, and then every 250ms thereafter:
    Worker worker = someScheduler.createWorker();
    worker.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
The TestScheduler allows you to exercise fine-tuned manual control over how the Scheduler’s clock behaves. This can be useful for testing interactions that depend on precise arrangements of actions in time. This Scheduler has three additional methods:
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
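To make the idea of a manually driven clock concrete, here is a small JDK-only model of the three methods listed above. It is a sketch under stated assumptions, not RxJava's TestScheduler: the class `VirtualClockSketch`, its `schedule` and `now` methods, and the `demo` helper are hypothetical, and the real class's behavior may differ in details such as how it treats a target time in the past.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

/** JDK-only model of a scheduler whose clock moves only when told to; hypothetical. */
class VirtualClockSketch {
    private static final class Timed implements Comparable<Timed> {
        final long dueNanos;
        final long seq; // tie-breaker: equal due times run in submission order
        final Runnable action;

        Timed(long dueNanos, long seq, Runnable action) {
            this.dueNanos = dueNanos;
            this.seq = seq;
            this.action = action;
        }

        @Override
        public int compareTo(Timed other) {
            int byTime = Long.compare(dueNanos, other.dueNanos);
            return byTime != 0 ? byTime : Long.compare(seq, other.seq);
        }
    }

    private final PriorityQueue<Timed> queue = new PriorityQueue<>();
    private long nowNanos = 0;
    private long counter = 0;

    /** Queues an action to become due after the given delay on the virtual clock. */
    void schedule(Runnable action, long delay, TimeUnit unit) {
        queue.add(new Timed(nowNanos + unit.toNanos(delay), counter++, action));
    }

    /** Moves the clock forward by a relative amount, running everything that becomes due. */
    void advanceTimeBy(long time, TimeUnit unit) {
        advanceTimeTo(nowNanos + unit.toNanos(time), TimeUnit.NANOSECONDS);
    }

    /** Moves the clock to an absolute time, running everything due at or before it. */
    void advanceTimeTo(long time, TimeUnit unit) {
        long target = unit.toNanos(time);
        runDueUpTo(target);
        nowNanos = Math.max(nowNanos, target); // this model never moves the clock backwards
    }

    /** Runs whatever is already due without moving the clock. */
    void triggerActions() {
        runDueUpTo(nowNanos);
    }

    long now(TimeUnit unit) {
        return unit.convert(nowNanos, TimeUnit.NANOSECONDS);
    }

    private void runDueUpTo(long target) {
        while (!queue.isEmpty() && queue.peek().dueNanos <= target) {
            Timed next = queue.poll();
            nowNanos = Math.max(nowNanos, next.dueNanos); // the action observes its own due time
            next.action.run();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        VirtualClockSketch clock = new VirtualClockSketch();
        clock.schedule(() -> log.add("A@" + clock.now(TimeUnit.MILLISECONDS)), 500, TimeUnit.MILLISECONDS);
        clock.schedule(() -> log.add("B@" + clock.now(TimeUnit.MILLISECONDS)), 750, TimeUnit.MILLISECONDS);
        clock.schedule(() -> log.add("C@" + clock.now(TimeUnit.MILLISECONDS)), 0, TimeUnit.MILLISECONDS);

        clock.triggerActions();                          // runs C only
        clock.advanceTimeBy(500, TimeUnit.MILLISECONDS); // runs A
        log.add("paused@" + clock.now(TimeUnit.MILLISECONDS));
        clock.advanceTimeTo(1, TimeUnit.SECONDS);        // runs B
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [C@0, A@500, paused@500, B@750]
    }
}
```

Because no real time passes, a test written this way finishes instantly and always observes the same ordering, which is the property that makes a manually driven clock useful for time-dependent logic.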
In RxJS you obtain Schedulers from the Rx.Scheduler object or as independently-implemented objects. The following table shows the varieties of Scheduler that are available to you in RxJS:
Scheduler | purpose |
---|---|
Rx.Scheduler.currentThread | schedules work as soon as possible on the current thread |
Rx.HistoricalScheduler | schedules work as though it were occurring at an arbitrary historical time |
Rx.Scheduler.immediate | schedules work immediately on the current thread |
Rx.TestScheduler | for unit testing; this allows you to manually manipulate the movement of time |
Rx.Scheduler.timeout | schedules work by means of a timed callback |