Worker State Machine#
Task states#
When the Scheduler asks a Worker to compute a task, it is tracked by the Worker through
a distributed.worker_state_machine.TaskState object - not to be confused with
the matching scheduler-side class distributed.scheduler.TaskState.
The class has a key attribute, TaskState.state, which can assume the following
values:
- released
Known but not actively computing or in memory. A task can stay in this state when the scheduler asked to forget it, but it has dependent tasks on the same worker.
- waiting
The scheduler has added the task to the worker queue. All of its dependencies are in memory somewhere on the cluster, but not all of them are in memory on the current worker, so they need to be fetched.
- fetch
This task is in memory on one or more peer workers, but not on this worker. Its data is queued to be transferred over the network, either because it’s a dependency of a task in waiting state, or because the Active Memory Manager requested it to be replicated here. The task can be found in the WorkerState.data_needed heap.
- missing
Like fetch, but all peer workers that were listed by the scheduler are either unreachable or have responded they don’t actually have the task data. The worker will periodically ask the scheduler if it knows of additional replicas; when it does, the task will transition again to fetch. The task can be found in the WorkerState.missing_dep_flight set.
- flight
The task data is currently being transferred over the network from another worker. The task can be found in the WorkerState.in_flight_tasks and WorkerState.in_flight_workers collections.
- ready
The task is ready to be computed; all of its dependencies are in memory on the current worker and it’s waiting for an available thread. The task can be found in the WorkerState.ready heap.
- constrained
Like ready, but the user specified resource constraints for this task. The task can be found in the WorkerState.constrained queue.
- executing
The task is currently being computed on a thread. It can be found in the WorkerState.executing set and in the distributed.worker.Worker.active_threads dict.
- long-running
Like executing, but the user code called distributed.secede() so the task no longer counts towards the maximum number of concurrent tasks. It can be found in the WorkerState.long_running set and in the distributed.worker.Worker.active_threads dict.
- rescheduled
The task just raised the Reschedule exception. This is a transitory state, which is not stored permanently.
- cancelled
The scheduler asked to forget about this task, but it’s technically impossible at the moment. See Task cancellation. The task can be found in whatever collections it was in its previous state.
- resumed
The task was recovered from cancelled state. See Task cancellation. The task can be found in whatever collections it was in its previous state.
- memory
Task execution completed, or the task was successfully transferred from another worker, and is now held in either WorkerState.data or WorkerState.actors.
- error
Task execution failed. Alternatively, task execution completed successfully, or the task data transferred successfully over the network, but it failed to serialize or deserialize. The full exception and traceback are stored in the task itself, so that they can be re-raised on the client.
- forgotten
The scheduler asked this worker to forget about the task, and there are neither dependents nor dependencies on the same worker. As soon as a task reaches this state, it is immediately dereferenced from the WorkerState and will soon be garbage-collected. This is the only case where two instances of a TaskState object with the same key can (transitorily) exist in the same interpreter at the same time.
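As a quick reference, the state descriptions above can be condensed into a lookup table. This is a minimal sketch and not code from distributed: the collection names are the WorkerState attributes mentioned above, while STATE_COLLECTIONS and collections_for() are hypothetical names introduced only for illustration.

```python
from __future__ import annotations

# Where a task in a given state can be found, per the descriptions above.
# The collection names are WorkerState attributes; the lookup is illustrative.
STATE_COLLECTIONS: dict[str, tuple[str, ...]] = {
    "fetch": ("data_needed",),
    "missing": ("missing_dep_flight",),
    "flight": ("in_flight_tasks", "in_flight_workers"),
    "ready": ("ready",),
    "constrained": ("constrained",),
    "executing": ("executing",),
    "long-running": ("long_running",),
    "memory": ("data", "actors"),  # held in one or the other
}


def collections_for(state: str, previous: str | None = None) -> tuple[str, ...]:
    """Return the collections where a task is expected (hypothetical helper)."""
    if state in ("cancelled", "resumed"):
        # cancelled/resumed tasks stay wherever their previous state put them
        if previous is None:
            raise ValueError(f"{state} requires a previous state")
        return STATE_COLLECTIONS[previous]
    # States with no dedicated collection in this sketch (released, waiting,
    # error, ...) map to an empty tuple.
    return STATE_COLLECTIONS.get(state, ())
```

For example, collections_for("cancelled", "flight") returns the same collections as a task in flight, which mirrors the rule that a cancelled task remains in the collections of its previous state.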
Fetching dependencies#
As tasks that need to be computed arrive on the Worker, any dependencies that are not
already in memory on the same worker are wrapped by a TaskState object and
contain a listing of workers (TaskState.who_has) to collect their result from.
These TaskState objects have their state set to fetch, are put in the
data_needed heap, and are progressively transferred over the
network. For each dependency we select a worker at random that has that data and collect
the dependency from that worker. To improve bandwidth, we opportunistically gather other
dependencies of other tasks that are known to be on that worker, up to a maximum of 50MB
of data (transfer_message_bytes_limit, which is acquired from the
configuration key distributed.worker.transfer.message-bytes-limit) - too little data
and bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of
50 connections (transfer_incoming_count_limit, which is in turn
acquired from the configuration key distributed.worker.connections.outgoing) so as
to avoid overly-fragmenting our network bandwidth.
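The batching rule can be sketched as follows. This is a simplified model rather than the actual implementation: select_keys_to_gather() and its (priority, key, nbytes) tuples are hypothetical, and stopping at the first task that would exceed the limit is an assumption. It does reflect two documented behaviours: dependencies held by the same peer are batched up to transfer_message_bytes_limit, and the first task is always gathered even if it exceeds that limit on its own.

```python
from __future__ import annotations

import heapq


def select_keys_to_gather(
    candidates: list[tuple[int, str, int]],
    message_bytes_limit: float,
) -> tuple[list[str], int]:
    """Pick the keys to request from one peer in a single message.

    candidates holds (priority, key, nbytes) for every task in fetch state
    that the chosen peer is believed to hold; lower priority is served first.
    """
    heap = list(candidates)
    heapq.heapify(heap)
    to_gather: list[str] = []
    total_nbytes = 0
    while heap:
        _, key, nbytes = heap[0]
        # Always take the first task, even if it is larger than the limit,
        # so that an oversized dependency cannot stall progress.
        if to_gather and total_nbytes + nbytes > message_bytes_limit:
            break
        heapq.heappop(heap)
        to_gather.append(key)
        total_nbytes += nbytes
    return to_gather, total_nbytes
```

A small limit yields many small requests and poor bandwidth; a large one yields few, long-lived transfers and poor responsiveness, which is the trade-off described above.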
In the event that the network comms between two workers are saturated, a dependency task
may cycle between fetch and flight until it is successfully collected. It may
also happen that a peer worker responds that it doesn’t have a replica of the requested
data anymore; finally, the peer worker may be unreachable or unresponsive. When that
happens, the peer is removed from who_has and the task is
transitioned back to fetch, so that the Worker will try gathering the same key from
a different peer. If who_has becomes empty due to this process, the
task transitions to missing and the Worker starts periodically asking the Scheduler
if additional peers are available.
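The fallback logic for a failed transfer can be illustrated with a toy model. The Dep class and on_gather_failed() are hypothetical stand-ins, not part of distributed; only the who_has attribute and the fetch and missing state names come from the text above.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Dep:
    """Minimal stand-in for the TaskState of a dependency (illustrative)."""

    key: str
    who_has: set[str] = field(default_factory=set)
    state: str = "flight"


def on_gather_failed(dep: Dep, peer: str) -> str:
    """Handle a peer that is unreachable or no longer holds the data."""
    # The peer is no longer considered a valid source for this key.
    dep.who_has.discard(peer)
    # With other replicas known, retry elsewhere; otherwise the task is
    # missing until the scheduler reports additional replicas.
    dep.state = "fetch" if dep.who_has else "missing"
    return dep.state
```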
The same system used for fetching dependencies is also used by Active Memory Manager replication.
Note
There is at most one gather_dep() asyncio task running at any
given time for any given peer worker. If all workers holding a replica of a task
in fetch state are already in flight, the task will remain in fetch state
until a worker becomes available again.
Computing tasks#
A TaskState that needs to be computed proceeds on the Worker through the
following pipeline. It has its run_spec defined, which instructs the
worker how to execute it.
After all dependencies for a task are in memory, the task transitions from waiting
to ready or constrained and is added to the ready heap or the constrained queue, respectively.
As soon as a thread is available, we pop a task from the top of the heap and put the task into a thread from a local thread pool to execute.
Optionally, while it’s running, this task may identify itself as a long-running task
(see Tasks launching tasks), at which point it secedes from the
thread pool and changes state to long-running. executing and long-running are
almost identical states, the only difference being that the latter doesn’t count towards
the maximum number of tasks running in parallel at the same time.
A task can terminate in three ways:
- Complete successfully; its return value is stored in either data or actors
- Raise an exception; the exception and traceback are stored on the TaskState object
- Raise Reschedule; it is immediately forgotten.
In all cases, the outcome is sent back to the scheduler.
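The interplay between the ready heap, the thread limit, and seceding can be modelled in a few lines. ToyExecutor is a hypothetical class written for this illustration; it tracks only keys and priorities and does not run any user code or manage a real thread pool.

```python
from __future__ import annotations

import heapq


class ToyExecutor:
    """Toy model of the ready -> executing -> long-running pipeline."""

    def __init__(self, nthreads: int) -> None:
        self.nthreads = nthreads
        self.ready: list[tuple[int, str]] = []  # heap of (priority, key)
        self.executing: set[str] = set()
        self.long_running: set[str] = set()

    def add_ready(self, priority: int, key: str) -> None:
        heapq.heappush(self.ready, (priority, key))

    def start_tasks(self) -> list[str]:
        """Pop tasks from the top of the heap while thread slots are free."""
        started = []
        while self.ready and len(self.executing) < self.nthreads:
            _, key = heapq.heappop(self.ready)
            self.executing.add(key)
            started.append(key)
        return started

    def secede(self, key: str) -> None:
        """executing -> long-running: the task keeps running, but no longer
        counts towards the maximum number of concurrent tasks."""
        self.executing.remove(key)
        self.long_running.add(key)
```

With nthreads=1, a second task can start only once the first one finishes or secedes.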
Scattered data#
Scattered data follows an even simpler path,
landing directly in memory:
Forgetting tasks#
Once a task is in memory or error, the Worker will hold onto it indefinitely,
until the Scheduler explicitly asks the Worker to forget it.
This happens when there are no more Clients holding a reference to the key and there are
no more waiter tasks (that is, dependents that have not been computed). Additionally,
the Active Memory Manager may ask to drop excess replicas of a task.
In the case of rescheduled, the task will instead immediately transition to
released and then forgotten without waiting for the scheduler.
Irregular flow#
There are a few important exceptions to the flow diagrams above:
- A task is stolen, in which case it transitions from waiting, ready, or constrained directly to released. Note that steal requests for tasks that are currently executing are rejected.
- Scheduler intercession, in which the scheduler reassigns a task that was previously assigned to a separate worker to a new worker. This most commonly occurs when a worker dies during computation.
- Client intercession, where a client either explicitly releases a Future or descopes it; alternatively the whole client may shut down or become unresponsive. When there are no more clients holding references to a key or one of its dependents, the Scheduler will release it.
In short:
Important
A task can transition to released from any state, not just those in the
diagrams above.
If there are no dependents, the task immediately transitions to forgotten and is
descoped. However, there is an important exception, Task cancellation.
Task cancellation#
The Worker may receive a request to release a key while it is currently in flight,
executing, or long-running. Due to technical limitations around cancelling
Python threads, and the way data fetching from peer workers is currently implemented,
such an event cannot cause the related asyncio task (and, in the case of executing /
long-running, the thread running the user code) to be immediately aborted. Instead,
tasks in these three states are transitioned to another state, cancelled,
which means that the asyncio task will proceed to completion (outcome is irrelevant) and
then the Dask task will be released.
The cancelled state has a substate, previous, which is set to one
of the above three states. The common notation for this is <state>(<previous>),
e.g. cancelled(flight).
While a task is cancelled, one of three things will happen:
- Nothing happens before the asyncio task completes; e.g. the Scheduler does not change its mind and still wants the Worker to forget about the task until the very end. When that happens, the task transitions from cancelled to released and, typically, forgotten.
- The scheduler switches back to its original request:
  - The scheduler asks the Worker to fetch a task that is currently cancelled(flight); at which point the task will immediately revert to flight, forget that cancellation ever happened, and continue waiting on the data fetch that’s already running;
  - The scheduler asks the Worker to compute a task that is currently cancelled(executing) or cancelled(long-running). The Worker will completely disregard the new run_spec (if it changed), switch back to the previous state, and wait for the already executing thread to finish.
- The scheduler flips to the opposite request, from fetch to computation or the other way around.
To serve this last use case there is another special state, resumed. A task can
enter resumed state exclusively from cancelled. resumed retains the
previous attribute from the cancelled state and adds another
attribute, next, which is always:
- fetch, if previous is executing or long-running
- waiting, if previous is flight
To recap, these are all possible permutations of states and substates to handle cancelled tasks:
| state | previous | next |
|---|---|---|
| cancelled | flight | None |
| cancelled | executing | None |
| cancelled | long-running | None |
| resumed | flight | waiting |
| resumed | executing | fetch |
| resumed | long-running | fetch |
If a resumed task completes successfully, it will transition to memory (as
opposed to a cancelled task, where the output is disregarded) and the Scheduler
will be informed with a spoofed termination message, that is the expected end message
for flight if the task is resumed(executing->fetch) or
resumed(long-running->fetch), and the expected end message for execute if
the task is resumed(flight->waiting).
If the task fails or raises Reschedule, the Worker will instead
silently ignore the exception and switch to its intended course, so
resumed(executing->fetch) or resumed(long-running->fetch) will transition to
fetch and resumed(flight->waiting) will transition to waiting.
Finally, the scheduler can change its mind multiple times over the lifetime of the task,
so a resumed(executing->fetch) or resumed(long-running->fetch) task may be
requested to transition to waiting again, at which point it will just revert to its
previous state and forget the whole incident; likewise a
resumed(flight->waiting) task could be requested to transition to fetch again,
so it will just transition to flight instead.
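The cancellation rules above can be condensed into a small model. The Task class and the cancel(), rerequest() and finish() functions are hypothetical names for this sketch; the real worker implements these rules as individual state transitions and also emits messages to the scheduler, which are omitted here.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

UNINTERRUPTIBLE = ("flight", "executing", "long-running")


@dataclass
class Task:
    state: str
    previous: Optional[str] = None
    next: Optional[str] = None


def cancel(ts: Task) -> None:
    """Release request for a task whose asyncio task cannot be aborted."""
    if ts.state not in UNINTERRUPTIBLE:
        raise ValueError(f"cannot cancel from {ts.state}")
    ts.previous, ts.state = ts.state, "cancelled"


def rerequest(ts: Task, want: str) -> None:
    """The scheduler asks again for a cancelled or resumed task.

    want is "fetch" or "compute".
    """
    if ts.state not in ("cancelled", "resumed"):
        raise ValueError(f"unexpected state {ts.state}")
    original = "fetch" if ts.previous == "flight" else "compute"
    if want == original:
        # Back to the original request: revert and forget the incident.
        ts.state, ts.previous, ts.next = ts.previous, None, None
    else:
        # Opposite request: remember what to do if the running work fails.
        ts.state = "resumed"
        ts.next = "waiting" if ts.previous == "flight" else "fetch"


def finish(ts: Task, succeeded: bool) -> str:
    """The asyncio task serving a cancelled or resumed task has completed."""
    if ts.state == "cancelled":
        new = "released"  # the outcome is irrelevant
    elif ts.state == "resumed":
        new = "memory" if succeeded else ts.next
    else:
        raise ValueError(f"unexpected state {ts.state}")
    ts.state, ts.previous, ts.next = new, None, None
    return new
```

For instance, a task in flight that is cancelled and then requested for computation becomes resumed(flight->waiting); if the transfer then fails, finish() moves it to waiting.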
A common real-life use case
- There are at least two workers on the cluster, A and B.
- Task x is computed successfully on worker A.
- When task x transitions to memory on worker A, the scheduler asks worker B to compute task y, which depends on task x.
- B starts acquiring the key x from A, which sends the task into flight mode on B.
- Worker A crashes, and for whatever reason the scheduler notices before worker B does.
- The scheduler will release task y (because it’s waiting on dependencies that are nowhere to be found in memory anymore) and reschedule task x somewhere else on the cluster. Task x will transition to cancelled(flight) on worker B.
- If the scheduler randomly chooses worker B to compute task x, the task will transition to resumed(flight->waiting).
- When, and only when, the TCP socket from A to B collapses (e.g. due to timeout), the task will transition to waiting and will be eventually recomputed on B.
Important
You always have at most one compute() or
gather_dep() asyncio task running for any one given key; you
never have both.
Task state mapping between Scheduler and Worker#
The task states on the scheduler and the worker are different, and their mapping is somewhat nuanced:
| Scheduler states | Typical worker states | Edge case worker states |
|---|---|---|
In addition to the above states, a worker may not know about a specific task at all. The opposite, where the worker knows about a task but it is nowhere to be found on the scheduler, happens exclusively in the case of Task cancellation.
There are also race conditions to be considered, where a worker (or some workers) know something before the scheduler does, or the other way around. For example,
- A task will always transition from executing to memory on the worker before it can transition from processing to memory on the scheduler
- A task will always transition to released or forgotten on the scheduler first, and only when the message reaches the worker it will be released there too.
Flow control#
There are several classes involved in the worker state machine:
TaskState includes all the information related to a single task; it also
includes references to dependent and dependency tasks. This is just a data holder, with
no mutating methods. Note that this is a distinct class from
distributed.scheduler.TaskState.
WorkerState encapsulates the state of the worker as a whole. It holds
references to TaskState in its tasks dictionary and in
several other secondary collections. Crucially, this class has no knowledge or
visibility whatsoever on asyncio, networking, disk I/O, threads, etc.
Note that this is a distinct class from distributed.scheduler.WorkerState.
WorkerState offers a single method to mutate the state:
handle_stimulus(). The state must not be altered in any other way.
The method accepts a StateMachineEvent, a.k.a. stimulus, which is a data
class describing something that happened and that may cause the worker state to
mutate. A stimulus can arrive from either the scheduler (e.g. a request to compute a
task) or from the worker itself (e.g. a task has finished computing).
WorkerState.handle_stimulus() alters the internal state (e.g., it could transition
a task from executing to memory) and returns a list of Instruction
objects, which are actions that the worker needs to take but are external to the state
itself:
send a message to the scheduler
compute a task
gather a task from a peer worker
WorkerState.handle_stimulus() is wrapped by BaseWorker.handle_stimulus(),
which consumes the Instruction objects. BaseWorker deals with asyncio
task creation, tracking, and cleanup, but does not implement the actual task
execution or gather; instead it exposes abstract async methods
execute() and gather_dep(), which are then
overridden by its subclass Worker, which actually runs tasks and
performs network I/O. When the implemented methods finish, they must return a
StateMachineEvent, which is fed back into BaseWorker.handle_stimulus().
Note
This can create a (potentially very long) chain of events internal to the worker;
e.g. if there are more tasks in the ready queue than there are
threads, then the termination StateMachineEvent of one task will trigger the
Instruction to execute the next one.
To summarize:
- WorkerState is agnostic to asyncio, networking, threading, and disk I/O; it includes collections of TaskState objects.
- BaseWorker encapsulates WorkerState and adds awareness of asyncio
- Worker subclasses BaseWorker and adds awareness of networking, threading, and disk I/O.
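The separation between pure state and side effects can be sketched with a toy model. Everything below is hypothetical and heavily simplified: the class names only echo the real ones, the event and instruction classes are invented for the example, the message format is illustrative, and execution is synchronous, whereas the real BaseWorker spawns asyncio tasks.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ComputeTask:  # toy stimulus, as if sent by the scheduler
    key: str


@dataclass
class ExecuteDone:  # toy stimulus generated by the worker itself
    key: str
    value: object


@dataclass
class Execute:  # toy instruction: run the task
    key: str


@dataclass
class SendMessage:  # toy instruction: notify the scheduler
    msg: dict


class ToyWorkerState:
    """Pure state: no threads, no network, no asyncio."""

    def __init__(self) -> None:
        self.states: dict[str, str] = {}
        self.data: dict[str, object] = {}

    def handle_stimulus(self, *stims: object) -> list:
        instructions: list = []
        for stim in stims:
            if isinstance(stim, ComputeTask):
                self.states[stim.key] = "executing"
                instructions.append(Execute(stim.key))
            elif isinstance(stim, ExecuteDone):
                self.states[stim.key] = "memory"
                self.data[stim.key] = stim.value
                msg = {"op": "task-finished", "key": stim.key}
                instructions.append(SendMessage(msg))
        return instructions


class ToyWorker:
    """Consumes instructions and feeds the resulting events back in."""

    def __init__(self, state: ToyWorkerState, functions: dict) -> None:
        self.state = state
        self.functions = functions  # {key: zero-argument callable}
        self.sent: list[dict] = []

    def handle_stimulus(self, *stims: object) -> None:
        pending = list(self.state.handle_stimulus(*stims))
        while pending:
            inst = pending.pop(0)
            if isinstance(inst, Execute):
                # The completion event re-enters the state machine.
                event = ExecuteDone(inst.key, self.functions[inst.key]())
                pending.extend(self.state.handle_stimulus(event))
            elif isinstance(inst, SendMessage):
                self.sent.append(inst.msg)
```

Because ToyWorkerState never performs I/O, it can be driven entirely by hand-written events in a test, which is the main benefit of this design.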
Internal state permutation#
Internally, WorkerState.handle_stimulus() works very similarly to
the same process on the scheduler side:
- WorkerState.handle_stimulus() calls WorkerState._handle_<stimulus name>(),
- which returns a tuple of
  - recommendations to transition tasks: {TaskState: <new state>}
  - list of Instruction objects
- WorkerState.handle_stimulus() then passes the recommendations to WorkerState._transitions()
- For each recommendation, WorkerState._transitions() calls WorkerState._transition(),
- which in turn calls WorkerState._transition_<start state>_<end state>(),
- which in turn returns an additional tuple of (recommendations, instructions)
- the new recommendations are consumed by WorkerState._transitions(), until no more recommendations are returned.
- WorkerState.handle_stimulus() finally returns the list of instructions, which has been progressively extended by the transitions.
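The recommendation loop described above can be sketched as follows. ToyStateMachine is a hypothetical class: its method names mirror the pattern described in the text, but the two transitions, the string keys (instead of TaskState objects), and the tuple-shaped instructions are simplifications chosen for this example.

```python
from __future__ import annotations


class ToyStateMachine:
    """Toy version of the recommendations/instructions loop."""

    def __init__(self) -> None:
        self.states = {"x": "waiting", "y": "waiting"}

    # One method per (start, end) pair; each returns
    # (recommendations, instructions).
    def _transition_waiting_ready(self, key: str):
        self.states[key] = "ready"
        return {key: "executing"}, []

    def _transition_ready_executing(self, key: str):
        self.states[key] = "executing"
        return {}, [("execute", key)]

    def _transition(self, key: str, finish: str):
        start = self.states[key]
        # State names such as long-running contain a hyphen.
        name = f"_transition_{start}_{finish}".replace("-", "_")
        return getattr(self, name)(key)

    def _transitions(self, recommendations: dict, instructions: list) -> None:
        remaining = dict(recommendations)
        while remaining:
            key, finish = remaining.popitem()
            recs, instrs = self._transition(key, finish)
            remaining.update(recs)  # follow-up recommendations
            instructions.extend(instrs)

    def _handle_compute_task(self, key: str):
        return {key: "ready"}, []

    def handle_stimulus(self, key: str) -> list:
        recs, instructions = self._handle_compute_task(key)
        self._transitions(recs, instructions)
        return instructions
```

A single stimulus for x walks the task through waiting, ready and executing, and only the final list of instructions leaves the state machine.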
API Documentation#
- class distributed.worker_state_machine.TaskState(key: Key, run_id: int = -1, run_spec: T_runspec | None = None, dependencies: set[TaskState] = <factory>, dependents: set[TaskState] = <factory>, waiting_for_data: set[TaskState] = <factory>, waiters: set[TaskState] = <factory>, state: TaskStateState = 'released', previous: Literal['executing', 'long-running', 'flight', None] = None, next: Literal['fetch', 'waiting', None] = None, priority: tuple[int, ...] | None = None, who_has: set[str] = <factory>, coming_from: str | None = None, resource_restrictions: dict[str, float] = <factory>, exception: Serialize | None = None, traceback: Serialize | None = None, exception_text: str = '', traceback_text: str = '', type: type | None = None, suspicious_count: int = 0, startstops: list[StartStop] = <factory>, start_time: float | None = None, stop_time: float | None = None, metadata: dict = <factory>, nbytes: int | None = None, annotations: dict | None = None, span_id: str | None = None, done: bool = False)[source]#
Holds volatile state relating to an individual Dask task.
Not to be confused with distributed.scheduler.TaskState, which holds similar information on the scheduler side.
- done: bool#
True if the execute() or gather_dep() coroutine servicing this task completed; False otherwise. This flag changes the behaviour of transitions out of the executing, flight etc. states.
- key: Key#
Task key. Mandatory.
- metadata: dict#
Metadata related to the task. Stored metadata should be msgpack serializable (e.g. int, string, list, dict).
- next: Literal['fetch', 'waiting', None]#
The next state of the task. It is not None iff state == resumed.
- previous: Literal['executing', 'long-running', 'flight', None]#
The previous state of the task. It is not None iff state in (cancelled, resumed).
- priority: tuple[int, ...] | None#
The priority of this task, as given by the scheduler. Determines run order.
- run_spec: T_runspec | None#
A tuple containing the function, args, kwargs and task associated with this TaskState instance. This defaults to None and can remain empty if it is a dependency that this worker will receive from another worker.
- span_id: str | None#
Unique span id (see distributed.spans). Matches distributed.scheduler.TaskState.group.span_id.
- state: TaskStateState#
The current state of the task
- class distributed.worker_state_machine.WorkerState(*, nthreads: int = 1, address: str | None = None, data: MutableMapping[Key, object] | None = None, threads: dict[Key, int] | None = None, plugins: dict[str, WorkerPlugin] | None = None, resources: Mapping[str, float] | None = None, transfer_incoming_count_limit: int = 9999, validate: bool = True, transition_counter_max: int | Literal[False] = False, transfer_incoming_bytes_limit: float = inf, transfer_message_bytes_limit: float = inf)[source]#
State machine encapsulating the lifetime of all tasks on a worker.
Not to be confused with distributed.scheduler.WorkerState.
Note
The data attributes of this class are implementation details and may be changed without a deprecation cycle.
Warning
The attributes of this class are all heavily correlated with each other. Do not modify them directly, ever, as it is extremely easy to obtain a broken state this way, which in turn will likely result in cluster-wide deadlocks.
The state should be exclusively mutated through handle_stimulus().
- address: str#
Worker <IP address>:<port>. This is used in decision-making by the state machine, e.g. to determine if a peer worker is running on the same host or not. This attribute may not be known when the WorkerState is initialised. It must be set before the first call to handle_stimulus().
- property all_running_tasks: set[TaskState]#
All tasks that are currently occupying a thread. They may or may not count towards the maximum number of threads.
These are:
- ts.state in (executing, long-running)
- ts.state in (cancelled, resumed) and ts.previous in (executing, long-running)
- available_resources: dict[str, float]#
{resource name: amount}. Resources that aren’t currently being consumed by task execution. Always less than or equal to total_resources. See Worker Resources.
- busy_workers: set[str]#
Peer workers that recently returned a busy status. Workers in this set won’t be asked for additional dependencies for some time.
- constrained: HeapSet[TaskState]#
Priority heap of tasks that are ready to run, but are waiting on abstract resources like GPUs. Mutually exclusive with ready. See available_resources and Worker Resources.
- data: MutableMapping[Key, object]#
In-memory tasks data. This collection is shared by reference between Worker, WorkerMemoryManager, and this class.
- data_needed: defaultdict[str, HeapSet[TaskState]]#
The tasks which still require data in order to execute and are in memory on at least another worker, prioritized as per-worker heaps. All and only tasks with TaskState.state == 'fetch' are in this collection. A TaskState with multiple entries in who_has will appear multiple times here.
- executed_count: int#
Number of tasks that this worker has run in its lifetime; this includes failed and cancelled tasks. See also executing_count().
- executing: set[TaskState]#
Set of tasks that are currently running.
This set includes exclusively tasks with state == 'executing' as well as tasks with state in ('cancelled', 'resumed') and previous == 'executing'.
See also executing_count() and long_running.
- property executing_count: int#
Count of tasks currently executing on this worker and counting towards the maximum number of threads.
It includes cancelled tasks, but does not include long running (a.k.a. seceded) tasks.
- fetch_count: int#
Total number of tasks in fetch state. If a task is in more than one data_needed heap, it’s only counted once.
- generation: int#
Counter that decreases every time the compute-task handler is invoked by the Scheduler. It is appended to TaskState.priority and acts as a tie-breaker between tasks that have the same priority on the Scheduler, determining a last-in-first-out order between them.
- handle_stimulus(*stims: StateMachineEvent) list[Instruction][source]#
Process one or more external events, transition relevant tasks to new states, and return a list of instructions to be executed as a consequence.
- has_what: defaultdict[str, set[Key]]#
{worker address: {ts.key, ...}}. The data that we care about that we think a worker has
- in_flight_tasks: set[TaskState]#
Tasks that are coming to us in current peer-to-peer connections.
This set includes exclusively tasks with state == 'flight' as well as tasks with state in ('cancelled', 'resumed') and previous == 'flight'.
See also in_flight_tasks_count().
- property in_flight_tasks_count: int#
Number of tasks currently being replicated from other workers to this one.
- in_flight_workers: dict[str, set[Key]]#
{worker address: {ts.key, ...}}. The workers from which we are currently gathering data and the dependencies we expect from those connections. Workers in this dict won’t be asked for additional dependencies until the current query returns.
- log: deque[tuple]#
Transition log: [(..., stimulus_id: str | None, timestamp: float), ...]. The number of stimuli logged is capped. See also story() and stimulus_log.
- long_running: set[TaskState]#
Set of tasks that are currently running and have called secede(), so they no longer count towards the maximum number of concurrent tasks (nthreads). These tasks do not appear in the executing set.
This set includes exclusively tasks with state == 'long-running' as well as tasks with state in ('cancelled', 'resumed') and previous == 'long-running'.
- nthreads: int#
Number of tasks that can be executing in parallel. At any given time, executing_count() <= nthreads.
- plugins: dict[str, WorkerPlugin]#
{name: worker plugin}. This collection is shared by reference between Worker and this class. The Worker manages adding and removing plugins, while the WorkerState invokes the WorkerPlugin.transition method, if available.
- ready: HeapSet[TaskState]#
Priority heap of tasks that are ready to run and have no resource constraints. Mutually exclusive with constrained.
- rng: random.Random#
Statically-seeded random state, used to guarantee determinism whenever a pseudo-random choice is required
- running: bool#
True if the state machine should start executing more tasks and fetch dependencies whenever a slot is available. This property must be kept aligned with the Worker: WorkerState.running == (Worker.status is Status.running).
- stimulus_log: deque[StateMachineEvent]#
Log of all stimuli received by handle_stimulus(). The number of events logged is capped. See also log and stimulus_story().
- stimulus_story(*keys_or_tasks: str | int | float | tuple[str | int | float | tuple[Key, ...], ...] | TaskState) list[StateMachineEvent][source]#
Return all state machine events involving one or more tasks
- story(*keys_or_tasks_or_stimuli: str | int | float | tuple[str | int | float | tuple[Key, ...], ...] | TaskState) list[tuple][source]#
Return all records from the transitions log involving one or more tasks or stimulus_id’s
- task_counter: TaskCounter#
Current number of tasks and cumulative elapsed time in each state, both broken down by prefix
- tasks: dict[Key, TaskState]#
{key: TaskState}. The tasks currently executing on this worker (and any dependencies of those tasks)
- threads: dict[Key, int]#
{ts.key: thread ID}. This collection is shared by reference between Worker and this class. While the WorkerState is thread-agnostic, it still needs access to this information in some cases. This collection is populated by distributed.worker.Worker.execute(). It does not need to be populated for the WorkerState to work.
- total_resources: dict[str, float]#
{resource name: amount}. Total resources available for task execution. See Worker Resources.
- transfer_incoming_bytes_limit: float#
Limit of bytes for incoming data transfers; this is used for throttling.
- transfer_incoming_bytes_throttle_threshold: int#
Ignore transfer_incoming_count_limit as long as transfer_incoming_bytes is less than this value.
- property transfer_incoming_count: int#
Current number of open data transfers from other workers.
- transfer_incoming_count_limit: int#
Maximum number of concurrent incoming data transfers from other workers. See also distributed.worker.Worker.transfer_outgoing_count_limit.
- transfer_incoming_count_total: int#
Total number of data transfers from other workers since the worker was started.
- transfer_message_bytes_limit: float#
Number of bytes to gather from the same worker in a single call to BaseWorker.gather_dep(). Multiple small tasks that can be gathered from the same worker will be batched in a single instruction as long as their combined size doesn’t exceed this value. If the first task to be gathered exceeds this limit, it will still be gathered to ensure progress. Hence, this limit is not absolute.
- transition_counter: int#
Total number of state transitions so far. See also log and transition_counter_max.
- transition_counter_max: int | Literal[False]#
Raise an error if the transition_counter ever reaches this value. This is meant for debugging only, to catch infinite recursion loops. In production, it should always be set to False.
- class distributed.worker_state_machine.BaseWorker(state: WorkerState)[source]#
Wrapper around the WorkerState that implements instructions handling. This is an abstract class with several @abc.abstractmethod methods, to be subclassed by Worker and by unit test mock-ups.
- abstractmethod batched_send(msg: dict[str, Any]) None[source]#
Send a fire-and-forget message to the scheduler through bulk comms.
- Parameters:
- msg: dict
msgpack-serializable message to send to the scheduler. Must have an 'op' key which is registered in Scheduler.stream_handlers.
- abstractmethod digest_metric(name: Hashable, value: float) None[source]#
Log an arbitrary numerical metric
- abstractmethod async execute(key: str | int | float | tuple[Key, ...], *, stimulus_id: str) StateMachineEvent[source]#
Execute a task
- abstractmethod async gather_dep(worker: str, to_gather: Collection[str | int | float | tuple[str | int | float | tuple[Key, ...], ...]], total_nbytes: int, *, stimulus_id: str) StateMachineEvent[source]#
Gather dependencies for a task from a worker who has them
- Parameters:
- worker: str
Address of worker to gather dependencies from
- to_gather: list
Keys of dependencies to gather from worker – this is not necessarily equivalent to the full list of dependencies of dep as some dependencies may already be present on this worker.
- total_nbytes: int
Total number of bytes for all the dependencies in to_gather combined
- handle_stimulus(*stims: StateMachineEvent) None[source]#
Forward one or more external stimuli to WorkerState.handle_stimulus() and process the returned instructions, invoking the relevant Worker callbacks (@abc.abstractmethod methods below).
Spawn asyncio tasks for all asynchronous instructions and start tracking them.
- abstractmethod async retry_busy_worker_later(worker: str) StateMachineEvent[source]#
Wait some time, then take a peer worker out of busy state
- class distributed.worker_state_machine.StateMachineEvent(*args: Any, **kwargs: Any)[source]#
Base abstract class for all stimuli that can modify the worker state
- static from_dict(d: dict) StateMachineEvent[source]#
Convert the output of recursive_to_dict back into the original object. The output object is meaningful for the purpose of rebuilding the state machine, but not necessarily identical to the original.
- to_loggable(*, handled: float) StateMachineEvent[source]#
Produce a variant version of self that is small enough to be stored in memory in the medium term and contains meaningful information for debugging
- class distributed.worker_state_machine.Instruction(stimulus_id: str)[source]#
Command from the worker state machine to the Worker, in response to an event
- classmethod match(**kwargs: Any) _InstructionMatch[source]#
Generate a partial match to compare against an Instruction instance. The typical usage is to compare a list of instructions returned by WorkerState.handle_stimulus() or in WorkerState.stimulus_log vs. an expected list of matches.
Examples
    instructions = ws.handle_stimulus(...)
    assert instructions == [
        TaskFinishedMsg.match(key="x"),
        ...
    ]
Note
StateMachineEvent and Instruction are abstract classes, with many
subclasses which are not listed here for the sake of brevity.
Refer to the implementation module distributed.worker_state_machine for the
full list.