Skip to content

base

deepdrivewe.resamplers.base

Resampling algorithms for the weighted ensemble.

Resampler

Bases: ABC

Resampler for the weighted ensemble.

Source code in deepdrivewe/resamplers/base.py (lines 18–460):
class Resampler(ABC):
    """Resampler for the weighted ensemble.

    Concrete subclasses implement `resample`, which decides how to split
    and merge the simulations within a single bin. The helper methods on
    this base class (`split_sims`, `merge_sims`, `split_by_weight`,
    `merge_by_weight`, `adjust_count`, `split_by_threshold`,
    `merge_by_threshold`) provide the standard building blocks.
    """

    def __init__(self) -> None:
        """Initialize the resampler."""
        # Monotonically increasing counter used by _add_new_simulation to
        # assign a unique simulation_id to every split/merged simulation.
        self._index_counter = itertools.count()

    def run(
        self,
        cur_sims: list[SimMetadata],
        binner: Binner,
        recycler: Recycler,
    ) -> tuple[list[SimMetadata], list[SimMetadata], IterationMetadata]:
        """Assign simulations to bins and resample the weighted ensemble.

        Parameters
        ----------
        cur_sims : list[SimMetadata]
            The list of current simulations.
        binner : Binner
            The binner to use for binning the simulations.
        recycler : Recycler
            The recycler to use for recycling the simulations.

        Returns
        -------
        tuple[list[SimMetadata], list[SimMetadata], IterationMetadata]
            The current sims, the new sims, and the iteration metadata.
        """
        # Get the next iteration of simulation metadata
        next_sims = self._get_next_sims(cur_sims)

        # Recycle the current iteration
        cur_sims, next_sims = recycler.recycle_simulations(cur_sims, next_sims)

        # Assign the simulations to bins
        bin_assignments = binner.bin_simulations(next_sims)

        # Compute the iteration metadata
        metadata = binner.compute_iteration_metadata(cur_sims)

        # Resample the simulations in each bin
        new_sims = []
        for bin_sims in bin_assignments.values():
            # Get the simulations in the bin
            binned_sims = [next_sims[sim_idx] for sim_idx in bin_sims]

            # Resample the bin and add them to the new simulations
            # (resample may also update cur_sims, e.g. endpoint types
            # set by merge_sims, so the returned cur_sims is rebound)
            cur_sims, resampled_sims = self.resample(cur_sims, binned_sims)

            # Add the resampled simulations to the new simulations
            new_sims.extend(resampled_sims)

        return cur_sims, new_sims, metadata

    def _get_next_sims(self, cur_sims: list[SimMetadata]) -> list[SimMetadata]:
        """Return the simulations for the next iteration.

        Each current simulation spawns exactly one child that inherits its
        weight and restarts from the parent's restart file.
        """
        # Create a list to store the new simulations for this iteration
        simulations = []

        for idx, sim in enumerate(cur_sims):
            # Ensure that the simulation has a restart file, i.e., the `sim`
            # object represents a simulation that has been run.
            # NOTE(review): `assert` is stripped under `python -O`; raise
            # an exception instead if this invariant must hold in production.
            assert sim.restart_file is not None

            # Create the metadata for the new simulation
            new_sim = SimMetadata(
                weight=sim.weight,
                simulation_id=idx,
                iteration_id=sim.iteration_id + 1,
                parent_restart_file=sim.restart_file,
                # The parent progress coordinate is the progress coordinate
                # of the last frame of the previous simulation
                parent_pcoord=sim.pcoord[-1],
                parent_simulation_id=sim.simulation_id,
                wtg_parent_ids=[sim.simulation_id],
            )

            # Add the new simulation to the current iteration
            simulations.append(new_sim)

        return simulations

    def _add_new_simulation(
        self,
        sim: SimMetadata,
        weight: float,
        wtg_parent_ids: list[int],
    ) -> SimMetadata:
        """Add a new simulation to the current iteration.

        The new simulation copies `sim`'s lineage fields (restart files,
        parent pcoord/ID) but receives the given weight, the given
        wtg_parent_ids, and a fresh unique simulation_id from the counter.
        """
        # Create the metadata for the new simulation
        return SimMetadata(
            weight=weight,
            simulation_id=next(self._index_counter),
            iteration_id=sim.iteration_id,
            restart_file=sim.restart_file,
            parent_restart_file=sim.parent_restart_file,
            parent_pcoord=sim.parent_pcoord,
            parent_simulation_id=sim.parent_simulation_id,
            wtg_parent_ids=wtg_parent_ids,
        )

    def split_sims(
        self,
        sims: list[SimMetadata],
        indices: list[int],
        n_splits: int | list[int] = 2,
    ) -> list[SimMetadata]:
        """Split the simulation index into `n_split`.

        Parameters
        ----------
        sims : list[SimMetadata]
            The list of simulations in a particular bin.
        indices : list[int]
            The indices of `sims` to split.
        n_splits : int | list[int]
            Number of children per split simulation; either one integer
            applied to all, or one integer per index. Default is 2.

        Returns
        -------
        list[SimMetadata]
            The non-split simulations followed by the split children.
        """
        # Get the simulations to split
        sims_to_split = [sims[idx] for idx in indices]

        # Handle the case where `n_split` is a single integer
        if isinstance(n_splits, int):
            n_splits = [n_splits] * len(sims_to_split)

        # Create a list to store the new simulations
        new_sims: list[SimMetadata] = []

        # Add back the simulations that will not be split
        new_sims.extend(sims[i] for i in range(len(sims)) if i not in indices)

        # Split the simulations using the specified number of splits
        # and equal weights for the split simulations
        for sim, n_split in zip(sims_to_split, n_splits, strict=True):
            for _ in range(n_split):
                # NOTE: The split simulation is assigned a weight equal to
                # the original weight divided by the number of splits. It
                # also inherits the previous wtg_parent_ids.
                new_sim = self._add_new_simulation(
                    sim,
                    sim.weight / n_split,
                    sim.wtg_parent_ids,
                )
                new_sims.append(new_sim)

        return new_sims

    def merge_sims(
        self,
        cur_sims: list[SimMetadata],
        next_sims: list[SimMetadata],
        indices: list[int],
    ) -> list[SimMetadata]:
        """Merge each group of simulation indices into a single simulation.

        NOTE: This method modifies `cur_sims` in place to set the endpoint
        type of the parents whose children were merged away.

        Parameters
        ----------
        cur_sims : list[SimMetadata]
            The list of current simulations.
        next_sims : list[SimMetadata]
            The list of next simulations in a particular bin to merge.
        indices : list[int]
            The indices of the next simulations to merge.

        Returns
        -------
        list[SimMetadata]
            The list of new simulations after merging.
        """
        # Get the simulations to merge
        to_merge = [next_sims[idx] for idx in indices]

        # Get the weights of each simulation to merge
        weights = [sim.weight for sim in to_merge]

        # Make sure the weights are normalized to sum to 1 for randomizing.
        # Since the entire ensemble should have a total weight of 1
        # any subset of the ensemble will have a total weight less than 1.
        norm_weights = np.array(weights) / sum(weights)

        # Randomly select one of the simulations with probability equal
        # to the normalized weights
        # NOTE(review): uses NumPy's legacy global RNG, so reproducibility
        # depends on the caller seeding np.random.
        select: int = np.random.choice(len(to_merge), p=norm_weights)

        # Compute the union of all the wtg_parent_ids
        all_wtg_parent_ids = [set(sim.wtg_parent_ids) for sim in to_merge]
        wtg_parent_ids = list(set.union(*all_wtg_parent_ids))

        # Add the new simulation to the current iteration
        # (the merged walker carries the summed weight of the group)
        new_sim = self._add_new_simulation(
            to_merge[select],
            sum(weights),
            wtg_parent_ids,
        )

        # Create a list to store the new simulations
        new_sims: list[SimMetadata] = []

        # Get the indices of non-merged simulations
        no_merge_idxs = [i for i in range(len(next_sims)) if i not in indices]

        # Add back the simulations that will not be merged
        new_sims.extend(next_sims[i] for i in no_merge_idxs)

        # Add the new simulation to the list of new simulations
        new_sims.append(new_sim)

        # Get the parent simulation IDs of all the merged simulations
        merged_parents = {x.parent_simulation_id for x in to_merge}
        # Remove the parent simulation id of the new merged simulation
        merged_parents.remove(new_sim.parent_simulation_id)

        # Set the endpoint type for the merged simulations (except the new sim)
        for sim in cur_sims:
            # sim.simulation_id >= 0 ensures that the simulation has not
            # been recycled (i.e., it is not a negative index)
            if sim.simulation_id >= 0 and sim.simulation_id in merged_parents:
                # Set the endpoint type to 2 if the simulation is merged
                # (2 presumably marks a merged endpoint, following WESTPA
                # conventions — TODO confirm against SimMetadata)
                sim.endpoint_type = 2

        # Return the new simulation
        return new_sims

    def split_by_weight(
        self,
        sims: list[SimMetadata],
        ideal_weight: float,
    ) -> list[SimMetadata]:
        """Split overweight sims.

        Parameters
        ----------
        sims : list[SimMetadata]
            The list of simulations in a particular bin to split.
        ideal_weight : float
            The ideal weight for each simulation, defined as the total (sum)
            weight of bin divided by the desired number of walkers in the bin.
            This is roughly equivalent to the average weight of the simulations
            in the bin.

        Returns
        -------
        list[SimMetadata]
            The list of new simulations after splitting.
        """
        # Get the weights of the simulations
        weights = np.array([sim.weight for sim in sims])

        # Get the simulation indices
        indices = np.arange(len(sims))

        # Find the walkers that need to be split
        split_inds = indices[weights > ideal_weight].tolist()

        # Calculate the number of splits for each walker
        # (ceil so each child's weight is at most ideal_weight)
        num_splits = [math.ceil(weights[i] / ideal_weight) for i in split_inds]

        # Split the simulations
        return self.split_sims(sims, split_inds, num_splits)

    def merge_by_weight(
        self,
        cur_sims: list[SimMetadata],
        next_sims: list[SimMetadata],
        ideal_weight: float,
    ) -> list[SimMetadata]:
        """Merge underweight sims.

        Parameters
        ----------
        cur_sims : list[SimMetadata]
            The list of current simulations.
        next_sims : list[SimMetadata]
            The list of simulations in a particular bin to merge.
        ideal_weight : float
            The ideal weight for each simulation, defined as the total (sum)
            weight of bin divided by the desired number of walkers in the bin.
            This is roughly equivalent to the average weight of the simulations
            in the bin.

        Returns
        -------
        list[SimMetadata]
            The list of simulations after merging.
        """
        # Repeatedly merge the lightest prefix of walkers whose cumulative
        # weight stays within ideal_weight, until fewer than two qualify.
        while True:
            # Sort the simulations by weight
            sorted_sims = sorted(next_sims, key=lambda sim: sim.weight)

            # Get the weights of the sorted simulations
            weights = np.array([sim.weight for sim in sorted_sims])

            # Accumulate the weights
            cumul_weight = np.add.accumulate(weights)

            # Get the simulation indices
            indices = np.arange(len(next_sims))

            # Find the walkers that need to be merged
            to_merge = indices[cumul_weight <= ideal_weight].tolist()

            # Break the loop if no walkers need to be merged
            if len(to_merge) < 2:  # noqa: PLR2004
                return next_sims

            # Merge the simulations
            next_sims = self.merge_sims(cur_sims, sorted_sims, to_merge)

    def adjust_count(
        self,
        cur_sims: list[SimMetadata],
        next_sims: list[SimMetadata],
        target_count: int,
    ) -> list[SimMetadata]:
        """Adjust the number of sims to match the target count.

        Parameters
        ----------
        cur_sims : list[SimMetadata]
            The list of current simulations.
        next_sims : list[SimMetadata]
            The list of simulations in a particular bin to adjust.
        target_count : int
            The number of simulations to have in the bin.

        Returns
        -------
        list[SimMetadata]
            The list of simulations after adjusting.
        """
        # Case 1: Too few sims (each 2-way split adds exactly one walker)
        while len(next_sims) < target_count:
            # Get the index of the largest weight simulation
            index = int(np.argmax([sim.weight for sim in next_sims]))

            # Split the highest weight sim in two
            next_sims = self.split_sims(next_sims, [index], 2)

            # Break the loop if the target count is reached
            # NOTE(review): redundant with the while condition; harmless.
            if len(next_sims) == target_count:
                break

        # Case 2: Too many sims (each 2-way merge removes exactly one walker)
        while len(next_sims) > target_count:
            # Sort the simulation indices by weight
            sorted_indices = np.argsort([sim.weight for sim in next_sims])

            # Get the two lowest weight indices to merge
            indices = sorted_indices[:2].tolist()

            # Merge the two lowest weight sims
            next_sims = self.merge_sims(cur_sims, next_sims, indices)

            # Break the loop if the target count is reached
            # NOTE(review): redundant with the while condition; harmless.
            if len(next_sims) == target_count:
                break

        return next_sims

    def split_by_threshold(
        self,
        sims: list[SimMetadata],
        max_allowed_weight: float,
    ) -> list[SimMetadata]:
        """Split the sims by threshold.

        Parameters
        ----------
        sims : list[SimMetadata]
            The list of simulations to split.
        max_allowed_weight : float
            The maximum allowed weight for each simulation. If the weight of a
            simulation exceeds this value, it will be split.

        Returns
        -------
        list[SimMetadata]
            The list of simulations after splitting.
        """
        # Thin wrapper: splitting above a threshold is the same operation
        # as splitting above the ideal weight.
        return self.split_by_weight(sims, max_allowed_weight)

    def merge_by_threshold(
        self,
        cur_sims: list[SimMetadata],
        next_sims: list[SimMetadata],
        min_allowed_weight: float,
    ) -> list[SimMetadata]:
        """Merge all simulations under a given threshold into a single sim.

        Parameters
        ----------
        cur_sims : list[SimMetadata]
            The list of current simulations.
        next_sims : list[SimMetadata]
            The list of simulations in a particular bin to merge.
        min_allowed_weight : float
            The minimum allowed weight for each simulation. All the simulations
            with a weight less than this value will be merged into a single
            simulation walker.

        Returns
        -------
        list[SimMetadata]
            The list of simulations after merging.
        """
        while True:
            # Sort the simulations by weight
            sorted_sims = sorted(next_sims, key=lambda sim: sim.weight)

            # Get the weights of the sorted simulations
            weights = np.array([sim.weight for sim in sorted_sims])

            # Get the simulation indices
            indices = np.arange(len(next_sims))

            # Find the walkers that need to be merged
            to_merge = indices[weights < min_allowed_weight].tolist()
            if len(to_merge) < 2:  # noqa: PLR2004
                return next_sims

            # Merge the simulations
            # (loop repeats since the merged walker's combined weight may
            # still fall below the threshold)
            next_sims = self.merge_sims(cur_sims, sorted_sims, to_merge)

    def get_pcoords(
        self,
        next_sims: list[SimMetadata],
        pcoord_idx: int = 0,
    ) -> list[float]:
        """Extract the progress coordinates from the simulations.

        Parameters
        ----------
        next_sims : list[SimMetadata]
            The list of simulation metadata.
        pcoord_idx : int
            The index of the progress coordinate to extract. Default is 0.

        Returns
        -------
        list[float]
            The progress coordinates for the simulations.
        """
        return [sim.parent_pcoord[pcoord_idx] for sim in next_sims]

    @abstractmethod
    def resample(
        self,
        cur_sims: list[SimMetadata],
        next_sims: list[SimMetadata],
    ) -> tuple[list[SimMetadata], list[SimMetadata]]:
        """Resample the weighted ensemble."""
        ...

__init__

__init__() -> None

Initialize the resampler.

Source code in deepdrivewe/resamplers/base.py
# NOTE(review): duplicated mkdocs rendering of Resampler.__init__ from the
# class listing earlier in this page; code kept byte-identical.
def __init__(self) -> None:
    """Initialize the resampler."""
    self._index_counter = itertools.count()

run

run(
    cur_sims: list[SimMetadata],
    binner: Binner,
    recycler: Recycler,
) -> tuple[
    list[SimMetadata], list[SimMetadata], IterationMetadata
]

Assign simulations to bins and resample the weighted ensemble.

Parameters:

Name Type Description Default
cur_sims list[SimMetadata]

The list of current simulations.

required
binner Binner

The binner to use for binning the simulations.

required
recycler Recycler

The recycler to use for recycling the simulations.

required

Returns:

Type Description
tuple[list[SimMetadata], list[SimMetadata], IterationMetadata]

The current sims, the new sims, and the iteration metadata.

Source code in deepdrivewe/resamplers/base.py
# NOTE(review): duplicated mkdocs rendering of Resampler.run from the
# class listing earlier in this page; code kept byte-identical.
def run(
    self,
    cur_sims: list[SimMetadata],
    binner: Binner,
    recycler: Recycler,
) -> tuple[list[SimMetadata], list[SimMetadata], IterationMetadata]:
    """Assign simulations to bins and resample the weighted ensemble.

    Parameters
    ----------
    cur_sims : list[SimMetadata]
        The list of current simulations.
    binner : Binner
        The binner to use for binning the simulations.
    recycler : Recycler
        The recycler to use for recycling the simulations.

    Returns
    -------
    tuple[list[SimMetadata], list[SimMetadata], IterationMetadata]
        The current sims, the new sims, and the iteration metadata.
    """
    # Get the next iteration of simulation metadata
    next_sims = self._get_next_sims(cur_sims)

    # Recycle the current iteration
    cur_sims, next_sims = recycler.recycle_simulations(cur_sims, next_sims)

    # Assign the simulations to bins
    bin_assignments = binner.bin_simulations(next_sims)

    # Compute the iteration metadata
    metadata = binner.compute_iteration_metadata(cur_sims)

    # Resample the simulations in each bin
    new_sims = []
    for bin_sims in bin_assignments.values():
        # Get the simulations in the bin
        binned_sims = [next_sims[sim_idx] for sim_idx in bin_sims]

        # Resample the bin and add them to the new simulations
        cur_sims, resampled_sims = self.resample(cur_sims, binned_sims)

        # Add the resampled simulations to the new simulations
        new_sims.extend(resampled_sims)

    return cur_sims, new_sims, metadata

split_sims

split_sims(
    sims: list[SimMetadata],
    indices: list[int],
    n_splits: int | list[int] = 2,
) -> list[SimMetadata]

Split the simulation index into n_split.

Source code in deepdrivewe/resamplers/base.py
# NOTE(review): duplicated mkdocs rendering of Resampler.split_sims from the
# class listing earlier in this page; code kept byte-identical.
def split_sims(
    self,
    sims: list[SimMetadata],
    indices: list[int],
    n_splits: int | list[int] = 2,
) -> list[SimMetadata]:
    """Split the simulation index into `n_split`."""
    # Get the simulations to split
    sims_to_split = [sims[idx] for idx in indices]

    # Handle the case where `n_split` is a single integer
    if isinstance(n_splits, int):
        n_splits = [n_splits] * len(sims_to_split)

    # Create a list to store the new simulations
    new_sims: list[SimMetadata] = []

    # Add back the simulations that will not be split
    new_sims.extend(sims[i] for i in range(len(sims)) if i not in indices)

    # Split the simulations using the specified number of splits
    # and equal weights for the split simulations
    for sim, n_split in zip(sims_to_split, n_splits, strict=True):
        for _ in range(n_split):
            # NOTE: The split simulation is assigned a weight equal to
            # the original weight divided by the number of splits. It
            # also inherits the previous wtg_parent_ids.
            new_sim = self._add_new_simulation(
                sim,
                sim.weight / n_split,
                sim.wtg_parent_ids,
            )
            new_sims.append(new_sim)

    return new_sims

merge_sims

merge_sims(
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
    indices: list[int],
) -> list[SimMetadata]

Merge each group of simulation indices into a single simulation.

NOTE: This method modifies `cur_sims` in place to set the endpoint type.

Parameters:

Name Type Description Default
cur_sims list[SimMetadata]

The list of current simulations.

required
next_sims list[SimMetadata]

The list of next simulations in a particular bin to merge.

required
indices list[int]

The indices of the next simulations to merge.

required

Returns:

Type Description
list[SimMetadata]

The list of new simulations after merging.

Source code in deepdrivewe/resamplers/base.py
# NOTE(review): duplicated mkdocs rendering of Resampler.merge_sims from the
# class listing earlier in this page; code kept byte-identical.
def merge_sims(
    self,
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
    indices: list[int],
) -> list[SimMetadata]:
    """Merge each group of simulation indices into a single simulation.

    NOTE: This method modifies `cur_sims` in place to set the endpoint type.

    Parameters
    ----------
    cur_sims : list[SimMetadata]
        The list of current simulations.
    next_sims : list[SimMetadata]
        The list of next simulations in a particular bin to merge.
    indices : list[int]
        The indices of the next simulations to merge.

    Returns
    -------
    list[SimMetadata]
        The list of new simulations after merging.
    """
    # Get the simulations to merge
    to_merge = [next_sims[idx] for idx in indices]

    # Get the weights of each simulation to merge
    weights = [sim.weight for sim in to_merge]

    # Make sure the weights are normalized to sum to 1 for randomizing.
    # Since the entire ensemble should have a total weight of 1
    # any subset of the ensemble will have a total weight less than 1.
    norm_weights = np.array(weights) / sum(weights)

    # Randomly select one of the simulations with probability equal
    # to the normalized weights
    select: int = np.random.choice(len(to_merge), p=norm_weights)

    # Compute the union of all the wtg_parent_ids
    all_wtg_parent_ids = [set(sim.wtg_parent_ids) for sim in to_merge]
    wtg_parent_ids = list(set.union(*all_wtg_parent_ids))

    # Add the new simulation to the current iteration
    new_sim = self._add_new_simulation(
        to_merge[select],
        sum(weights),
        wtg_parent_ids,
    )

    # Create a list to store the new simulations
    new_sims: list[SimMetadata] = []

    # Get the indices of non-merged simulations
    no_merge_idxs = [i for i in range(len(next_sims)) if i not in indices]

    # Add back the simulations that will not be merged
    new_sims.extend(next_sims[i] for i in no_merge_idxs)

    # Add the new simulation to the list of new simulations
    new_sims.append(new_sim)

    # Get the parent simulation IDs of all the merged simulations
    merged_parents = {x.parent_simulation_id for x in to_merge}
    # Remove the parent simulation id of the new merged simulation
    merged_parents.remove(new_sim.parent_simulation_id)

    # Set the endpoint type for the merged simulations (except the new sim)
    for sim in cur_sims:
        # sim.simulation_id >= 0 ensures that the simulation has not
        # been recycled (i.e., it is not a negative index)
        if sim.simulation_id >= 0 and sim.simulation_id in merged_parents:
            # Set the endpoint type to 2 if the simulation is merged
            sim.endpoint_type = 2

    # Return the new simulation
    return new_sims

split_by_weight

split_by_weight(
    sims: list[SimMetadata], ideal_weight: float
) -> list[SimMetadata]

Split overweight sims.

Parameters:

Name Type Description Default
sims list[SimMetadata]

The list of simulations in a particular bin to split.

required
ideal_weight float

The ideal weight for each simulation, defined as the total (sum) weight of bin divided by the desired number of walkers in the bin. This is roughly equivalent to the average weight of the simulations in the bin.

required

Returns:

Type Description
list[SimMetadata]

The list of new simulations after splitting.

Source code in deepdrivewe/resamplers/base.py
# NOTE(review): duplicated mkdocs rendering of Resampler.split_by_weight from
# the class listing earlier in this page; code kept byte-identical.
def split_by_weight(
    self,
    sims: list[SimMetadata],
    ideal_weight: float,
) -> list[SimMetadata]:
    """Split overweight sims.

    Parameters
    ----------
    sims : list[SimMetadata]
        The list of simulations in a particular bin to split.
    ideal_weight : float
        The ideal weight for each simulation, defined as the total (sum)
        weight of bin divided by the desired number of walkers in the bin.
        This is roughly equivalent to the average weight of the simulations
        in the bin.

    Returns
    -------
    list[SimMetadata]
        The list of new simulations after splitting.
    """
    # Get the weights of the simulations
    weights = np.array([sim.weight for sim in sims])

    # Get the simulation indices
    indices = np.arange(len(sims))

    # Find the walkers that need to be split
    split_inds = indices[weights > ideal_weight].tolist()

    # Calculate the number of splits for each walker
    num_splits = [math.ceil(weights[i] / ideal_weight) for i in split_inds]

    # Split the simulations
    return self.split_sims(sims, split_inds, num_splits)

merge_by_weight

merge_by_weight(
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
    ideal_weight: float,
) -> list[SimMetadata]

Merge underweight sims.

Parameters:

Name Type Description Default
cur_sims list[SimMetadata]

The list of current simulations.

required
next_sims list[SimMetadata]

The list of simulations in a particular bin to merge.

required
ideal_weight float

The ideal weight for each simulation, defined as the total (sum) weight of bin divided by the desired number of walkers in the bin. This is roughly equivalent to the average weight of the simulations in the bin.

required

Returns:

Type Description
list[SimMetadata]

The list of simulations after merging.

Source code in deepdrivewe/resamplers/base.py
# NOTE(review): duplicated mkdocs rendering of Resampler.merge_by_weight from
# the class listing earlier in this page; code kept byte-identical.
def merge_by_weight(
    self,
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
    ideal_weight: float,
) -> list[SimMetadata]:
    """Merge underweight sims.

    Parameters
    ----------
    cur_sims : list[SimMetadata]
        The list of current simulations.
    next_sims : list[SimMetadata]
        The list of simulations in a particular bin to merge.
    ideal_weight : float
        The ideal weight for each simulation, defined as the total (sum)
        weight of bin divided by the desired number of walkers in the bin.
        This is roughly equivalent to the average weight of the simulations
        in the bin.

    Returns
    -------
    list[SimMetadata]
        The list of simulations after merging.
    """
    while True:
        # Sort the simulations by weight
        sorted_sims = sorted(next_sims, key=lambda sim: sim.weight)

        # Get the weights of the sorted simulations
        weights = np.array([sim.weight for sim in sorted_sims])

        # Accumulate the weights
        cumul_weight = np.add.accumulate(weights)

        # Get the simulation indices
        indices = np.arange(len(next_sims))

        # Find the walkers that need to be merged
        to_merge = indices[cumul_weight <= ideal_weight].tolist()

        # Break the loop if no walkers need to be merged
        if len(to_merge) < 2:  # noqa: PLR2004
            return next_sims

        # Merge the simulations
        next_sims = self.merge_sims(cur_sims, sorted_sims, to_merge)

adjust_count

adjust_count(
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
    target_count: int,
) -> list[SimMetadata]

Adjust the number of sims to match the target count.

Parameters:

Name Type Description Default
cur_sims list[SimMetadata]

The list of current simulations.

required
next_sims list[SimMetadata]

The list of simulations in a particular bin to adjust.

required
target_count int

The number of simulations to have in the bin.

required

Returns:

Type Description
list[SimMetadata]

The list of simulations after adjusting.

Source code in deepdrivewe/resamplers/base.py
# NOTE(review): duplicated mkdocs rendering of Resampler.adjust_count from
# the class listing earlier in this page; code kept byte-identical.
def adjust_count(
    self,
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
    target_count: int,
) -> list[SimMetadata]:
    """Adjust the number of sims to match the target count.

    Parameters
    ----------
    cur_sims : list[SimMetadata]
        The list of current simulations.
    next_sims : list[SimMetadata]
        The list of simulations in a particular bin to adjust.
    target_count : int
        The number of simulations to have in the bin.

    Returns
    -------
    list[SimMetadata]
        The list of simulations after adjusting.
    """
    # Case 1: Too few sims
    while len(next_sims) < target_count:
        # Get the index of the largest weight simulation
        index = int(np.argmax([sim.weight for sim in next_sims]))

        # Split the highest weight sim in two
        next_sims = self.split_sims(next_sims, [index], 2)

        # Break the loop if the target count is reached
        if len(next_sims) == target_count:
            break

    # Case 2: Too many sims
    while len(next_sims) > target_count:
        # Sort the simulation indices by weight
        sorted_indices = np.argsort([sim.weight for sim in next_sims])

        # Get the two lowest weight indices to merge
        indices = sorted_indices[:2].tolist()

        # Merge the two lowest weight sims
        next_sims = self.merge_sims(cur_sims, next_sims, indices)

        # Break the loop if the target count is reached
        if len(next_sims) == target_count:
            break

    return next_sims

split_by_threshold

split_by_threshold(
    sims: list[SimMetadata], max_allowed_weight: float
) -> list[SimMetadata]

Split the sims by threshold.

Parameters:

Name Type Description Default
sims list[SimMetadata]

The list of simulations to split.

required
max_allowed_weight float

The maximum allowed weight for each simulation. If the weight of a simulation exceeds this value, it will be split.

required

Returns:

Type Description
list[SimMetadata]

The list of simulations after splitting.

Source code in deepdrivewe/resamplers/base.py
def split_by_threshold(
    self,
    sims: list[SimMetadata],
    max_allowed_weight: float,
) -> list[SimMetadata]:
    """Split every simulation whose weight exceeds a threshold.

    Parameters
    ----------
    sims : list[SimMetadata]
        The list of simulations to split.
    max_allowed_weight : float
        The maximum allowed weight for each simulation. Any simulation
        whose weight exceeds this value will be split.

    Returns
    -------
    list[SimMetadata]
        The list of simulations after splitting.
    """
    # Splitting by threshold is just splitting by weight, with the
    # threshold acting as the maximum allowed weight per walker.
    threshold = max_allowed_weight
    return self.split_by_weight(sims, threshold)

merge_by_threshold

merge_by_threshold(
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
    min_allowed_weight: float,
) -> list[SimMetadata]

Merge all simulations under a given threshold into a single sim.

Parameters:

- cur_sims (list[SimMetadata], required): The list of current simulations.
- next_sims (list[SimMetadata], required): The list of simulations in a particular bin to merge.
- min_allowed_weight (float, required): The minimum allowed weight for each simulation. All the simulations with a weight less than this value will be merged into a single simulation walker.

Returns:

Type Description
list[SimMetadata]

The list of simulations after merging.

Source code in deepdrivewe/resamplers/base.py
def merge_by_threshold(
    self,
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
    min_allowed_weight: float,
) -> list[SimMetadata]:
    """Merge all simulations under a given threshold into a single sim.

    Parameters
    ----------
    cur_sims : list[SimMetadata]
        The list of current simulations.
    next_sims : list[SimMetadata]
        The list of simulations in a particular bin to merge.
    min_allowed_weight : float
        The minimum allowed weight for each simulation. All the simulations
        with a weight less than this value will be merged into a single
        simulation walker.

    Returns
    -------
    list[SimMetadata]
        The list of simulations after merging.
    """
    while True:
        # Order the candidate sims from lightest to heaviest so the
        # underweight walkers occupy the leading positions.
        by_weight = sorted(next_sims, key=lambda s: s.weight)

        # Positions (within the sorted list) of every sim whose weight
        # falls strictly below the threshold.
        underweight = [
            i
            for i, sim in enumerate(by_weight)
            if sim.weight < min_allowed_weight
        ]

        # Stop once fewer than two walkers remain below the threshold;
        # a single underweight walker has nothing to merge with.
        if len(underweight) < 2:  # noqa: PLR2004
            return next_sims

        # Collapse all underweight walkers into one and re-check, since
        # the merged walker may still sit below the threshold.
        next_sims = self.merge_sims(cur_sims, by_weight, underweight)

get_pcoords

get_pcoords(
    next_sims: list[SimMetadata], pcoord_idx: int = 0
) -> list[float]

Extract the progress coordinates from the simulations.

Parameters:

- next_sims (list[SimMetadata], required): The list of simulation metadata.
- pcoord_idx (int, default 0): The index of the progress coordinate to extract.

Returns:

Type Description
list[float]

The progress coordinates for the simulations.

Source code in deepdrivewe/resamplers/base.py
def get_pcoords(
    self,
    next_sims: list[SimMetadata],
    pcoord_idx: int = 0,
) -> list[float]:
    """Extract the progress coordinates from the simulations.

    Parameters
    ----------
    next_sims : list[SimMetadata]
        The list of simulation metadata.
    pcoord_idx : int
        The index of the progress coordinate to extract. Default is 0.

    Returns
    -------
    list[float]
        The progress coordinates for the simulations.
    """
    return [sim.parent_pcoord[pcoord_idx] for sim in next_sims]

resample abstractmethod

resample(
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
) -> tuple[list[SimMetadata], list[SimMetadata]]

Resample the weighted ensemble.

Source code in deepdrivewe/resamplers/base.py
@abstractmethod
def resample(
    self,
    cur_sims: list[SimMetadata],
    next_sims: list[SimMetadata],
) -> tuple[list[SimMetadata], list[SimMetadata]]:
    """Resample the weighted ensemble.

    Parameters
    ----------
    cur_sims : list[SimMetadata]
        The list of current simulations.
    next_sims : list[SimMetadata]
        The list of simulations to resample for the next iteration.

    Returns
    -------
    tuple[list[SimMetadata], list[SimMetadata]]
        The pair of simulation lists after resampling.
        NOTE(review): presumably (cur_sims, next_sims) post split/merge —
        confirm against the concrete resampler implementations.
    """
    ...