"""Python package for submitting/getting data from Lens Bulk"""
from typing import List
from pathlib import Path
from .common import common
from .common import get_uuid
from .typing import Species
from .typing import StudyType
from .typing import ChunkSize
from .connector import Connector
[docs]
class LensBulkConnector(Connector):
"""
Create a connector object to submit/get data from BioTuring Lens Bulk (Visium/GeoMx DSP)
Parameters
----------
host : str
The URL of the LENS BULK server, only support HTTPS connection\n
Example:
https://talk2data.bioturing.com/lens_bulk/
token : str
The API token to verify authority. Generated in-app.
"""
def _check_study_type(self, study_type):
if study_type not in [
StudyType.DSP.value,
StudyType.VISIUM.value,
StudyType.VISIUM_RDS.value,
StudyType.VISIUM_ANN.value,
]:
raise Exception('Unsupported study_type, please recheck the imported connector \n(should be LensBulkConnector for this study_type)')
def _submit_study(
self,
group_id: str,
study_id: str = None,
name: str = 'TBD',
authors: List[str] = [],
abstract: str = '',
species: str = Species.HUMAN.value,
study_type: int = StudyType.VISIUM.value,
):
if study_id is None:
study_id = get_uuid()
study_info = {
'study_hash_id': study_id,
'name': name,
'authors': authors if authors else [],
'abstract': abstract
}
return {
'species': species,
'group_id': group_id,
'filter_params': {
'min_counts': 0,
'min_genes': 0,
'max_counts': 1e9,
'max_genes': 1e9,
'mt_percentage': 1,
},
'study_type': study_type,
'normalize': True,
'subsample': -1,
'study_info': study_info,
}
[docs]
def submit_study_from_s3(
self,
group_id: str,
s3_id: str = None,
batch_info: List[dict] = [],
study_id: str = None,
name: str = 'TBD',
authors: List[str] = [],
abstract: str = '',
species: str = Species.HUMAN.value,
study_type: int = StudyType.DSP.value,
):
"""
Submit one or multiple data folders.
Parameters
----------
group_id : str
ID of the group to submit the data to.
s3_id : str, Optional
ID of s3 bucket. Default: None\n
If s3_id is not provided, we will use the first s3 bucket configured on the platform.
batch_info : List[dict]
File path and batch name information, the path DOES NOT include the bucket path!\n
Example:
For DSP format:
[{
'matrix': 's3_path/data_1/matrix.xlsx',\n
'image': 's3_path/data_1/image.ome.tiff',
}, {...}]
For Visium format:
[{
'matrix': 's3_path/data_1/matrix.h5',\n
'image': 's3_path/data_1/image.tiff'\n
'position': 's3_path/data_1/tissue_positions_list.csv'\n
'scale': 's3_path/data_1/scalefactors_json.json'\n
}, {...}]
For Visium RDS format:
[{
'matrix': 's3_path/GSE128223_1.rds'
}, {...}]
For Visium Anndata format:
[{
'matrix': 's3_path/GSE128223_1.h5ad'
}, {...}]
study_id : str, optional
Will be name of study (eg: VISIUM_PBMC). Default: uuidv4
name : str, optional
Name of the study. Default: 'TBD'
authors : List[str], optional
Authors of the study. Default: []
abstract : str, optional
Abstract of the study. Default: ''
species : bioturing_connector.typing.Species, optional
Species of the study. Default: 'human'.\n
Support:
bioturing_connector.typing.Species.HUMAN.value\n
bioturing_connector.typing.Species.MOUSE.value\n
bioturing_connector.typing.Species.NON_HUMAN_PRIMATE.value\n
bioturing_connector.typing.Species.OTHERS.value\n
study_type : bioturing_connector.typing.StudyType, optional
Format of the study. Default: bioturing_connector.typing.StudyType.DSP.value\n
Support:
bioturing_connector.typing.StudyType.DSP.value\n
bioturing_connector.typing.StudyType.VISIUM.value\n
bioturing_connector.typing.StudyType.VISIUM_RDS.value\n
bioturing_connector.typing.StudyType.VISIUM_ANN.value\n
Returns
----------
Submission status : bool | str
True or Error log
"""
self._check_study_type(study_type)
data = self._submit_study(
group_id,
study_id,
name,
authors,
abstract,
species,
study_type
)
if study_type == StudyType.VISIUM_ANN.value \
or study_type == StudyType.VISIUM_RDS.value:
for i, o in enumerate(batch_info):
o['name'] = o['matrix'].split('/')[-1]
else:
for i, o in enumerate(batch_info):
name = o['matrix'].split('/')
if len(name) == 1:
o['name'] = f'Batch {i + 1}'
else:
o['name'] = name[-2]
data['batch_info'] = {f'Batch_{i}': o for i, o in enumerate(batch_info)}
data['s3_id'] = s3_id
submission_status = self.post_request(
api_route='api/v1/submit_study_from_s3',
data=data
)
task_id = common.parse_submission_status(submission_status)
if task_id is None:
return False
return self.get_submission_log(
group_id=group_id,
task_id=task_id
)
[docs]
def submit_study_from_shared_s3(
self,
group_id: str,
shared_s3_id: str,
batch_info: List[dict] = [],
study_id: str = None,
name: str = 'TBD',
authors: List[str] = [],
abstract: str = '',
species: str = Species.HUMAN.value,
study_type: int = StudyType.DSP.value,
):
"""
Submit one or multiple data folders.
Parameters
----------
group_id : str
ID of the group to submit the data to.
shared_s3_id : str
ID of s3 bucket.
batch_info : List[dict]
File path and batch name information, the path DOES NOT include the bucket path!\n
Example:
For DSP format:
[{
'matrix': 's3_path/data_1/matrix.xlsx',\n
'image': 's3_path/data_1/image.ome.tiff',
}, {...}]
For Visium format:
[{
'matrix': 's3_path/data_1/matrix.h5',\n
'image': 's3_path/data_1/image.tiff'\n
'position': 's3_path/data_1/tissue_positions_list.csv'\n
'scale': 's3_path/data_1/scalefactors_json.json'\n
}, {...}]
For Visium RDS format:
[{
'matrix': 's3_path/GSE128223_1.rds'
}, {...}]
For Visium Anndata format:
[{
'matrix': 's3_path/GSE128223_1.h5ad'
}, {...}]
study_id : str, optional
Will be name of study (eg: VISIUM_PBMC). Default: uuidv4
name : str, optional
Name of the study. Default: 'TBD'
authors : List[str], optional
Authors of the study. Default: []
abstract : str, optional
Abstract of the study. Default: ''
species : bioturing_connector.typing.Species, optional
Species of the study. Default: 'human'.\n
Support:
bioturing_connector.typing.Species.HUMAN.value\n
bioturing_connector.typing.Species.MOUSE.value\n
bioturing_connector.typing.Species.NON_HUMAN_PRIMATE.value\n
bioturing_connector.typing.Species.OTHERS.value\n
study_type : bioturing_connector.typing.StudyType, optional
Format of the study. Default: bioturing_connector.typing.StudyType.DSP.value\n
Support:
bioturing_connector.typing.StudyType.DSP.value\n
bioturing_connector.typing.StudyType.VISIUM.value\n
bioturing_connector.typing.StudyType.VISIUM_RDS.value\n
bioturing_connector.typing.StudyType.VISIUM_ANN.value\n
Returns
----------
Submission status : bool | str
True or Error log
"""
self._check_study_type(study_type)
data = self._submit_study(
group_id,
study_id,
name,
authors,
abstract,
species,
study_type
)
if study_type == StudyType.VISIUM_ANN.value \
or study_type == StudyType.VISIUM_RDS.value:
for i, o in enumerate(batch_info):
o['name'] = o['matrix'].split('/')[-1]
else:
for i, o in enumerate(batch_info):
name = o['matrix'].split('/')
if len(name) == 1:
o['name'] = f'Batch {i + 1}'
else:
o['name'] = name[-2]
data['batch_info'] = {f'Batch_{i}': o for i, o in enumerate(batch_info)}
data['shared_s3_id'] = shared_s3_id
submission_status = self.post_request(
api_route='api/v1/submit_study_from_shared_s3',
data=data
)
task_id = common.parse_submission_status(submission_status)
if task_id is None:
return False
return self.get_submission_log(
group_id=group_id,
task_id=task_id
)
[docs]
def submit_study_from_local(
self,
group_id: str,
batch_info: object,
study_id: str = None,
name: str = 'TBD',
authors: List[str] = [],
abstract: str = '',
species: str = Species.HUMAN.value,
study_type: int = StudyType.DSP.value,
chunk_size: int = ChunkSize.CHUNK_100_MB.value,
):
"""
Submit one or multiple data folders.
Parameters
----------
group_id : str
ID of the group to submit the data to.
batch_info : List[dict]
File path and batch name information\n
Example:
For DSP format:
[{
'name': 'data_1',\n
'matrix': 'local_path/data_1/matrix.xlsx',\n
'image': 'local_path/data_1/image.ome.tiff',\n
}, {...}]
For Visium format:
[{
'name': 'data_1',\n
'matrix': 'local_path/data_1/matrix.h5',\n
'image': 'local_path/data_1/image.tiff'\n
'position': 'local_path/data_1/tissue_positions_list.csv'\n
'scale': 'local_path/data_1/scalefactors_json.json'\n
}, {...}]
For Visium RDS format:
[{
'matrix': 'local_path/GSE128223_1.rds'
}, {...}]
For Visium Anndata format:
[{
'matrix': 'local_path/GSE128223_1.h5ad'
}, {...}]
study_id : str, optional
Will be the displaying name of study (eg: VISIUM_PBMC). Default: uuidv4
name : str, optional
Name of the study. Default: 'TBD'
authors : List[str], optional
Authors of the study. Default: []
abstract : str, optional
Abstract of the study. Default: ''
species : bioturing_connector.typing.Species, optional
Species of the study. Default: 'human'.\n
Support:
bioturing_connector.typing.Species.HUMAN.value\n
bioturing_connector.typing.Species.MOUSE.value\n
bioturing_connector.typing.Species.NON_HUMAN_PRIMATE.value\n
bioturing_connector.typing.Species.OTHERS.value\n
study_type : bioturing_connector.typing.StudyType, optional
Format of the study. Default: bioturing_connector.typing.StudyType.DSP.value\n
Support:
bioturing_connector.typing.StudyType.DSP.value\n
bioturing_connector.typing.StudyType.VISIUM.value\n
bioturing_connector.typing.StudyType.VISIUM_RDS.value\n
bioturing_connector.typing.StudyType.VISIUM_ANN.value\n
chunk_size : bioturing_connector.typing.ChunkSize, optional
Size of each separated chunk for uploading. Default: 104857600\n
Support:
bioturing_connector.typing.ChunkSize.CHUNK_5_MB.value\n
bioturing_connector.typing.ChunkSize.CHUNK_100_MB.value\n
bioturing_connector.typing.ChunkSize.CHUNK_500_MB.value\n
bioturing_connector.typing.ChunkSize.CHUNK_1_GB.value\n
Returns
----------
Submission status : bool | str
True or Error log
"""
self._check_study_type(study_type)
if chunk_size not in [e.value for e in ChunkSize]:
return 'only support:\n{},\n{},\n{},\n{}'.format(
'ChunkSize.CHUNK_5_MB.value',
'ChunkSize.CHUNK_100_MB.value',
'ChunkSize.CHUNK_500_MB.value',
'ChunkSize.CHUNK_1_GB.value',
)
file_names = []
files = []
if study_type == StudyType.VISIUM_ANN.value \
or study_type == StudyType.VISIUM_RDS.value:
for o in batch_info:
p = Path(o['matrix'])
o['name'] = p.name
file_names.append(p.name)
files.append(p)
elif study_type == StudyType.VISIUM.value:
for o in batch_info:
if 'hires' in o['image'].lower():
tmp_name = 'hires'
elif 'lowres' in o['image'].lower():
tmp_name = 'lowres'
else:
tmp_name = 'image'
file_names.extend([
f'{o["name"]}matrix.h5',
f'{o["name"]}{tmp_name}.{o["image"].split(".")[-1]}',
f'{o["name"]}position.{o["position"].split(".")[-1]}',
f'{o["name"]}scale.json',
])
files.extend([
Path(o['matrix']),
Path(o['image']),
Path(o['position']),
Path(o['scale']),
])
elif study_type == StudyType.DSP.value:
for o in batch_info:
file_names.extend([
f'{o["name"]}matrix.xlsx',
f'{o["name"]}image.tiff',
])
files.extend([
Path(o['matrix']),
Path(o['image']),
])
output_dir = self.upload_chunk(
file_names,
files,
chunk_size
)
data = self._submit_study(
group_id,
study_id,
name,
authors,
abstract,
species,
study_type,
)
data['study_path'] = output_dir
data['batch_info'] = [o['name'] for o in batch_info]
submission_status = self.post_request(
api_route='api/v1/submit_study_from_local',
data=data
)
task_id = common.parse_submission_status(submission_status)
if task_id is None:
return False
return self.get_submission_log(
group_id=group_id,
task_id=task_id
)