sparkly package

Submodules

sparkly.analysis module

class sparkly.analysis.Gram3Analyzer

Bases: org.apache.pylucene.analysis.PythonAnalyzer

Attributes:
class
reuseStrategy
self
version

Methods

ReuseStrategy alias of org.apache.lucene.analysis.Analyzer$ReuseStrategy
TokenStreamComponents alias of org.apache.lucene.analysis.Analyzer$TokenStreamComponents
cast_  
close  
createComponents  
equals  
finalize  
getClass  
getOffsetGap  
getPositionIncrementGap  
getReuseStrategy  
getVersion  
hashCode  
initReader  
instance_  
normalize  
notify  
notifyAll  
pythonExtension  
setVersion  
toString  
tokenStream  
wait  
createComponents(fieldName)
class sparkly.analysis.PythonAlnumTokenFilter(tokenStream)

Bases: org.apache.pylucene.analysis.PythonFilteringTokenFilter

Attributes:
attributeClassesIterator
attributeFactory
attributeImplsIterator
class
self

Methods

State alias of org.apache.lucene.util.AttributeSource$State
accept  
addAttribute  
addAttributeImpl  
captureState  
cast_  
clearAttributes  
cloneAttributes  
close  
copyTo  
end  
endAttributes  
equals  
finalize  
getAttribute  
getAttributeClassesIterator  
getAttributeFactory  
getAttributeImplsIterator  
getClass  
hasAttribute  
hasAttributes  
hashCode  
incrementToken  
instance_  
notify  
notifyAll  
pythonExtension  
reflectAsString  
reflectWith  
removeAllAttributes  
reset  
restoreState  
toString  
wait  
accept()
class sparkly.analysis.StandardEdgeGram36Analyzer

Bases: org.apache.pylucene.analysis.PythonAnalyzer

Attributes:
class
reuseStrategy
self
version

Methods

ReuseStrategy alias of org.apache.lucene.analysis.Analyzer$ReuseStrategy
TokenStreamComponents alias of org.apache.lucene.analysis.Analyzer$TokenStreamComponents
cast_  
close  
createComponents  
equals  
finalize  
getClass  
getOffsetGap  
getPositionIncrementGap  
getReuseStrategy  
getVersion  
hashCode  
initReader  
instance_  
normalize  
notify  
notifyAll  
pythonExtension  
setVersion  
toString  
tokenStream  
wait  
createComponents(fieldName)
class sparkly.analysis.UnfilteredGram5Analyzer

Bases: org.apache.pylucene.analysis.PythonAnalyzer

Attributes:
class
reuseStrategy
self
version

Methods

ReuseStrategy alias of org.apache.lucene.analysis.Analyzer$ReuseStrategy
TokenStreamComponents alias of org.apache.lucene.analysis.Analyzer$TokenStreamComponents
cast_  
close  
createComponents  
equals  
finalize  
getClass  
getOffsetGap  
getPositionIncrementGap  
getReuseStrategy  
getVersion  
hashCode  
initReader  
instance_  
normalize  
notify  
notifyAll  
pythonExtension  
setVersion  
toString  
tokenStream  
wait  
createComponents(fieldName)
sparkly.analysis.analyze(analyzer, text, with_offset=False)

Apply the analyzer to the text and return the tokens, optionally with offsets

Parameters:
analyzer :

The lucene analyzer to be applied

text : str

the text that will be analyzed

with_offset : bool

if true, return the offsets with the tokens in the form (TOKEN, START_OFFSET, END_OFFSET)

Returns:
list of str or list of tuple

a list of tokens, or of (TOKEN, START_OFFSET, END_OFFSET) tuples when with_offset is True
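Conceptually, an n-gram analyzer such as Gram3Analyzer emits character n-grams. As a plain-Python sketch (not the PyLucene implementation; the function name `char_ngrams` is hypothetical), this mimics the token and offset shape that `analyze(..., with_offset=True)` returns:

```python
def char_ngrams(text, n=3, with_offset=False):
    """Emit character n-grams from text, mimicking the output shape of
    sparkly.analysis.analyze: plain tokens, or
    (TOKEN, START_OFFSET, END_OFFSET) tuples when with_offset is True."""
    grams = []
    for start in range(len(text) - n + 1):
        token = text[start:start + n]
        grams.append((token, start, start + n) if with_offset else token)
    return grams

char_ngrams('spark')
# ['spa', 'par', 'ark']
char_ngrams('spark', with_offset=True)
# [('spa', 0, 3), ('par', 1, 4), ('ark', 2, 5)]
```

The real analyzers also apply Lucene token filters (e.g. lowercasing, alphanumeric filtering), which this sketch omits.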

sparkly.analysis.get_shingle_analyzer()
sparkly.analysis.get_standard_analyzer_no_stop_words()

sparkly.search module

class sparkly.search.Searcher(index, search_chunk_size=500)

Bases: object

class for performing bulk search over a dataframe

Methods

get_full_query_spec() get a query spec that searches on all indexed fields
search(search_df, query_spec, limit[, id_col]) perform search for all the records in search_df according to query_spec
get_full_query_spec()

get a query spec that searches on all indexed fields

search(search_df, query_spec, limit, id_col='_id')

perform search for all the records in search_df according to query_spec

Parameters:
search_df : pyspark.sql.DataFrame

the records used for searching

query_spec : QuerySpec

the query spec for searching

limit : int

the number of top results (top-k) that will be retrieved for each query

id_col : str

the id column from search_df that will be output with the query results

Returns:
pyspark DataFrame

a pyspark dataframe with the schema (id_col, ids array&lt;long&gt;, scores array&lt;float&gt;, search_time float)
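Each output row pairs one query record with parallel arrays of candidate ids and scores. A plain-Python sketch (no Spark required; the function name and example values are hypothetical) of flattening that schema into per-candidate triples:

```python
def flatten_search_results(rows):
    """Flatten rows shaped like the search() output schema
    (_id, ids, scores, search_time) into (query_id, candidate_id, score)
    triples, one per retrieved candidate."""
    triples = []
    for row in rows:
        # ids and scores are parallel arrays: ids[i] scored scores[i]
        for cand_id, score in zip(row['ids'], row['scores']):
            triples.append((row['_id'], cand_id, score))
    return triples

results = [
    {'_id': 0, 'ids': [7, 3], 'scores': [9.5, 4.2], 'search_time': 0.01},
    {'_id': 1, 'ids': [5], 'scores': [8.1], 'search_time': 0.008},
]
flatten_search_results(results)
# [(0, 7, 9.5), (0, 3, 4.2), (1, 5, 8.1)]
```

In Spark, the same flattening would typically be done with `arrays_zip` plus `explode` on the result dataframe.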

sparkly.search.search(index, query_spec, limit, search_recs)
sparkly.search.search_gen(index, query_spec, limit, search_recs)

sparkly.utils module

class sparkly.utils.Timer

Bases: object

utility class for timing execution of code

Methods

get_interval() get the time that has elapsed since the object was created or the last time get_interval() was called
get_total() get total time this Timer has been alive
set_start_time() set the start time to the current time
get_interval()

get the time that has elapsed since the object was created or the last time get_interval() was called

Returns:
float
get_total()

get total time this Timer has been alive

Returns:
float
set_start_time()

set the start time to the current time
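The Timer semantics above can be sketched in plain Python with `time.monotonic` (the class name `SketchTimer` is hypothetical; the real Timer's internals may differ):

```python
import time

class SketchTimer:
    """Plain-Python sketch of the Timer semantics described above."""
    def __init__(self):
        self._created = time.monotonic()
        self._last = self._created

    def get_interval(self):
        # elapsed since creation or since the last get_interval() call
        now = time.monotonic()
        interval = now - self._last
        self._last = now
        return interval

    def get_total(self):
        # total time this timer has been alive
        return time.monotonic() - self._created

    def set_start_time(self):
        # reset the start time to the current time
        self._created = time.monotonic()
        self._last = self._created

t = SketchTimer()
first = t.get_interval()   # time since creation
second = t.get_interval()  # time since the previous call
```

A monotonic clock is the right choice here: unlike `time.time()`, it cannot jump backwards if the system clock is adjusted.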

sparkly.utils.atomic_unzip(zip_file_name, output_loc)

atomically unzip the file; that is, this function is safe to call from multiple threads at the same time

Parameters:
zip_file_name : str

the name of the file to be unzipped

output_loc : str

the location that the file will be unzipped to
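A common way to make an unzip atomic is to extract into a temporary directory and then rename it into place, so concurrent callers see either no output or the fully extracted directory. A sketch under that assumption (the name `atomic_unzip_sketch` is hypothetical; this is not necessarily how sparkly implements it):

```python
import os
import shutil
import tempfile
import zipfile

def atomic_unzip_sketch(zip_file_name, output_loc):
    """Sketch of an atomic unzip: extract into a temp directory next to
    the target, then atomically rename it into place."""
    if os.path.exists(output_loc):
        return  # another caller already finished
    parent = os.path.dirname(os.path.abspath(output_loc))
    tmp_dir = tempfile.mkdtemp(dir=parent)
    try:
        with zipfile.ZipFile(zip_file_name) as zf:
            zf.extractall(tmp_dir)
        os.replace(tmp_dir, output_loc)  # atomic rename on POSIX
    except OSError:
        # lost the race: another caller renamed first; discard our copy
        shutil.rmtree(tmp_dir, ignore_errors=True)
```

Creating the temp directory on the same filesystem as the target (via `dir=parent`) matters, because `os.replace` is only atomic within one filesystem.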

sparkly.utils.auc(x)
sparkly.utils.get_index_name(n, *postfixes)

utility function for generating index names in a uniform way

sparkly.utils.get_logger(name, level=10)

Get the logger for a module

Returns:
Logger
sparkly.utils.init_jvm(vmargs=[])

initialize the jvm for PyLucene

Parameters:
vmargs : list[str]

the JVM args to be passed to the VM

sparkly.utils.invoke_task(task)

invoke a task created by joblib.delayed

sparkly.utils.is_null(o)

check if the object is null; this exists to work around the inconsistent behavior of np.isnan and pd.isnull
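The inconsistency being worked around: `np.isnan` raises on strings, and `pd.isnull` returns an array for array input. A stdlib-only sketch of a predictable null check (the name `is_null_sketch` is hypothetical and may not match sparkly's exact semantics):

```python
import math

def is_null_sketch(o):
    """Sketch of a predictable null check: None and float NaN are null,
    everything else (including strings and zero) is not."""
    if o is None:
        return True
    return isinstance(o, float) and math.isnan(o)

is_null_sketch(None)          # True
is_null_sketch(float('nan'))  # True
is_null_sketch('abc')         # False (np.isnan would raise here)
```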

sparkly.utils.is_persisted(df)

check if the pyspark dataframe is persisted

sparkly.utils.kill_loky_workers()

kill all the child loky processes of this process. Used to prevent joblib from holding resources after using joblib.Parallel for computation

sparkly.utils.norm_auc(x)
sparkly.utils.persisted(df, storage_level=StorageLevel(True, True, False, False, 1))

context manager for persisting a dataframe in a with statement. This automatically unpersists the dataframe at the end of the context
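The persist-then-always-unpersist pattern can be sketched with `contextlib` (the name `persisted_sketch` is hypothetical; any object with `persist`/`unpersist` methods works, so the sketch needs no Spark):

```python
from contextlib import contextmanager

@contextmanager
def persisted_sketch(df, storage_level=None):
    """Sketch of the persisted() context manager: persist on entry and
    always unpersist on exit, even if the body raises."""
    if storage_level is not None:
        df.persist(storage_level)
    else:
        df.persist()
    try:
        yield df
    finally:
        df.unpersist()
```

The `try`/`finally` is the point of using a context manager here: the dataframe is unpersisted even when the `with` body raises an exception.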

sparkly.utils.repartition_df(df, part_size, by)

repartition the dataframe into chunks of size 'part_size' by column 'by'

sparkly.utils.spark_to_pandas_stream(df, chunk_size, by='_id')

repartition df into chunks of size chunk_size and return an iterator of pandas dataframes
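The chunked-streaming idea behind this can be shown without Spark: instead of collecting the whole dataset at once, yield it in fixed-size pieces (the function name is hypothetical; the real version yields pandas dataframes from Spark partitions):

```python
def chunk_stream_sketch(records, chunk_size):
    """Plain-Python sketch of streaming a dataset in fixed-size chunks,
    so the consumer never holds the full dataset in memory at once."""
    for i in range(0, len(records), chunk_size):
        yield records[i:i + chunk_size]

list(chunk_stream_sketch([0, 1, 2, 3, 4], 2))
# [[0, 1], [2, 3], [4]]
```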

sparkly.utils.type_check(var, var_name, expected)

type checking utility; raises a TypeError if var isn't the expected type

sparkly.utils.type_check_iterable(var, var_name, expected_var_type, expected_element_type)

type checking utility for iterables; raises a TypeError if var isn't the expected type or any of its elements isn't the expected element type
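A minimal sketch of these two utilities, assuming they follow the usual `isinstance`-and-raise pattern (the `_sketch` names and exact error messages are hypothetical):

```python
def type_check_sketch(var, var_name, expected):
    """Sketch of type_check: raise TypeError if var is not an instance
    of the expected type."""
    if not isinstance(var, expected):
        raise TypeError(
            f'{var_name} must be {expected.__name__}, '
            f'got {type(var).__name__}'
        )

def type_check_iterable_sketch(var, var_name, expected_var_type,
                               expected_element_type):
    """Sketch of type_check_iterable: check the container type, then
    every element's type."""
    type_check_sketch(var, var_name, expected_var_type)
    for i, elem in enumerate(var):
        if not isinstance(elem, expected_element_type):
            raise TypeError(
                f'{var_name}[{i}] must be '
                f'{expected_element_type.__name__}, '
                f'got {type(elem).__name__}'
            )
```

Including `var_name` in the message is what makes these worth having over a bare `isinstance` check: the error tells the caller which argument was wrong.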

sparkly.utils.zip_dir(d, outfile=None)

Zip a directory d and output it to outfile. If outfile is not provided, the zipped file is output in /tmp

Parameters:
d : str or Path

the directory to be zipped

outfile : str or Path, optional

the output location of the zipped file

Returns:
Path

the path to the new zip file
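The described behavior can be sketched with the stdlib `zipfile` module (the name `zip_dir_sketch` is hypothetical, and `tempfile.gettempdir()` stands in for the documented /tmp default so the sketch is portable):

```python
import tempfile
import zipfile
from pathlib import Path

def zip_dir_sketch(d, outfile=None):
    """Sketch of zip_dir: zip directory d into outfile, defaulting to a
    file in the system temp directory, and return the zip's path."""
    d = Path(d)
    if outfile is None:
        outfile = Path(tempfile.gettempdir()) / f'{d.name}.zip'
    outfile = Path(outfile)
    with zipfile.ZipFile(outfile, 'w', zipfile.ZIP_DEFLATED) as zf:
        for p in sorted(d.rglob('*')):
            if p.is_file():
                # store paths relative to d so the archive has no
                # absolute-path prefix
                zf.write(p, str(p.relative_to(d)))
    return outfile
```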

Module contents