"""V2 Evaluation Interface."""

from __future__ import annotations

import asyncio
import concurrent.futures as cf
import datetime
import logging
import pathlib
import uuid
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterable,
    AsyncIterator,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Sequence,
    TypeVar,
    Union,
    cast,
)

import langsmith
from langsmith import run_helpers as rh
from langsmith import run_trees as rt
from langsmith import schemas
from langsmith import utils as ls_utils
from langsmith._internal import _aiter as aitertools
from langsmith._internal._beta_decorator import _warn_once
from langsmith.evaluation._runner import (
    AEVALUATOR_T,
    DATA_T,
    EVALUATOR_T,
    ExperimentResultRow,
    _ExperimentManagerMixin,
    _extract_feedback_keys,
    _ForwardResults,
    _is_langchain_runnable,
    _load_examples_map,
    _load_experiment,
    _load_tqdm,
    _load_traces,
    _resolve_data,
    _resolve_evaluators,
    _resolve_experiment,
    _to_pandas,
    _wrap_summary_evaluators,
)
from langsmith.evaluation.evaluator import (
    SUMMARY_EVALUATOR_T,
    EvaluationResult,
    EvaluationResults,
    RunEvaluator,
)

if TYPE_CHECKING:
    import pandas as pd

    DataFrame = pd.DataFrame
else:
    DataFrame = Any

logger = logging.getLogger(__name__)

ATARGET_T = Callable[[dict], Awaitable[dict]]


async def aevaluate(
    target: Union[ATARGET_T, AsyncIterable[dict]],
    data: Union[DATA_T, AsyncIterable[schemas.Example], Iterable[schemas.Example]],
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = None,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
) -> AsyncExperimentResults:
    """Evaluate an async target system or function on a given dataset.

    Args:
        target (Union[AsyncCallable[[dict], dict], AsyncIterable[dict]]): The async target system or function to evaluate.
        data (Union[DATA_T, AsyncIterable[schemas.Example]]): The dataset to evaluate on. Can be a dataset name, a list of
            examples, an async generator of examples, or an async iterable of examples.
        evaluators (Optional[Sequence[EVALUATOR_T]]): A list of evaluators to run
            on each example. Defaults to None.
        summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): A list of summary
            evaluators to run on the entire dataset. Defaults to None.
        metadata (Optional[dict]): Metadata to attach to the experiment.
            Defaults to None.
        experiment_prefix (Optional[str]): A prefix to provide for your experiment name.
            Defaults to None.
        description (Optional[str]): A description of the experiment.
        max_concurrency (Optional[int]): The maximum number of concurrent
            evaluations to run. Defaults to None.
        num_repetitions (int): The number of times to run the evaluation.
            Each item in the dataset will be run and evaluated this many times.
            Defaults to 1.
        client (Optional[langsmith.Client]): The LangSmith client to use.
            Defaults to None.
        blocking (bool): Whether to block until the evaluation is complete.
            Defaults to True.
        experiment (Optional[schemas.TracerSession]): An existing experiment to
            extend. If provided, experiment_prefix is ignored. For advanced
            usage only.

    Returns:
        AsyncIterator[ExperimentResultRow]: An async iterator over the experiment results.

    Environment:
        - LANGSMITH_TEST_CACHE: If set, API calls will be cached to disk to save time and
            cost during testing. Recommended to commit the cache files to your repository
            for faster CI/CD runs.
            Requires the 'langsmith[vcr]' package to be installed.

    Examples:
        >>> from typing import Sequence
        >>> from langsmith import Client, aevaluate
        >>> from langsmith.schemas import Example, Run
        >>> client = Client()
        >>> dataset = client.clone_public_dataset(
        ...     "https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
        ... )
        >>> dataset_name = "Evaluate Examples"

        Basic usage:

        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}

        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}

        >>> import asyncio
        >>> async def apredict(inputs: dict) -> dict:
        ...     # This can be any async function or just an API call to your app.
        ...     await asyncio.sleep(0.1)
        ...     return {"output": "Yes"}
        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Experiment",
        ...         description="Evaluate the accuracy of the model asynchronously.",
        ...         metadata={
        ...             "my-prompt-version": "abcd-1234",
        ...         },
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Evaluating over only a subset of the examples using an async generator:

        >>> async def example_generator():
        ...     examples = client.list_examples(dataset_name=dataset_name, limit=5)
        ...     for example in examples:
        ...         yield example
        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=example_generator(),
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Subset Experiment",
        ...         description="Evaluate a subset of examples asynchronously.",
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Streaming each prediction to more easily + eagerly debug.

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Streaming Experiment",
        ...         description="Streaming predictions for debugging.",
        ...         blocking=False,
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        >>> async def aenumerate(iterable):
        ...     async for elem in iterable:
        ...         print(elem)
        >>> asyncio.run(aenumerate(results))

        Running without concurrency:

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Experiment Without Concurrency",
        ...         description="This was run without concurrency.",
        ...         max_concurrency=0,
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Using Async evaluators:

        >>> async def helpfulness(run: Run, example: Example):
        ...     # Row-level evaluator for helpfulness.
        ...     await asyncio.sleep(5)  # Replace with your LLM API call
        ...     return {"score": run.outputs["output"] == "Yes"}

        >>> results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...         evaluators=[helpfulness],
        ...         summary_evaluators=[precision],
        ...         experiment_prefix="My Helpful Experiment",
        ...         description="Applying async evaluators example.",
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...
    """
    if not upload_results:
        _warn_once("'upload_results' parameter is in beta.")
    if experiment and experiment_prefix:
        raise ValueError(
            "Expected at most one of 'experiment' or 'experiment_prefix',"
            " but both were provided. Got: "
            f"experiment={experiment}, experiment_prefix={experiment_prefix}"
        )
    return await _aevaluate(
        target,
        data=data,
        evaluators=evaluators,
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        experiment_prefix=experiment_prefix,
        description=description,
        max_concurrency=max_concurrency,
        num_repetitions=num_repetitions,
        client=client,
        blocking=blocking,
        experiment=experiment,
        upload_results=upload_results,
    )
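
# The object returned by `aevaluate` is an AsyncExperimentResults (defined below),
# which is itself an async iterator of ExperimentResultRow dicts. A minimal
# consumption sketch, assuming the `apredict` target and dataset name from the
# docstring examples above; `to_pandas` additionally requires pandas to be installed:
#
#   async def consume():
#       results = await aevaluate(apredict, data="Evaluate Examples")
#       async for row in results:
#           print(row["run"].id, row["evaluation_results"])
#       return results.to_pandas()
#
#   asyncio.run(consume())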


async def aevaluate_existing(
    experiment: Union[str, uuid.UUID, schemas.TracerSession],
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    max_concurrency: Optional[int] = None,
    client: Optional[langsmith.Client] = None,
    load_nested: bool = False,
    blocking: bool = True,
) -> AsyncIterator[ExperimentResultRow]:
    """Evaluate existing experiment runs asynchronously.

    Args:
        experiment (Union[str, uuid.UUID]): The identifier of the experiment to evaluate.
        evaluators (Optional[Sequence[EVALUATOR_T]]): Optional sequence of evaluators to use for individual run evaluation.
        summary_evaluators (Optional[Sequence[SUMMARY_EVALUATOR_T]]): Optional sequence of evaluators
            to apply over the entire dataset.
        metadata (Optional[dict]): Optional metadata to include in the evaluation results.
        max_concurrency (Optional[int]): Optional maximum number of concurrent evaluations.
        client (Optional[langsmith.Client]): Optional Langsmith client to use for evaluation.
        load_nested: Whether to load all child runs for the experiment.
            Default is to only load the top-level root runs.
        blocking (bool): Whether to block until evaluation is complete.

    Returns:
        AsyncIterator[ExperimentResultRow]: An async iterator over the experiment results.

    Examples:
        Define your evaluators

        >>> from typing import Sequence
        >>> from langsmith.schemas import Example, Run
        >>> def accuracy(run: Run, example: Example):
        ...     # Row-level evaluator for accuracy.
        ...     pred = run.outputs["output"]
        ...     expected = example.outputs["answer"]
        ...     return {"score": expected.lower() == pred.lower()}
        >>> def precision(runs: Sequence[Run], examples: Sequence[Example]):
        ...     # Experiment-level evaluator for precision.
        ...     # TP / (TP + FP)
        ...     predictions = [run.outputs["output"].lower() for run in runs]
        ...     expected = [example.outputs["answer"].lower() for example in examples]
        ...     # yes and no are the only possible answers
        ...     tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
        ...     fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
        ...     return {"score": tp / (tp + fp)}

        Load the experiment and run the evaluation.

        >>> from langsmith import aevaluate, aevaluate_existing
        >>> dataset_name = "Evaluate Examples"
        >>> async def apredict(inputs: dict) -> dict:
        ...     # This can be any async function or just an API call to your app.
        ...     await asyncio.sleep(0.1)
        ...     return {"output": "Yes"}
        >>> # First run inference on the dataset
        ... results = asyncio.run(
        ...     aevaluate(
        ...         apredict,
        ...         data=dataset_name,
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...

        Then evaluate the results
        >>> experiment_name = "My Experiment:64e6e91"  # Or manually specify
        >>> results = asyncio.run(
        ...     aevaluate_existing(
        ...         experiment_name,
        ...         evaluators=[accuracy],
        ...         summary_evaluators=[precision],
        ...     )
        ... )  # doctest: +ELLIPSIS
        View the evaluation results for experiment:...


    """
    client = client or rt.get_cached_client()
    project = (
        experiment
        if isinstance(experiment, schemas.TracerSession)
        else (await aitertools.aio_to_thread(_load_experiment, experiment, client))
    )
    runs = await aitertools.aio_to_thread(
        _load_traces, experiment, client, load_nested=load_nested
    )
    data_map = await aitertools.aio_to_thread(_load_examples_map, client, project)
    data = [data_map[run.reference_example_id] for run in runs]
    return await _aevaluate(
        runs,
        data=data,
        evaluators=evaluators,
        summary_evaluators=summary_evaluators,
        metadata=metadata,
        max_concurrency=max_concurrency,
        client=client,
        blocking=blocking,
        experiment=project,
    )


async def _aevaluate(
    target: Union[ATARGET_T, AsyncIterable[dict], Iterable[schemas.Run]],
    data: Union[DATA_T, AsyncIterable[schemas.Example]],
    evaluators: Optional[Sequence[Union[EVALUATOR_T, AEVALUATOR_T]]] = None,
    summary_evaluators: Optional[Sequence[SUMMARY_EVALUATOR_T]] = None,
    metadata: Optional[dict] = None,
    experiment_prefix: Optional[str] = None,
    description: Optional[str] = None,
    max_concurrency: Optional[int] = None,
    num_repetitions: int = 1,
    client: Optional[langsmith.Client] = None,
    blocking: bool = True,
    experiment: Optional[Union[schemas.TracerSession, str, uuid.UUID]] = None,
    upload_results: bool = True,
) -> AsyncExperimentResults:
    is_async_target = (
        asyncio.iscoroutinefunction(target)
        or (hasattr(target, "__aiter__") and asyncio.iscoroutine(target.__aiter__()))
        or _is_langchain_runnable(target)
    )
    client = client or rt.get_cached_client()
    runs = None if is_async_target else cast(Iterable[schemas.Run], target)
    experiment_, runs = await aitertools.aio_to_thread(
        _resolve_experiment, experiment, runs, client
    )
    experiment_manager = await _AsyncExperimentManager(
        data,
        client=client,
        metadata=metadata,
        experiment=experiment_ or experiment_prefix,
        description=description,
        num_repetitions=num_repetitions,
        runs=runs,
        upload_results=upload_results,
    ).astart()
    cache_dir = ls_utils.get_cache_dir(None)
    if cache_dir is not None:
        dsid = await experiment_manager.get_dataset_id()
        cache_path = pathlib.Path(cache_dir) / f"{dsid}.yaml"
    else:
        cache_path = None
    with ls_utils.with_optional_cache(cache_path, ignore_hosts=[client.api_url]):
        if is_async_target:
            experiment_manager = await experiment_manager.awith_predictions(
                cast(ATARGET_T, target), max_concurrency=max_concurrency
            )
        if evaluators:
            experiment_manager = await experiment_manager.awith_evaluators(
                evaluators, max_concurrency=max_concurrency
            )
        if summary_evaluators:
            experiment_manager = await experiment_manager.awith_summary_evaluators(
                summary_evaluators
            )
        results = AsyncExperimentResults(experiment_manager)
        if blocking:
            await results.wait()
        return results
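
# The cache handling in _aevaluate above is what the LANGSMITH_TEST_CACHE environment
# variable from the aevaluate docstring hooks into: when a cache directory is
# configured, API calls are recorded to and replayed from a per-dataset
# "<dataset-id>.yaml" file. A rough sketch of opting in from a test suite (the
# directory name is illustrative, not part of the API):
#
#   import os
#   os.environ["LANGSMITH_TEST_CACHE"] = "tests/cassettes"
#   results = asyncio.run(aevaluate(apredict, data="Evaluate Examples"))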


class _AsyncExperimentManager(_ExperimentManagerMixin):
    """Manage the execution of experiments asynchronously.

    Supports lazily running predictions and evaluations in parallel to facilitate
    result streaming and early debugging.

    Args:
        data (DATA_T): The data used for the experiment. Can be a dataset name or ID OR
            a generator of examples.
        runs (Optional[Iterable[schemas.Run]]): The runs associated with the experiment
            predictions.
        experiment (Optional[schemas.TracerSession]): The tracer session
            associated with the experiment.
        experiment_prefix (Optional[str]): The prefix for the experiment name.
        description (Optional[str]): The description for the experiment.
        metadata (Optional[dict]): Additional metadata for the experiment.
        client (Optional[langsmith.Client]): The Langsmith client used for
             the experiment.
        evaluation_results (Optional[Iterable[EvaluationResults]]): The evaluation
            results for the experiment.
        summary_results (Optional[Iterable[EvaluationResults]]): The aggregate results
            for the experiment.
    """

    def __init__(
        self,
        data: Union[DATA_T, AsyncIterable[schemas.Example]],
        experiment: Optional[Union[schemas.TracerSession, str]] = None,
        metadata: Optional[dict] = None,
        runs: Optional[Union[Iterable[schemas.Run], AsyncIterable[schemas.Run]]] = None,
        client: Optional[langsmith.Client] = None,
        evaluation_results: Optional[AsyncIterable[EvaluationResults]] = None,
        summary_results: Optional[AsyncIterable[EvaluationResults]] = None,
        description: Optional[str] = None,
        num_repetitions: int = 1,
        upload_results: bool = True,
    ):
        super().__init__(
            experiment=experiment,
            metadata=metadata,
            client=client,
            description=description,
        )
        self._data = data
        self._examples: Optional[AsyncIterable[schemas.Example]] = None
        self._runs = (
            aitertools.ensure_async_iterator(runs) if runs is not None else None
        )
        self._evaluation_results = evaluation_results
        self._summary_results = summary_results
        self._num_repetitions = num_repetitions
        self._upload_results = upload_results

    async def aget_examples(self) -> AsyncIterator[schemas.Example]:
        if self._examples is None:
            self._examples = _aresolve_data(self._data, client=self.client)
            if self._num_repetitions > 1:
                self._examples = async_chain_from_iterable(
                    aitertools.atee(self._examples, self._num_repetitions)
                )
        self._examples, examples_iter = aitertools.atee(
            aitertools.ensure_async_iterator(self._examples), 2, lock=asyncio.Lock()
        )
        return examples_iter

    async def get_dataset_id(self) -> str:
        if self._experiment is None or not getattr(
            self._experiment, "reference_dataset_id", None
        ):
            example = await aitertools.py_anext(await self.aget_examples())
            if example is None:
                raise ValueError("No examples found in the dataset.")
            return str(example.dataset_id)
        return str(self._experiment.reference_dataset_id)

    async def aget_runs(self) -> AsyncIterator[schemas.Run]:
        if self._runs is None:
            raise ValueError("Runs not loaded yet.")
        self._runs, runs = aitertools.atee(
            aitertools.ensure_async_iterator(self._runs), 2, lock=asyncio.Lock()
        )
        async for run in runs:
            yield run

    async def aget_evaluation_results(self) -> AsyncIterator[EvaluationResults]:
        if self._evaluation_results is None:
            async for _ in await self.aget_examples():
                yield {"results": []}
        else:
            self._evaluation_results, evaluation_results = aitertools.atee(
                aitertools.ensure_async_iterator(self._evaluation_results),
                2,
                lock=asyncio.Lock(),
            )
            async for result in evaluation_results:
                yield result

    async def astart(self) -> _AsyncExperimentManager:
        try:
            first_example = await aitertools.py_anext(await self.aget_examples())
        except StopAsyncIteration:
            raise ValueError(
                "No examples found in the dataset. Please ensure the data provided "
                "to aevaluate is not empty."
            )
        if not first_example:
            raise ValueError(
                "No examples found in the dataset. Please ensure the data provided "
                "to aevaluate is not empty."
            )
        project = self._get_project(first_example) if self._upload_results else None
        self._print_experiment_start(project, first_example)
        self._metadata["num_repetitions"] = self._num_repetitions
        return _AsyncExperimentManager(
            await self.aget_examples(),
            experiment=project,
            metadata=self._metadata,
            client=self.client,
            runs=self._runs,
            evaluation_results=self._evaluation_results,
            upload_results=self._upload_results,
        )

    async def awith_predictions(
        self,
        target: ATARGET_T,
        max_concurrency: Optional[int] = None,
    ) -> _AsyncExperimentManager:
        _experiment_results = self._apredict(target, max_concurrency=max_concurrency)
        r1, r2 = aitertools.atee(_experiment_results, 2, lock=asyncio.Lock())
        return _AsyncExperimentManager(
            (pred["example"] async for pred in r1),
            experiment=self._experiment,
            metadata=self._metadata,
            client=self.client,
            runs=(pred["run"] async for pred in r2),
            upload_results=self._upload_results,
        )

    async def awith_evaluators(
        self,
        evaluators: Sequence[Union[EVALUATOR_T, AEVALUATOR_T]],
        max_concurrency: Optional[int] = None,
    ) -> _AsyncExperimentManager:
        evaluators = _resolve_evaluators(evaluators)
        experiment_results = self._ascore(evaluators, max_concurrency=max_concurrency)
        r1, r2, r3 = aitertools.atee(experiment_results, 3, lock=asyncio.Lock())
        return _AsyncExperimentManager(
            (result["example"] async for result in r1),
            experiment=self._experiment,
            metadata=self._metadata,
            client=self.client,
            runs=(result["run"] async for result in r2),
            evaluation_results=(result["evaluation_results"] async for result in r3),
            summary_results=self._summary_results,
            upload_results=self._upload_results,
        )

    async def awith_summary_evaluators(
        self,
        summary_evaluators: Sequence[SUMMARY_EVALUATOR_T],
    ) -> _AsyncExperimentManager:
        wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators)
        aggregate_feedback_gen = self._aapply_summary_evaluators(wrapped_evaluators)
        return _AsyncExperimentManager(
            await self.aget_examples(),
            experiment=self._experiment,
            metadata=self._metadata,
            client=self.client,
            runs=self.aget_runs(),
            evaluation_results=self._evaluation_results,
            summary_results=aggregate_feedback_gen,
            upload_results=self._upload_results,
        )

    async def aget_results(self) -> AsyncIterator[ExperimentResultRow]:
        async for run, example, evaluation_results in aitertools.async_zip(
            self.aget_runs(),
            await self.aget_examples(),
            self.aget_evaluation_results(),
        ):
            yield ExperimentResultRow(
                run=run,
                example=example,
                evaluation_results=evaluation_results,
            )

    async def aget_summary_scores(self) -> Dict[str, List[dict]]:
        if self._summary_results is None:
            return {"results": []}
        return {
            "results": [
                res
                async for results in self._summary_results
                for res in results["results"]
            ]
        }

    async def _apredict(
        self,
        target: ATARGET_T,
        max_concurrency: Optional[int] = None,
    ) -> AsyncIterator[_ForwardResults]:
        fn = _ensure_async_traceable(target)

        async def predict_all():
            async for example in await self.aget_examples():
                # Yield the unevaluated coroutine; the concurrency helper awaits it.
                yield _aforward(
                    fn, example, self.experiment_name, self._metadata, self.client
                )

        async for result in aitertools.aiter_with_concurrency(
            max_concurrency, predict_all(), _eager_consumption_timeout=0.001
        ):
            yield result

        await self._aend()

    async def _ascore(
        self,
        evaluators: Sequence[RunEvaluator],
        max_concurrency: Optional[int] = None,
    ) -> AsyncIterator[ExperimentResultRow]:
        with cf.ThreadPoolExecutor(max_workers=4) as executor:

            async def score_all():
                async for current_results in self.aget_results():
                    # Yield the unevaluated coroutine; the concurrency helper awaits it.
                    yield self._arun_evaluators(
                        evaluators, current_results, executor=executor
                    )

            async for result in aitertools.aiter_with_concurrency(
                max_concurrency, score_all(), _eager_consumption_timeout=0.001
            ):
                yield result

    async def _arun_evaluators(
        self,
        evaluators: Sequence[RunEvaluator],
        current_results: ExperimentResultRow,
        executor: cf.ThreadPoolExecutor,
    ) -> ExperimentResultRow:
        current_context = rh.get_tracing_context()
        metadata = {
            **(current_context["metadata"] or {}),
            **{"experiment": self.experiment_name},
        }
        with rh.tracing_context(
            **{
                **current_context,
                "project_name": "evaluators",
                "metadata": metadata,
                "enabled": "local" if not self._upload_results else True,
                "client": self.client,
            }
        ):
            run = current_results["run"]
            example = current_results["example"]
            eval_results = current_results["evaluation_results"]
            for evaluator in evaluators:
                try:
                    evaluator_response = await evaluator.aevaluate_run(
                        run=run, example=example
                    )
                    eval_results["results"].extend(
                        self.client._select_eval_results(evaluator_response)
                    )
                    if self._upload_results:
                        self.client._log_evaluation_feedback(
                            evaluator_response, run=run, _executor=executor
                        )
                except Exception as e:
                    try:
                        feedback_keys = _extract_feedback_keys(evaluator)
                        error_response = EvaluationResults(
                            results=[
                                EvaluationResult(
                                    key=key,
                                    source_run_id=run.id,
                                    comment=repr(e),
                                    extra={"error": True},
                                )
                                for key in feedback_keys
                            ]
                        )
                        eval_results["results"].extend(
                            self.client._select_eval_results(error_response)
                        )
                        if self._upload_results:
                            self.client._log_evaluation_feedback(
                                error_response, run=run, _executor=executor
                            )
                    except Exception as e2:
                        logger.debug(f"Error parsing feedback keys: {e2}")
                    logger.error(
                        f"Error running evaluator {repr(evaluator)} on "
                        f"run {run.id}: {repr(e)}",
                        exc_info=True,
                    )
            return ExperimentResultRow(
                run=run,
                example=example,
                evaluation_results=eval_results,
            )

    async def _aapply_summary_evaluators(
        self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
    ) -> AsyncIterator[EvaluationResults]:
        runs, examples = [], []
        async_examples = aitertools.ensure_async_iterator(await self.aget_examples())
        async for run, example in aitertools.async_zip(
            self.aget_runs(), async_examples
        ):
            runs.append(run)
            examples.append(example)
        aggregate_feedback = []
        project_id = self._get_experiment().id if self._upload_results else None
        current_context = rh.get_tracing_context()
        metadata = {
            **(current_context["metadata"] or {}),
            **{"experiment": self.experiment_name, "experiment_id": project_id},
        }
        with rh.tracing_context(
            **{
                **current_context,
                "project_name": "evaluators",
                "metadata": metadata,
                "enabled": "local" if not self._upload_results else True,
                "client": self.client,
            }
        ):
            for evaluator in summary_evaluators:
                try:
                    summary_eval_result = evaluator(runs, examples)
                    flattened_results = self.client._select_eval_results(
                        summary_eval_result,
                        fn_name=evaluator.__name__,
                    )
                    aggregate_feedback.extend(flattened_results)
                    if self._upload_results:
                        for result in flattened_results:
                            feedback = result.dict(exclude={"target_run_id"})
                            evaluator_info = feedback.pop("evaluator_info", None)
                            await aitertools.aio_to_thread(
                                self.client.create_feedback,
                                **feedback,
                                run_id=None,
                                project_id=project_id,
                                source_info=evaluator_info,
                            )
                except Exception as e:
                    logger.error(
                        f"Error running summary evaluator {repr(evaluator)}: {e}",
                        exc_info=True,
                    )
        yield {"results": aggregate_feedback}

    async def _get_dataset_version(self) -> Optional[str]:
        modified_at = []
        async for example in await self.aget_examples():
            if example.modified_at:
                modified_at.append(example.modified_at)
        max_modified_at = max(modified_at) if modified_at else None
        return max_modified_at.isoformat() if max_modified_at else None

    async def _get_dataset_splits(self) -> Optional[List[str]]:
        splits = set()
        async for example in await self.aget_examples():
            if (
                example.metadata
                and example.metadata.get("dataset_split")
                and isinstance(example.metadata["dataset_split"], list)
            ):
                for split in example.metadata["dataset_split"]:
                    if isinstance(split, str):
                        splits.add(split)
            else:
                splits.add("base")
        return list(splits)

    async def _aend(self) -> None:
        if not self._upload_results:
            return
        experiment = self._experiment
        if experiment is None:
            raise ValueError("Experiment not started yet.")
        project_metadata = self._get_experiment_metadata()
        project_metadata["dataset_version"] = await self._get_dataset_version()
        project_metadata["dataset_splits"] = await self._get_dataset_splits()
        self.client.update_project(
            experiment.id,
            end_time=experiment.end_time
            or datetime.datetime.now(datetime.timezone.utc),
            metadata=project_metadata,
        )
  M7r^   c                   @  s   e Zd ZddddZeddddZd	dd
dZddddZdddddZd#ddddddZ	ddddZ
ddddZdddd Zddd!d"ZdS )$r3   r^   )experiment_managerc                 C  s4   || _ g | _t | _t| | j | _d| _d S )Nr   )	_manager_resultsrX   r   _lockZcreate_task_process_data_task_processed_count)ry   r   rD   rD   rE   rp   X  s
    
zAsyncExperimentResults.__init__r   r}   c                 C  s   | j jS r   )r   r   r   rD   rD   rE   r   b  s    z&AsyncExperimentResults.experiment_namerG   c                 C  s   | S r   rD   r   rD   rD   rE   rV   f  s    z AsyncExperimentResults.__aiter__r   c                   s   ddd fdd} j 4 I d H T  jt jk rd j j }  jd7  _|W  5 Q I d H R  S  j rrtW 5 Q I d H R X ttj	|t jd dI d H  qd S )Nr0   r   )indexrA   c                   s     j | k rtdI d H  q d S )Ng?)r   rX   sleep)r   r   rD   rE   _wait_until_indexj  s    
z;AsyncExperimentResults.__anext__.<locals>._wait_until_indexr,   )timeout)
r   r   lenr   r   doner   rX   Zshieldwait_for)ry   r   r   rD   r   rE   	__anext__i  s    
z AsyncExperimentResults.__anext__r   )ri   rA   c                   s   t  }|| 2 z83 d H W }| j4 I d H  | j| W 5 Q I d H R X q6 | I d H }| j4 I d H  || _W 5 Q I d H R X d S r   )r!   r   r   r   r   r   rv   )ry   ri   ZtqdmitemZsummary_scoresrD   rD   rE   r   {  s     z$AsyncExperimentResults._process_datar   Nr/   	DataFrame)startendrA   c                 C  s   t | j||dS )N)r  r  )r&   r   )ry   r  r  rD   rD   rE   	to_pandas  s    z AsyncExperimentResults.to_pandasc                 C  s:   dd l }| jr.|jdr.| dd}| S |  S d S )Nr   pandas   )importlib.utilr   util	find_specr  _repr_html___repr__)ry   	importlibZdfrD   rD   rE   r    s
    z"AsyncExperimentResults._repr_html_r0   c                 C  s
   t | jS r   )r   r   r   rD   rD   rE   __len__  s    zAsyncExperimentResults.__len__c                 C  s   d| j  dS )Nz<AsyncExperimentResults >)r   r   rD   rD   rE   r    s    zAsyncExperimentResults.__repr__c                   s   | j I d H  d S r   )r   r   rD   rD   rE   rh     s    zAsyncExperimentResults.wait)r   N)r   r   r   rp   propertyr   rV   r   r   r  r  r  r  rh   rD   rD   rD   rE   r3   W  s   

   	z,rh.SupportsLangsmithExtra[[dict], Awaitable]zschemas.Exampler   r   zlangsmith.Clientr   )r   r   r   r7   r<   rA   c                   s   d  ddd fdd}t jdd zH| |jt j|j|||d|jrN|j n|j i|d	d
I d H  W n8 tk
r } zt	j
d| ddd W 5 d }~X Y nX tttj |dW  5 Q R  S Q R X d S )Nzrun_trees.RunTreer   )rrA   c                   s   |  d S r   rD   )r  rK   rD   rE   _get_run  s    z_aforward.<locals>._get_runT)r   Zexample_version)rI   Zon_endr   r7   r<   )Zlangsmith_extrazError running target function: r,   )r   
stacklevelr   )r   r   inputsZLangSmithExtrar   r   r   Z
created_atr   r   r   r   r   r   r]   )r   r   r   r7   r<   r  r   rD   r  rE   r     s:      
r   re   )r@   rA   c                 C  s\   t | s,t| s,t| r$tdntdt| r:| S t| rH| j} tjdd| S d S )NzTarget must be an async function. For sync functions, use evaluate. Example usage:

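
# How the pieces around this point fit together (this mirrors _apredict above; the
# literal values are illustrative, taken from the docstring examples):
#
#   fn = _ensure_async_traceable(apredict)      # validate/wrap the user's async target
#   forward = await _aforward(                  # one traced invocation per example
#       fn, example, "My Experiment", {"my-prompt-version": "abcd-1234"}, client
#   )
#   forward["run"], forward["example"]          # later zipped into ExperimentResultRow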


def _ensure_async_traceable(
    target: ATARGET_T,
) -> rh.SupportsLangsmithExtra[[dict], Awaitable]:
    if not asyncio.iscoroutinefunction(target) and not _is_langchain_runnable(target):
        if callable(target):
            raise ValueError(
                "Target must be an async function. For sync functions, use evaluate."
                " Example usage:\n\n"
                "async def predict(inputs: dict) -> dict:\n"
                "    # do work, like chain.invoke(inputs)\n"
                "    return {...}\n"
                "await aevaluate(predict, ...)"
            )
        raise ValueError(
            "Target must be a callable async function. Received a non-callable"
            " object. Example usage:\n\n"
            "async def predict(inputs: dict) -> dict:\n"
            "    # do work, like chain.invoke(inputs)\n"
            "    return {...}\n"
            "await aevaluate(predict, ...)"
        )
    if rh.is_traceable_function(target):
        return target  # type: ignore[return-value]
    if _is_langchain_runnable(target):
        target = target.ainvoke  # type: ignore[union-attr]
    return rh.traceable(name="AsyncTarget")(target)


def _aresolve_data(
    data: Union[DATA_T, AsyncIterable[schemas.Example]],
    *,
    client: langsmith.Client,
) -> AsyncIterator[schemas.Example]:
    """Return the examples for the given dataset."""
    if isinstance(data, AsyncIterable):
        return aitertools.ensure_async_iterator(data)
    return aitertools.ensure_async_iterator(_resolve_data(data, client=client))


T = TypeVar("T")


async def async_chain_from_iterable(
    iterable: Iterable[AsyncIterable[T]],
) -> AsyncIterator[T]:
    """Chain multiple async iterables."""
    for sub_iterable in iterable:
        async for item in sub_iterable:
            yield item