U
    h}                     @  s   d dl mZ d dlZd dlmZmZ d dlmZmZmZm	Z	m
Z
mZmZ d dlmZ d dlmZmZ erd dlmZ d dlmZ d	Zd
ZdZdZdZG dd deZdS )    )annotationsN)InvalidStateErrorTask)TYPE_CHECKINGAsyncIteratorIteratorListOptionalSequenceTuple)	ByteStore)	SetupModeaexecute_cql)Session)PreparedStatementzm
    CREATE TABLE IF NOT EXISTS {keyspace}.{table} 
    (row_id TEXT, body_blob BLOB, PRIMARY KEY (row_id));
zDSELECT row_id, body_blob FROM  {keyspace}.{table} WHERE row_id IN ?;z2SELECT row_id, body_blob FROM  {keyspace}.{table};zAINSERT INTO {keyspace}.{table} (row_id, body_blob) VALUES (?, ?);z1DELETE FROM {keyspace}.{table} WHERE row_id IN ?;c                   @  s  e Zd ZdZddejddddddd	d
dZddddZddddZddddZ	ddddZ
ddddZdddddZdddddZddd d!d"Zddd d#d$Zdddd%d&Zdddd'd(Zdd)dd*d+d,d-Zdd)dd.d+d/d0ZdS )1CassandraByteStorea  A ByteStore implementation using Cassandra as the backend.

    Parameters:
        table: The name of the table to use.
        session: A Cassandra session object. If not provided, it will be resolved
            from the cassio config.
        keyspace: The keyspace to use. If not provided, it will be resolved
            from the cassio config.
        setup_mode: The setup mode to use. Default is SYNC  (SetupMode.SYNC).
    N)sessionkeyspace
setup_modestrzOptional[Session]zOptional[str]r   None)tabler   r   r   returnc             	   C  s   |r|sZz.ddl m}m} |p$||| _|p0| | _W qf ttfk
rV   tdY qfX n|| _|| _|| _d | _d | _	d | _
tj| j| jd}d | _|tjkrtt| j|| _n| j| d S )Nr   )check_resolve_keyspacecheck_resolve_sessionz_Could not import a recent cassio package.Please install it with `pip install --upgrade cassio`.r   r   )Zcassio.configr   r   r   r   ImportErrorModuleNotFoundErrorr   select_statementinsert_statementdelete_statementCREATE_TABLE_CQL_TEMPLATEformatdb_setup_taskr   ASYNCasyncioZcreate_taskr   execute)selfr   r   r   r   r   r   Z
create_cql r(   I/tmp/pip-unpacked-wheel-9gdii04g/langchain_community/storage/cassandra.py__init__1   s2    

zCassandraByteStore.__init__)r   c                 C  s6   | j r2z| j   W n tk
r0   tdY nX dS )zAEnsure that the DB setup is finished. If not, raise a ValueError.zAsynchronous setup of the DB not finished. NB: AstraDB components sync methods shouldn't be called from the event loop. Consider using their async equivalents.N)r#   resultr   
ValueErrorr'   r(   r(   r)   ensure_db_setupX   s    z"CassandraByteStore.ensure_db_setupc                   s   | j r| j I dH  dS )z:Ensure that the DB setup is finished. If not, wait for it.N)r#   r-   r(   r(   r)   aensure_db_setupd   s    z#CassandraByteStore.aensure_db_setupr   c                 C  s(   | j s"| jtj| j| jd| _ | j S )zGet the prepared select statement for the table.
        If not available, prepare it.

        Returns:
            PreparedStatement: The prepared statement.
        r   )r   r   prepareSELECT_TABLE_CQL_TEMPLATEr"   r   r   r-   r(   r(   r)   get_select_statementi   s     z'CassandraByteStore.get_select_statementc                 C  s(   | j s"| jtj| j| jd| _ | j S )zGet the prepared insert statement for the table.
        If not available, prepare it.

        Returns:
            PreparedStatement: The prepared statement.
        r   )r   r   r0   INSERT_TABLE_CQL_TEMPLATEr"   r   r   r-   r(   r(   r)   get_insert_statementx   s     z'CassandraByteStore.get_insert_statementc                 C  s(   | j s"| jtj| j| jd| _ | j S )zGet the prepared delete statement for the table.
        If not available, prepare it.

        Returns:
            PreparedStatement: The prepared statement.
        r   )r    r   r0   DELETE_TABLE_CQL_TEMPLATEr"   r   r   r-   r(   r(   r)   get_delete_statement   s     z'CassandraByteStore.get_delete_statementzSequence[str]zList[Optional[bytes]])keysr   c                   sT   ddl m} |   i  | j|  ||gD ]}|j |j< q0 fdd|D S )Nr   ValueSequencec                   s   g | ]}  |qS r(   get.0keyZ	docs_dictr(   r)   
<listcomp>   s     z+CassandraByteStore.mget.<locals>.<listcomp>)cassandra.queryr9   r.   r   r&   r2   	body_blobrow_idr'   r7   r9   rowr(   r?   r)   mget   s     zCassandraByteStore.mgetc                   sb   ddl m} |  I d H  i  t| j|  ||gdI d H D ]}|j |j< q> fdd|D S )Nr   r8   
parametersc                   s   g | ]}  |qS r(   r:   r<   r?   r(   r)   r@      s     z,CassandraByteStore.amget.<locals>.<listcomp>)rA   r9   r/   r   r   r2   rB   rC   rD   r(   r?   r)   amget   s      zCassandraByteStore.amgetzSequence[Tuple[str, bytes]])key_value_pairsr   c                 C  s4   |    |  }|D ]\}}| j|||f qd S )N)r.   r4   r   r&   r'   rJ   r   kvr(   r(   r)   mset   s    zCassandraByteStore.msetc                   sB   |   I d H  |  }|D ]"\}}t| j|||fdI d H  qd S )NrG   )r/   r4   r   r   rK   r(   r(   r)   amset   s    zCassandraByteStore.amsetc                 C  s0   ddl m} |   | j|  ||g d S )Nr   r8   )rA   r9   r.   r   r&   r6   r'   r7   r9   r(   r(   r)   mdelete   s    zCassandraByteStore.mdeletec                   s>   ddl m} |  I d H  t| j|  ||gdI d H  d S )Nr   r8   rG   )rA   r9   r/   r   r   r6   rP   r(   r(   r)   amdelete   s      zCassandraByteStore.amdelete)prefixzIterator[str])rS   r   c                c  sF   |    | jtj| j| jdD ]}|j}|r:||r"|V  q"d S Nr   )	r.   r   r&   SELECT_ALL_TABLE_CQL_TEMPLATEr"   r   r   rC   
startswithr'   rS   rE   r>   r(   r(   r)   
yield_keys   s     zCassandraByteStore.yield_keyszAsyncIterator[str]c                C sR   |   I d H  t| jtj| j| jdI d H D ]}|j}|rF||r.|V  q.d S rT   )	r/   r   r   rU   r"   r   r   rC   rV   rW   r(   r(   r)   ayield_keys   s     zCassandraByteStore.ayield_keys)__name__
__module____qualname____doc__r   ZSYNCr*   r.   r/   r2   r4   r6   rF   rI   rN   rO   rQ   rR   rX   rY   r(   r(   r(   r)   r   %   s$   'r   )
__future__r   r%   r   r   typingr   r   r   r   r	   r
   r   Zlangchain_core.storesr   Z'langchain_community.utilities.cassandrar   r   Zcassandra.clusterr   rA   r   r!   r1   rU   r3   r5   r   r(   r(   r(   r)   <module>   s    $
