U
    h`                     @  sR   d dl mZ d dlmZmZmZmZmZ er@d dlm	Z	m
Z
mZ G dd dZdS )    )annotations)TYPE_CHECKINGAnyIterableListOptional)	DataFrameRowSparkSessionc                   @  s   e Zd ZdZd1dddddddd	d
Zed2dddd dddZddddZddddZdddddZ	d3dddddZ
dddddZdd d!d"d#Zd$d%d&d'd(Zd4dddd*d+d,Zd5dddd-d.Zd6dddd*d/d0ZdS )7SparkSQLz;SparkSQL is a utility class for interacting with Spark SQL.N   zOptional[SparkSession]zOptional[str]zOptional[List[str]]int)spark_sessioncatalogschemaignore_tablesinclude_tablessample_rows_in_table_infoc           
      C  s,  zddl m} W n tk
r,   tdY nX |r6|n|j | _|dk	rX| jj| |dk	rn| jj| t	| 
 | _|rt	|nt	 | _| jr| j| j }|rtd| d|rt	|nt	 | _| jr| j| j }|rtd| d|  }	|	rt	|	n| j| _t|ts"td|| _dS )	a  Initialize a SparkSQL object.

        Args:
            spark_session: A SparkSession object.
              If not provided, one will be created.
            catalog: The catalog to use.
              If not provided, the default catalog will be used.
            schema: The schema to use.
              If not provided, the default schema will be used.
            ignore_tables: A list of tables to ignore.
              If not provided, all tables will be used.
            include_tables: A list of tables to include.
              If not provided, all tables will be used.
            sample_rows_in_table_info: The number of rows to include in the table info.
              Defaults to 3.
        r   r
   Fpyspark is not installed. Please install it with `pip install pyspark`Nzinclude_tables  not found in databasezignore_tables z,sample_rows_in_table_info must be an integer)pyspark.sqlr
   ImportErrorbuildergetOrCreate_sparkr   ZsetCurrentCatalogZsetCurrentDatabaseset_get_all_table_names_all_tables_include_tables
ValueError_ignore_tablesget_usable_table_namesZ_usable_tables
isinstancer   	TypeError_sample_rows_in_table_info)
selfr   r   r   r   r   r   r
   missing_tablesZusable_tables r(   K/tmp/pip-unpacked-wheel-9gdii04g/langchain_community/utilities/spark_sql.py__init__   s@    


zSparkSQL.__init__strzOptional[dict]r   )database_uriengine_argskwargsreturnc                 K  sJ   zddl m} W n tk
r,   tdY nX |j| }| |f|S )zzCreating a remote Spark Session via Spark connect.
        For example: SparkSQL.from_uri("sc://localhost:15002")
        r   r   r   )r   r
   r   r   remoter   )clsr,   r-   r.   r
   Zsparkr(   r(   r)   from_uriK   s    
zSparkSQL.from_urizIterable[str])r/   c                 C  s   | j r| j S t| j| j S )zGet names of tables available.)r   sortedr   r!   )r&   r(   r(   r)   r"   \   s    zSparkSQL.get_usable_table_namesc                 C  s(   | j dd }ttdd |S )NzSHOW TABLES	tableNamec                 S  s   | j S N)r4   )rowr(   r(   r)   <lambda>e       z/SparkSQL._get_all_table_names.<locals>.<lambda>)r   sqlselectcollectlistmap)r&   Zrowsr(   r(   r)   r   c   s    zSparkSQL._get_all_table_names)tabler/   c                 C  s6   | j d|  d j}|d}|d | d S )NzSHOW CREATE TABLE r   ZUSING;)r   r9   r;   Zcreatetab_stmtfind)r&   r>   Z	statementZusing_clause_indexr(   r(   r)   _get_create_table_stmtg   s    
zSparkSQL._get_create_table_stmt)table_namesr/   c                 C  s   |   }|d k	r6t||}|r2td| d|}g }|D ]D}| |}| jrx|d7 }|d| | d7 }|d7 }|| q>d|}|S )Nztable_names r   z

/*
z*/z

)	r"   r   
differencer    rA   r%   _get_sample_spark_rowsappendjoin)r&   rB   Zall_table_namesr'   ZtablesZ
table_nameZ
table_infoZ	final_strr(   r(   r)   get_table_infoo   s     

zSparkSQL.get_table_infoc                 C  s   d| d| j  }| j|}dttdd |jj}z"| |}ddd |D }W n t	k
rt   d	}Y nX | j  d
| d| d| S )NzSELECT * FROM z LIMIT 	c                 S  s   | j S r5   )name)fr(   r(   r)   r7      r8   z1SparkSQL._get_sample_spark_rows.<locals>.<lambda>rC   c                 S  s   g | ]}d  |qS )rI   )rG   ).0r6   r(   r(   r)   
<listcomp>   s     z3SparkSQL._get_sample_spark_rows.<locals>.<listcomp> z rows from z table:
)
r%   r   r9   rG   r<   r=   r   fields_get_dataframe_results	Exception)r&   r>   querydfZcolumns_strZsample_rowsZsample_rows_strr(   r(   r)   rE      s    

zSparkSQL._get_sample_spark_rowsr	   tuple)r6   r/   c                 C  s   t tt|  S r5   )rT   r=   r+   asDictvalues)r&   r6   r(   r(   r)   _convert_row_as_tuple   s    zSparkSQL._convert_row_as_tupler   r<   )rS   r/   c                 C  s   t t| j| S r5   )r<   r=   rW   r;   )r&   rS   r(   r(   r)   rP      s    zSparkSQL._get_dataframe_resultsall)commandfetchr/   c                 C  s,   | j |}|dkr|d}t| |S )NZone   )r   r9   limitr+   rP   )r&   rY   rZ   rS   r(   r(   r)   run   s    
zSparkSQL.runc              
   C  sB   z|  |W S  tk
r< } zd|  W Y S d}~X Y nX dS )af  Get information about specified tables.

        Follows best practices as specified in: Rajkumar et al, 2022
        (https://arxiv.org/abs/2204.00498)

        If `sample_rows_in_table_info`, the specified number of sample rows will be
        appended to each table description. This can increase performance as
        demonstrated in the paper.
        Error: N)rH   r    )r&   rB   er(   r(   r)   get_table_info_no_throw   s    
z SparkSQL.get_table_info_no_throwc              
   C  sD   z|  ||W S  tk
r> } zd|  W Y S d}~X Y nX dS )a*  Execute a SQL command and return a string representing the results.

        If the statement returns rows, a string of the results is returned.
        If the statement returns no rows, an empty string is returned.

        If the statement throws an error, the error message is returned.
        r^   N)r]   rQ   )r&   rY   rZ   r_   r(   r(   r)   run_no_throw   s    zSparkSQL.run_no_throw)NNNNNr   )N)N)rX   )N)rX   )__name__
__module____qualname____doc__r*   classmethodr2   r"   r   rA   rH   rE   rW   rP   r]   r`   ra   r(   r(   r(   r)   r   	   s*         ? r   N)
__future__r   typingr   r   r   r   r   r   r   r	   r
   r   r(   r(   r(   r)   <module>   s   