E
- the type of events.public class DXFeedSubscription<E> extends Object implements Serializable, ObservableSubscription<E>, AutoCloseable
String
objects,
complex ones use specialized object as described in the corresponding event types.
However, every symbol has a unique string representation that can be used.
Each event is represented by a concrete instance of
event class. Event type is represented by a class literal. For example,
Quote.class
is an event type for a best market quote.
EventType
.
Common types of market events extend MarketEvent
class
and reside in com.dxfeed.event.market
package.
The set of subscribed symbols and the set of subscribed event types are maintained separately. The subscription is considered to be interested in the cross-product of these sets. That is, subscription is interested in any event whose type is in the set of subscribed event types and whose symbol is in the set of subscribed symbols.
The set of event types is specified when subscription is created and cannot be changed afterward.
The set of symbols can be modified with
setSymbols
, addSymbols
,
removeSymbols
, and clear
methods.
Each of xxxSymbols
method exists in two version. One version accepts a Collection
of symbols and it recommended for bulk modifications of symbol set. The other version accepts
varargs arrays of symbols and is provided as a convenience method to set/add/remove one or few
symbols.
Symbols in a set are compared using their equals
method. All symbol objects
must properly support hashCode
method. A set of symbols cannot contain two equal symbols.
If a symbol that is added
to a set of symbols is equal to the other one,
then the old symbol is removed from the set and the new symbol instance is retained. However,
the ObservableSubscriptionChangeListener
(see below) in this case is notified on the change of
subscription only when new symbol object implements FilteredSubscriptionSymbol
marker interface.
WildcardSymbol.ALL
represents subscription to all possible symbols
and have the effect of subscribing to all of them. See WildcardSymbol
for more details. It
is always represented by a separate class (instead of a usual String
) in order
to avoid potential resource consumption problems that may stem from an accidental subscription via wildcard.
All string symbols that start with WildcardSymbol.RESERVED_PREFIX
("*")
are reserved for the same reason. Subscription to the corresponding string does not have any effect
(no events will be received). Thus, it is safe to add user-specified strings to the subscription via
addSymbols
method without any prior validation of the user input.
NOTE: Wildcard subscription can create extremely high network and CPU load for certain kinds of
high-frequency events like quotes. It requires a special arrangement on the side of upstream data provider and
is disabled by default in upstream feed configuration. Make that sure you have adequate resources and understand
the impact before using it. It can be used for low-frequency events only (like Forex quotes), because each instance
of DXFeedSubscription
processes events in a single thread and there is no provision to load-balance wildcard
subscription amongst multiple threads.
Contact your data provider for the corresponding configuration arrangement if needed.
A special class of TimeSeriesSubscriptionSymbol
symbols is used internally to represent a subscription
to time-series of events. Two instances of TimeSeriesSubscriptionSymbol
objects are
equal
when their
underlying symbols
are equal, thus only one, most recently
added, instance of TimeSeriesSubscriptionSymbol
will be kept in the set of subscribed symbols.
It is recommended to use DXFeedTimeSeriesSubscription
convenience class for time-series subscriptions.
Candle
events use CandleSymbol
class to represent the complex structure of the candle symbol
and its attributes. However, both String
and CandleSymbol
objects can be used in subscription.
The corresponding strings will be converted to CandleSymbol
using its valueOf
method. Do not mix String
and CandleSymbol
subscription in a single DXFeedSubscription
instance.
DXFeedEventListener
instances that are notified on any events.
Event listeners are added with addEventListener
method.
Event listeners must be installed before changing the set of subscribed symbols.
When the set of subscribed symbols changes all registered event listeners receive update on the
last events for all newly added symbols.
This class keeps a set of ObservableSubscriptionChangeListener
instances that are notified on any
change in subscription. These listeners are installed by DXFeed
to keep
track of the subscription state and communicate subscription upstream to data providers.
DXFeed.attachSubscription
method.
Subscription that is created via DXFeed.createSubscription
is attached to the
corresponding feed. The feed tracks all changes in subscription by installing
ObservableSubscriptionChangeListener
and invokes
processEvents
for all received events.
Subscription can be detached from the feed with
DXFeed.detachSubscription
method.
Subscription can be attached to multiple feeds at the same time. In this case it receives events from all feeds but there is no way distinguish which feed the corresponding event came from.
The convenient way to detach subscription from the feed is to call its close
method. Closed subscription
becomes permanently detached from all feeds, removes all its listeners and is guaranteed to be reclaimable by the garbage
collector as soon as all external references to it are cleared.
The other way is to close an associated feed. This cannot be done via a DXFeed
instance, but it can be done
indirectly by closing the associated endpoint
.
ObservableSubscriptionChangeListener
that is installed by DXFeed
when this subscription is attached is not serializable.
Thus, freshly deserialized instance of this class will be detached. It has to be attached
to the feed after deserialization in order for it to start receiving events.
Installed DXFeedEventListener
instances are invoked from a separate thread via the executor.
Default executor for all subscriptions is configured with DXEndpoint.executor
method. Each subscription can individually override its executor with setExecutor
method.
Event listeners are invoked in a serial manner with respect to a given DXFeedSubscription instance.
That is, next event notification will not be performed until
DXFeedEventListener.eventsReceived
method for the previous
notification completes. It guarantees that the order of events in a given instance of DXFeedSubscription
is preserved.
This requirement on ordering limits concurrency of event processing. Effectively, each
subscription can use at most one CPU core to process its events. If there is a need to simultaneously
process events on a large number of symbols and this event processing is so resource-consuming, that
one CPU core is not enough to process all events in real-time, then it is advised to split symbols between multiple
DXFeedSubscription
instances. If event processing is mostly CPU-bound, then the good rule of thumb
is to have as many DXFeedSubscription
instances as there are CPU cores in the system.
However, multiple tasks can get submitted to the executor at the same time. In the current implementation,
at most two tasks are submitted at any time if
DXFEED_AGGREGATION_PERIOD_PROPERTY
is used.
One task for immediate processing of data snapshots via Executor.execute
method
and another task for delayed processing of data updates via
ScheduledExecutorService.schedule
method
if the executor implements ScheduledExecutorService
interface.
At most one task is submitted at any time if this property is not used.
Installed ObservableSubscriptionChangeListener
instances are notified on symbol set changes
while holding the lock on this subscription and in the same thread that changed the set of subscribed symbols
in this subscription.
Custom executor can be used by backend applications that do not need to immediately retrieve a collection of events, but want to poll for new events at a later time, for example, from inside of a servlet request. The following code pattern is suggested in this case to initialize subscription's executor:
When there is a time to poll for new events, the following code can be used:ConcurrentLinkedQueue
<Runnable
> taskQueue = new ConcurrentLinkedQueue<>(); subscription.setExecutor
(newExecutor
() { public void execute(Runnable
task) { taskQueue.add
(task); } }); subscription.addEventListener
(...);
and event listener'sRunnable
task; while ((task = taskQueue.poll
()) != null) task.run();
eventsReceived
method will be invoked
in this thread from inside of task.run()
invocation.
This approach has a clear advantage with LastingEvent
types.
If the above polling code is delayed or is executed only periodically, then incoming events have a chance
to get conflated and listener will receive only on the most recent event updates for processing.
Constructor and Description |
---|
DXFeedSubscription(Class<? extends E>... eventTypes)
Creates detached subscription for the given list of event types.
|
DXFeedSubscription(Class<? extends E> eventType)
Creates detached subscription for a single event type.
|
Modifier and Type | Method and Description |
---|---|
void |
addChangeListener(ObservableSubscriptionChangeListener listener)
Adds subscription change listener.
|
void |
addEventListener(DXFeedEventListener<E> listener)
Adds listener for events.
|
void |
addSymbols(Collection<?> symbols)
Adds the specified collection of symbols to the set of subscribed symbols.
|
void |
addSymbols(Object... symbols)
Adds the specified array of symbols to the set of subscribed symbols.
|
void |
addSymbols(Object symbol)
Adds the specified symbol to the set of subscribed symbols.
|
void |
attach(DXFeed feed)
Attaches subscription to the specified feed.
|
void |
clear()
Clears the set of subscribed symbols.
|
void |
close()
Closes this subscription and makes it permanently detached.
|
boolean |
containsEventType(Class<?> eventType)
Returns
true if this subscription contains the corresponding event type. |
protected Object |
decorateSymbol(Object symbol)
Decorates the specified symbol after it was received from the
DXFeedSubscription client code
before it goes to installed ObservableSubscriptionChangeListener instances. |
void |
detach(DXFeed feed)
Detaches subscription from the specified feed.
|
Set<?> |
getDecoratedSymbols()
Returns a set of decorated symbols (depending on the actual implementation class of
DXFeedSubscription ). |
Set<Class<? extends E>> |
getEventTypes()
Returns a set of subscribed event types.
|
Executor |
getExecutor()
Returns executor for processing event notifications on this subscription.
|
Set<?> |
getSymbols()
Returns a set of subscribed symbols.
|
boolean |
isClosed()
Returns
true if this subscription is closed. |
void |
removeChangeListener(ObservableSubscriptionChangeListener listener)
Removes subscription change listener.
|
void |
removeEventListener(DXFeedEventListener<E> listener)
Removes listener for events.
|
void |
removeSymbols(Collection<?> symbols)
Removes the specified collection of symbols from the set of subscribed symbols.
|
void |
removeSymbols(Object... symbols)
Removes the specified array of symbols from the set of subscribed symbols.
|
void |
setExecutor(Executor executor)
Changes executor for processing event notifications on this subscription.
|
void |
setSymbols(Collection<?> symbols)
Changes the set of subscribed symbols so that it contains just the symbols from the specified collection.
|
void |
setSymbols(Object... symbols)
Changes the set of subscribed symbols so that it contains just the symbols from the specified array.
|
protected boolean |
shallNotifyOnSymbolUpdate(Object symbol,
Object oldSymbol)
Compares newly added symbol with the old one that was present before for
the purpose of notifying
installed
ObservableSubscriptionChangeListener about the change. |
protected Object |
undecorateSymbol(Object symbol)
Undoes the decoration of the specified symbol doing the reverse operation to
decorateSymbol(Object) . |
public DXFeedSubscription(Class<? extends E> eventType)
eventType
- the event type.NullPointerException
- if event type is null.@SafeVarargs public DXFeedSubscription(Class<? extends E>... eventTypes)
eventTypes
- the list of event types.IllegalArgumentException
- if the list of event types is empty.NullPointerException
- if any event type is null.public void attach(DXFeed feed)
feed
- feed to attach to.public void detach(DXFeed feed)
feed
- feed to detach from.public boolean isClosed()
true
if this subscription is closed.isClosed
in interface ObservableSubscription<E>
close()
public void close()
ObservableSubscriptionChangeListener
instances by invoking
subscriptionClosed
while holding the lock for this subscription. This method clears lists of all installed
event listeners and subscription change listeners and makes sure that no more listeners
can be added.
This method ensures that subscription can be safely garbage-collected when all outside references to it are lost.
close
in interface AutoCloseable
public Set<Class<? extends E>> getEventTypes()
getEventTypes
in interface ObservableSubscription<E>
public boolean containsEventType(Class<?> eventType)
true
if this subscription contains the corresponding event type.containsEventType
in interface ObservableSubscription<E>
getEventTypes()
public void clear()
setSymbols
(Collections.EMPTY_LIST)
public Set<?> getSymbols()
public Set<?> getDecoratedSymbols()
DXFeedSubscription
).
The resulting set cannot be modified. The contents of the resulting set are undefined if the set of symbols is changed after invocation of this method, but the resulting set is safe for concurrent reads from any threads. The resulting set maybe either a snapshot of the set of the subscribed symbols at the time of invocation or a weakly consistent view of the set.
public void setSymbols(Collection<?> symbols)
setSymbols(Object... symbols)
method.
All registered event listeners will receive update on the last events for all
newly added symbols.
ObservableSubscriptionChangeListener
instances of any resulting changes in the set of
subscribed symbols while holding the lock for this subscription.
This implementation decorates symbols via protected decorateSymbol(Object)
method,
that can be overridden in subclasses of DXFeedSubscription
. Installed
ObservableSubscriptionChangeListener
instances receive decorated symbols.
symbols
- the collection of symbols.public void setSymbols(Object... symbols)
setSymbols(Collection<?> symbols)
method.
All registered event listeners will receive update on the last events for all
newly added symbols.
ObservableSubscriptionChangeListener
instances of any resulting changes in the set of
subscribed symbols while holding the lock for this subscription.
This implementation decorates symbols via protected decorateSymbol(Object)
method,
that can be overridden in subclasses of DXFeedSubscription
. Installed
ObservableSubscriptionChangeListener
instances receive decorated symbols.
symbols
- the array of symbols.public void addSymbols(Collection<?> symbols)
addSymbols(Object... symbols)
method.
All registered event listeners will receive update on the last events for all
newly added symbols.
ObservableSubscriptionChangeListener
instances of any resulting changes in the set of
subscribed symbols while holding the lock for this subscription.
This implementation decorates symbols via protected decorateSymbol(Object)
method,
that can be overridden in subclasses of DXFeedSubscription
. Installed
ObservableSubscriptionChangeListener
instances receive decorated symbols.
symbols
- the collection of symbols.public void addSymbols(Object... symbols)
addSymbols(Collection<?> symbols)
method.
All registered event listeners will receive update on the last events for all
newly added symbols.
ObservableSubscriptionChangeListener
instances of any resulting changes in the set of
subscribed symbols while holding the lock for this subscription.
This implementation decorates symbols via protected decorateSymbol(Object)
method
that can be overridden in subclasses of DXFeedSubscription
. Installed
ObservableSubscriptionChangeListener
instances receive decorated symbols.
symbols
- the array of symbols.public void addSymbols(Object symbol)
addSymbols(Collection<?> symbols)
method.
All registered event listeners will receive update on the last events for all
newly added symbols.
ObservableSubscriptionChangeListener
instances of any resulting changes in the set of
subscribed symbols while holding the lock for this subscription.
This implementation decorates symbols via protected decorateSymbol(Object)
method
that can be overridden in subclasses of DXFeedSubscription
. Installed
ObservableSubscriptionChangeListener
instances receive decorated symbols.
symbol
- the symbol.public void removeSymbols(Collection<?> symbols)
removeSymbols(Object... symbols)
method.
ObservableSubscriptionChangeListener
instances of any resulting changes in the set of
subscribed symbols while holding the lock for this subscription.
This implementation decorates symbols via protected decorateSymbol(Object)
method,
that can be overridden in subclasses of DXFeedSubscription
. Installed
ObservableSubscriptionChangeListener
instances receive decorated symbols.
symbols
- the collection of symbols.public void removeSymbols(Object... symbols)
removeSymbols(Collection<?> symbols)
method.
ObservableSubscriptionChangeListener
instances of any resulting changes in the set of
subscribed symbols while holding the lock for this subscription.
This implementation decorates symbols via protected decorateSymbol(Object)
method,
that can be overridden in subclasses of DXFeedSubscription
. Installed
ObservableSubscriptionChangeListener
instances receive decorated symbols.
symbols
- the array of symbols.public Executor getExecutor()
null
if default executor of the attached DXFeed
is used.public void setExecutor(Executor executor)
executor
- executor for processing event notifications on this subscription,
or null
if default executor of the attached DXFeed
is used.public void addEventListener(DXFeedEventListener<E> listener)
symbols
is empty) or not attached
to any feed
(its set of change listeners is empty).
This method does nothing if this subscription is closed.listener
- the event listener.NullPointerException
- if listener is null.IllegalStateException
- if subscription is attached and is not empty.public void removeEventListener(DXFeedEventListener<E> listener)
listener
- the event listener.NullPointerException
- if listener is null.public void addChangeListener(ObservableSubscriptionChangeListener listener)
ObservableSubscriptionChangeListener.symbolsAdded(java.util.Set<?>)
on the given listener while holding the lock for this
subscription. This way the given listener synchronously receives existing subscription state and and
is synchronously notified on all changes in subscription afterwards.
Whenever a symbol in a set of subscribed symbols is replaced by the other symbol that is
equal
to the old one, the decision on whether to notify installed listeners
about the change is based on the result of
shallNotifyOnSymbolUpdate
method.
addChangeListener
in interface ObservableSubscription<E>
listener
- the subscription change listener.NullPointerException
- if listener is null.public void removeChangeListener(ObservableSubscriptionChangeListener listener)
ObservableSubscriptionChangeListener.subscriptionClosed()
on the given listener while
holding the lock for this subscription.removeChangeListener
in interface ObservableSubscription<E>
listener
- the subscription change listener.NullPointerException
- if listener is null.protected Object decorateSymbol(Object symbol)
DXFeedSubscription
client code
before it goes to installed ObservableSubscriptionChangeListener
instances.
This method can be overridden in subclasses. See DXFeedTimeSeriesSubscription
for an example.
This implementation throws NullPointerException
if symbol
is null
or returns symbol
otherwise.symbol
- the symbol to decorate.NullPointerException
- if symbol is null.protected Object undecorateSymbol(Object symbol)
decorateSymbol(Object)
.
This method can be overridden in subclasses. See DXFeedTimeSeriesSubscription
for an example.
This implementation throws NullPointerException
if symbol
is null
or returns symbol
otherwise.symbol
- the symbol to undecorate.NullPointerException
- if symbol is null.protected boolean shallNotifyOnSymbolUpdate(@Nonnull Object symbol, @Nullable Object oldSymbol)
installed
ObservableSubscriptionChangeListener
about the change.
This method returns false
if the new symbol shall be considered the same one and
no notification is needed. The attached listeners
get
symbolsAdded
notification for all
symbols that this method returns true
for.
This implementation compares instances of FilteredSubscriptionSymbol
by their reference
(this implementation returns true
when symbol != oldSymbol
), because their equals and hashCode do
not take input account their filter part. Notification for all other types of symbols (like String
) is
optimized away (this implementation returns true
only when oldSymbol == null
).
symbol
- the recently added symbol to this subscription (not null).oldSymbol
- the previous symbol from the set of symbol based on equal/hashCode search in a hash set or
null
if it did not present before.true
if listeners shall be notified on the change in the set of subscribed symbols.Copyright © 2002–2023 Devexperts LLC. All rights reserved.