Tee

class langchain_core.utils.aiter.Tee(iterable: AsyncIterator[T], n: int = 2, *, lock: AsyncContextManager[Any] | None = None)

Create n separate asynchronous iterators over iterable.

This splits a single iterable into multiple iterators, each providing the same items in the same order. All child iterators may advance separately but share the same items from iterable – when the most advanced iterator retrieves an item, it is buffered until the least advanced iterator has yielded it as well. A tee works lazily and can handle an infinite iterable, provided that all iterators advance.

import operator
import asyncstdlib as a  # assumed: the a.* helpers below come from asyncstdlib

async def derivative(sensor_data):
    previous, current = a.tee(sensor_data, n=2)
    await a.anext(previous)  # advance one iterator
    return a.map(operator.sub, previous, current)
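
The buffering described above can be illustrated with a minimal standard-library sketch. It is not the implementation behind Tee; the names sketch_tee and count_to are hypothetical and exist only for this illustration. Each child owns a FIFO buffer, and whichever child runs out of buffered items pulls the next item from the source and queues it for every child.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Deque, List, TypeVar

T = TypeVar("T")


async def count_to(limit: int) -> AsyncIterator[int]:
    """Hypothetical source: yields 0, 1, ..., limit - 1."""
    for value in range(limit):
        await asyncio.sleep(0)
        yield value


def sketch_tee(source: AsyncIterator[T], n: int = 2) -> List[AsyncIterator[T]]:
    """Illustrative only: one FIFO buffer per child, filled by whichever child is ahead."""
    buffers: List[Deque[T]] = [deque() for _ in range(n)]

    async def child(own: Deque[T]) -> AsyncIterator[T]:
        while True:
            if not own:
                # This child is the most advanced: pull one item from the
                # source and queue it for every child, including itself.
                try:
                    item = await source.__anext__()
                except StopAsyncIteration:
                    return
                for buffer in buffers:
                    buffer.append(item)
            yield own.popleft()

    return [child(buffer) for buffer in buffers]


async def main() -> tuple:
    first, second = sketch_tee(count_to(4), n=2)
    ahead = [await first.__anext__() for _ in range(3)]  # first runs ahead
    behind = [item async for item in second]  # second replays the buffer, then finishes
    rest = [item async for item in first]  # first drains what second pulled
    return ahead, behind, rest


ahead, behind, rest = asyncio.run(main())
print(ahead, behind, rest)
```

Items that the first child has already seen stay buffered until the second child consumes them, which is why memory use grows with the distance between the most and least advanced child.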

Unlike itertools.tee(), tee() returns a custom type instead of a tuple. Like a tuple, it can be indexed, iterated and unpacked to get the child iterators. In addition, its aclose() method immediately closes all children, and it can be used in an async with context for the same effect.
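
The container behaviour described above (indexing, iteration, unpacking, and closing all children on exit) can be sketched with a small stand-in class. ChildGroup and child are hypothetical names for this illustration and are not part of langchain_core; the sketch only shows the protocol methods such a custom type would plausibly need.

```python
import asyncio
from typing import AsyncGenerator, Iterator, List, Tuple

closed: List[str] = []  # records which children were closed


async def child(name: str) -> AsyncGenerator[str, None]:
    """Hypothetical child iterator that logs when it is closed."""
    try:
        while True:
            yield name
    finally:
        closed.append(name)


class ChildGroup:
    """Hypothetical stand-in for a tee-like container; not the library class."""

    def __init__(self, *children: AsyncGenerator[str, None]) -> None:
        self._children: Tuple[AsyncGenerator[str, None], ...] = tuple(children)

    def __len__(self) -> int:
        return len(self._children)

    def __getitem__(self, index: int) -> AsyncGenerator[str, None]:
        return self._children[index]  # indexing, like a tuple

    def __iter__(self) -> Iterator[AsyncGenerator[str, None]]:
        return iter(self._children)  # enables iteration and unpacking

    async def aclose(self) -> None:
        for item in self._children:
            await item.aclose()

    async def __aenter__(self) -> "ChildGroup":
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self.aclose()  # leaving the block closes every child


async def main() -> List[str]:
    async with ChildGroup(child("a"), child("b")) as group:
        left, right = group  # unpacking
        seen = [await left.__anext__(), await group[1].__anext__()]
    return seen


seen = asyncio.run(main())
print(seen, closed)
```

Closing through the container keeps cleanup in one place, so a child that is abandoned early does not leave the others open.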

If iterable is an iterator that is also read elsewhere, tee will not provide the items consumed there. Also, tee must internally buffer each item until the last iterator has yielded it; if the most and least advanced iterators drift apart by most of the data, collecting the items into a list is more efficient (but not lazy).
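
When every consumer needs a full pass before the next one starts, the list alternative mentioned above might look like the following sketch. The source readings is hypothetical, and the approach assumes a finite iterable, because the list is built eagerly.

```python
import asyncio
from typing import AsyncIterator, List


async def readings() -> AsyncIterator[int]:
    """Hypothetical finite source."""
    for value in (3, 5, 8, 13):
        await asyncio.sleep(0)
        yield value


async def main() -> tuple:
    # Materialise once; this is eager, so it only suits finite sources.
    data: List[int] = [value async for value in readings()]
    total = sum(data)  # first full pass
    deltas = [b - a for a, b in zip(data, data[1:])]  # second full pass
    return total, deltas


total, deltas = asyncio.run(main())
print(total, deltas)
```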

If the underlying iterable is concurrency safe (anext may be awaited concurrently) the resulting iterators are concurrency safe as well. Otherwise, the iterators are safe if there is only ever one single “most advanced” iterator. To enforce sequential use of anext, provide a lock - e.g. an asyncio.Lock instance in an asyncio application - and access is automatically synchronised.
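
The effect of a lock can be sketched with the standard library alone. An async generator is a typical source that is not concurrency safe: awaiting its anext from two tasks at once raises a RuntimeError in CPython. In this sketch, ticks and take are hypothetical helpers, and the lock is applied by hand to show the kind of serialisation that passing lock=asyncio.Lock() is described as providing.

```python
import asyncio
from typing import AsyncIterator, List


async def ticks(limit: int) -> AsyncIterator[int]:
    """Hypothetical source; an async generator is not concurrency safe."""
    for value in range(limit):
        await asyncio.sleep(0)  # suspension point where calls could overlap
        yield value


async def take(source: AsyncIterator[int], lock: asyncio.Lock, out: List[int]) -> None:
    """Pull items until exhaustion, holding the lock around each step."""
    while True:
        async with lock:
            try:
                item = await source.__anext__()
            except StopAsyncIteration:
                return
        out.append(item)


async def main() -> List[int]:
    source = ticks(6)
    lock = asyncio.Lock()
    out: List[int] = []
    # Two consumers share one source; the lock keeps their steps sequential.
    await asyncio.gather(take(source, lock, out), take(source, lock, out))
    return out


collected = asyncio.run(main())
print(collected)
```

Every item is delivered exactly once across the two consumers, and no call to the source overlaps with another.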

Methods

__init__(iterable[, n, lock])

aclose()

Async close all child iterators.

Parameters:
  • iterable (AsyncIterator[T])

  • n (int)

  • lock (AsyncContextManager[Any] | None)

__init__(iterable: AsyncIterator[T], n: int = 2, *, lock: AsyncContextManager[Any] | None = None)
Parameters:
  • iterable (AsyncIterator[T])

  • n (int)

  • lock (AsyncContextManager[Any] | None)

async aclose() → None

Async close all child iterators.

Return type:

None