HABSlib: Python Library for EEG Analysis and Biomarker Evaluation
HABSlib is a versatile Python library designed to facilitate interaction with the BrainOS HABS API for comprehensive EEG data analysis and biomarker evaluation. Developed to support neuroscientific research and clinical applications, HABSlib simplifies the process of fetching, processing, and analyzing EEG data through a user-friendly interface.
Key Features:
- API Integration: Connects with the BrainOS HABS API, allowing users to access EEG data and related services effortlessly.
- Data Management: Provides a robust interface for managing EEG datasets, including storage on the HABS servers.
- Biomarker Evaluation: Enables the analysis of EEG biomarkers, essential for diagnosing and monitoring neurological conditions.
- Customizable Pipelines: Users can create custom analysis pipelines tailored to specific research needs, ensuring flexibility and adaptability in various use cases.
Sessions
The communications between the user and HABS BrainOS is based on a RESTful API (see doc) and structured into sessions.
A Session with the HABS BrainOS iinitiates with an handshake during which encryption keys are exchanged for the security of any following communication between the user and the server.
Simple sessions
There are two general types of session: real-time and off-line.
In setting a either a real-time or off-line session, the user provides the session metadata, such as their user_id, session_date (both required), and additional notes depending on the nature of recordings.
Then, in a real-time session, the user specifies the type of EEG DEVICE ('board') used, the duration of the EEG recording, and the frequency of server update.
In an off-line session, the user specifies a file name from which the data should be read. The HABSlib reads the file (it currently supports EDF file type only, but it's growing) and sends it to the server.
In these simple types of session, after the real-time or offline uploading, the data can be selected via the session_id for further processing.
Piped sessions
There is another type of session, called piped session, during which the EEG data is processed by the server as soon as it is received, and the results are sent back to the user as soon as they are processed.
This type of session is real-time only. During its setup, users can choose (from a growing number of available functions) the type of processing they wish to be performed on the data.
1###################################################### 2# INTRO 3 4r""" 5# HABSlib: Python Library for EEG Analysis and Biomarker Evaluation 6 7HABSlib is a versatile Python library designed to facilitate interaction with the BrainOS HABS API for comprehensive EEG data analysis and biomarker evaluation. 8Developed to support neuroscientific research and clinical applications, HABSlib simplifies the process of fetching, processing, and analyzing EEG data through a user-friendly interface. 9 10Key Features: 11- **API Integration**: Connects with the BrainOS HABS API, allowing users to access EEG data and related services effortlessly. 12- **Data Management**: Provides a robust interface for managing EEG datasets, including storage on the HABS servers. 13- **Biomarker Evaluation**: Enables the analysis of EEG biomarkers, essential for diagnosing and monitoring neurological conditions. 14- **Customizable Pipelines**: Users can create custom analysis pipelines tailored to specific research needs, ensuring flexibility and adaptability in various use cases. 15 16## Sessions 17 18The communications between the user and HABS BrainOS is based on a RESTful API (see doc) and structured into sessions. 19 20A *Session* with the HABS BrainOS iinitiates with an handshake during which encryption keys are exchanged for the security of any following communication between the user and the server. 21 22### Simple sessions 23 24There are two general types of session: *real-time* and *off-line*. 25 26In setting a either a real-time or off-line session, the user provides the session metadata, such as their user_id, session_date (both required), and additional notes depending on the nature of recordings. 27 28Then, in a real-time session, the user specifies the type of EEG DEVICE ('board') used, the duration of the EEG recording, and the frequency of server update. 29 30In an off-line session, the user specifies a file name from which the data should be read. The HABSlib reads the file (it currently supports EDF file type only, but it's growing) and sends it to the server. 31 32In these simple types of session, after the real-time or offline uploading, the data can be selected via the session_id for further processing. 33 34### Piped sessions 35 36There is another type of session, called *piped* session, during which the EEG data is processed by the server as soon as it is received, and the results are sent back to the user as soon as they are processed. 37This type of session is real-time only. During its setup, users can choose (from a growing number of available functions) the type of processing they wish to be performed on the data. 38""" 39 40import sys 41import os 42import base64 43import requests 44import json 45import jsonschema 46from jsonschema import validate 47from jsonschema import exceptions 48# from bson import json_util 49 50import numpy as np 51 52import time 53from datetime import datetime 54import uuid 55import asyncio 56 57from . import BASE_URL, VERSION 58from . import BoardManager 59 60from cryptography.hazmat.primitives import hashes, serialization 61from cryptography.hazmat.backends import default_backend 62from . import store_public_key, load_public_key, generate_aes_key, encrypt_aes_key_with_rsa, encrypt_message, decrypt_message 63 64from pyedflib import highlevel 65 66 67###################################################### 68# validate the metadata against a specified schema 69def validate_metadata(metadata, schema_name, schemafile='metadata.json'): 70 """ 71 Validate metadata against a given JSON schema. 72 73 Args: 74 **metadata** (*dict*): The metadata to be validated. 75 **schema_name** (*str*): The name of the schema to validate against. HABSlib currently supports the validation of Session metadata and User data. 76 **schemafile** (*str*, optional): The path to the JSON file containing the schemas. Defaults to 'metadata.json'. 77 78 Returns: 79 *bool*: True if validation is successful, False otherwise. 80 81 Raises: 82 **FileNotFoundError**: If the schema file does not exist. 83 **json.JSONDecodeError**: If there is an error decoding the JSON schema file. 84 **exceptions.ValidationError**: If the metadata does not conform to the schema. 85 **Exception**: For any other errors that occur during validation. 86 87 Example: 88 ``` 89 metadata = {"name": "example", "type": "data"} 90 schema_name = "example_schema" 91 is_valid = validate_metadata(metadata, schema_name) 92 if is_valid: 93 print("Metadata is valid.") 94 else: 95 print("Metadata is invalid.") 96 ``` 97 """ 98 99 try: 100 with open(os.path.join(os.path.dirname(__file__), schemafile), 'r') as file: 101 content = file.read() 102 schemas = json.loads(content) 103 schema = schemas[schema_name] 104 validate(instance=metadata, schema=schema) #, format_checker=FormatChecker()) 105 print("Metadata validation successful!") 106 return True 107 108 except json.JSONDecodeError as e: 109 print("Failed to decode JSON:", e) 110 return False 111 112 except exceptions.ValidationError as e: 113 print("Validation error:", e) 114 return False 115 116 except FileNotFoundError: 117 print(f"No such file: {schemafile}") 118 return False 119 120 except Exception as e: 121 print("A general error occurred:", e) 122 return False 123 124 125def convert_datetime_in_dict(data): 126 """ 127 Recursively converts all datetime objects in a dictionary to strings in ISO format. 128 129 Args: 130 **data** (*dict*): The dictionary containing the data. 131 132 Returns: 133 *dict*: The dictionary with datetime objects converted to strings. 134 """ 135 for key, value in data.items(): 136 if isinstance(value, datetime): 137 data[key] = value.isoformat() 138 elif isinstance(value, dict): 139 data[key] = convert_datetime_in_dict(value) 140 return data 141 142 143###################################################### 144def handshake(base_url): 145 """ 146 Perform a handshake with the server to exchange encryption keys for the current session. 147 148 This function performs the following steps: 149 0. Performs login to the HABS server. 150 1. Sends a GET request to the server to initiate an RSA handshake. 151 2. Retrieves the server's public RSA key from the response. 152 3. Generates a local AES key and stores it in the environment. 153 4. Encrypts the AES key with the server's RSA key. 154 5. Sends the encrypted AES key to the server to complete the AES handshake. 155 156 Args: 157 **base_url** (*str*): The base URL of the server's API. 158 159 Returns: 160 *bool*: True if the handshake is successful, None otherwise. 161 162 Raises: 163 **requests.RequestException**: If a request to the server fails. 164 165 Example: 166 ``` 167 success = handshake("https://example.com") 168 if success: 169 print("Handshake completed successfully.") 170 else: 171 print("Handshake failed.") 172 ``` 173 """ 174 global BASE_URL 175 BASE_URL = base_url 176 url = f"{BASE_URL}/api/{VERSION}/handshake_rsa" 177 response = requests.get(url) 178 179 if response.status_code == 200: 180 print("Handshake (RSA) successful.") 181 api_public_key_pem = response.json().get('api_public_key') 182 api_public_key = serialization.load_pem_public_key( 183 api_public_key_pem.encode(), 184 backend=default_backend() 185 ) 186 os.environ['API_PUBLIC_KEY'] = api_public_key_pem 187 188 # Then we generate and store the AES key 189 aes_key = generate_aes_key() 190 # print("aes_key", aes_key) 191 os.environ['AES_KEY'] = base64.b64encode( aes_key ).decode('utf-8') 192 193 encrypted_aes_key = encrypt_aes_key_with_rsa(aes_key, api_public_key) 194 encrypted_aes_key_b64 = base64.b64encode(encrypted_aes_key).decode('utf-8') 195 # print("encrypted_aes_key_b64",encrypted_aes_key_b64) 196 aes_key_payload = { 197 "encrypted_aes_key": encrypted_aes_key_b64 198 } 199 response = requests.post(f"{BASE_URL}/api/{VERSION}/handshake_aes", json=aes_key_payload) 200 201 if response.status_code == 200: 202 print("Handshake (AES) successful.") 203 return True 204 else: 205 print("Handshake (AES) failed:", response.text) 206 return None 207 else: 208 print("Handshake (RSA) failed:", response.text) 209 return None 210 211 212 213###################################################### 214def set_user(first_name=None, last_name=None, email=None, age=None, weight=None, gender=None): 215 """ 216 Creates a user by sending user data to the server. 217 218 This function performs the following steps: 219 1. Constructs the user data dictionary. 220 2. Validates the user data against the "userSchema". 221 3. Encrypts the user data using the stored AES key. 222 4. Sends the encrypted user data to the server. 223 5. Handles the server's response. 224 225 Args: 226 **first_name** (*str*, optional): The user's first name. 227 **last_name** (*str*, optional): The user's last name. 228 **email** (*str*, optional): The user's email address. 229 **age** (*int*, optional): The user's age. 230 **weight** (*float*, optional): The user's weight. 231 **gender** (*str*, optional): The user's gender. 232 233 Returns: 234 *str*: The user ID if the user is successfully created/retrieved, None otherwise. 235 236 Example: 237 ``` 238 user_id = set_user(first_name="John", last_name="Doe", email="john.doe@example.com", age=30, weight=70.5, gender="X") 239 if user_id: 240 print(f"User created/retrieved with ID: {user_id}") 241 else: 242 print("User creation failed.") 243 ``` 244 """ 245 url = f"{BASE_URL}/api/{VERSION}/users" 246 user_data = { 247 "first_name": first_name, 248 "last_name": last_name, 249 "email": email, 250 "age": age, 251 "weight": weight, 252 "gender": gender 253 } 254 if validate_metadata(user_data, "userSchema", ): 255 _user = { 256 "user_data": user_data 257 } 258 _user = json.dumps(_user).encode('utf-8') 259 aes_key_b64 = os.environ.get('AES_KEY') 260 aes_key_bytes = base64.b64decode(aes_key_b64) 261 response = requests.post( 262 url, 263 data=encrypt_message(_user, aes_key_bytes), 264 headers={'Content-Type': 'application/octet-stream'} 265 ) 266 267 if response.status_code == 200: 268 print("User successfully created/retrieved.") 269 user_id = response.json().get('user_id') 270 return user_id 271 else: 272 print("User creation failed:", response.text) 273 return None 274 else: 275 print("User creation failed.") 276 277 278###################################################### 279def search_user_by_mail(email): 280 """ 281 Search for a user by email. 282 283 This function sends a GET request to the server to search for a user by the provided email address. 284 285 Args: 286 **email** (*str*): The email address of the user to search for. 287 288 Returns: 289 *str*: The user ID if the user is found, None otherwise. 290 291 Example: 292 ``` 293 user_id = search_user_by_mail("john.doe@example.com") 294 if user_id: 295 print(f"User found with ID: {user_id}") 296 else: 297 print("User not found.") 298 ``` 299 """ 300 url = f"{BASE_URL}/api/{VERSION}/users?email={email}" 301 302 response = requests.get(url) 303 304 if response.status_code == 200: 305 user_id = response.json().get('user_id') 306 print("User found:", user_id) 307 return user_id 308 else: 309 print("User not found.", response.text) 310 return None 311 312 313###################################################### 314def get_user_by_id(user_id): 315 """ 316 Retrieve user data by user ID. 317 318 This function sends a GET request to the server to retrieve user data for the specified user ID. 319 The response data is decrypted using AES before returning the user data. 320 321 Args: 322 **user_id** (*str*): The unique identifier of the user to retrieve. 323 324 Returns: 325 *dict*: The user data if the user is found, None otherwise. 326 327 Example: 328 ``` 329 user_data = get_user_by_id("1234567890") 330 if user_data: 331 print(f"User data: {user_data}") 332 else: 333 print("User not found.") 334 ``` 335 """ 336 url = f"{BASE_URL}/api/{VERSION}/users/{user_id}" 337 338 response = requests.get(url) 339 340 if response.status_code == 200: 341 print("User found.") 342 encrypted_data = response.content 343 aes_key_b64 = os.environ.get('AES_KEY') 344 aes_key_bytes = base64.b64decode(aes_key_b64) 345 decrypted_json_string = decrypt_message(encrypted_data, aes_key_bytes) 346 user_data = json.loads(decrypted_json_string)['user_data'] 347 return user_data 348 else: 349 print("User not found:", response.text) 350 return None 351 352 353###################################################### 354def set_session(metadata): 355 """ 356 Create a new simple session. 357 358 This function sends a POST request to the server to create a new simple session using the provided metadata. 359 The metadata is encrypted using AES before being sent to the server. 360 361 Args: 362 **metadata** (*dict*): A dictionary containing the session metadata. The only required metadata for the simple session are the user_id and a date. 363 364 Returns: 365 *str*: The unique identifier of the created session if successful, None otherwise. 366 367 Example: 368 ``` 369 session_metadata = { 370 "user_id": "1076203852085", 371 "session_date": "2024-05-30T12:00:00Z" 372 } 373 session_id = set_session(session_metadata) 374 if session_id: 375 print(f"Session created with ID: {session_id}") 376 else: 377 print("Failed to create session.") 378 ``` 379 """ 380 url = f"{BASE_URL}/api/{VERSION}/sessions" 381 _session = metadata 382 _session = json.dumps(_session).encode('utf-8') 383 aes_key_b64 = os.environ.get('AES_KEY') 384 aes_key_bytes = base64.b64decode(aes_key_b64) 385 response = requests.post( 386 url, 387 data=encrypt_message(_session, aes_key_bytes), 388 headers={'Content-Type': 'application/octet-stream'} 389 ) 390 391 if response.status_code == 200: 392 print("Session successfully created.") 393 # Extract the unique identifier for the uploaded data 394 session_id = response.json().get('session_id') 395 396 # print(session_id) 397 return session_id 398 else: 399 print("Session failed:", response.text) 400 return None 401 402 403###################################################### 404def get_data_by_id(data_id): 405 """ 406 Retrieve raw data by its unique identifier from the server. 407 408 This function sends a GET request to fetch raw data associated with a specific identifier. It 409 assumes that the data, if retrieved successfully, does not require decryption and is directly accessible. 410 411 Args: 412 **data_id** (*str*): The unique identifier for the data to be retrieved. 413 414 Returns: 415 **dict**: The raw data if retrieval is successful, None otherwise. 416 417 Example: 418 ``` 419 data_id = "1234" 420 raw_data = get_data_by_id(data_id) 421 ... use the data 422 ``` 423 """ 424 get_url = f"{BASE_URL}/api/{VERSION}/rawdata/{data_id}" 425 426 response = requests.get(get_url) 427 428 if response.status_code == 200: 429 print("Retrieved data successfully.") 430 # decrypt 431 return response.json().get('rawData') 432 else: 433 print("Failed to retrieve data:", response.text) 434 435 436###################################################### 437def get_data_by_session(session_id): 438 """ 439 Retrieve raw data associated with a specific session identifier from the server. 440 441 This function sends a GET request to fetch all raw data linked to the given session ID. The data 442 is returned in its raw form assuming it does not require decryption for usage. 443 444 Args: 445 **session_id** (*str*): The session identifier whose associated data is to be retrieved. 446 447 Returns: 448 *dict*: The raw data linked to the session if retrieval is successful, None otherwise. 449 450 Example: 451 ``` 452 session_id = "abcd1234" 453 session_data = get_data_by_session(session_id) 454 if session_data: 455 print("Data retrieved:", session_data) 456 else: 457 print("Failed to retrieve data.") 458 ``` 459 """ 460 get_url = f"{BASE_URL}/api/{VERSION}/sessions/{session_id}/rawdata" 461 462 response = requests.get(get_url) 463 464 if response.status_code == 200: 465 print("Retrieved data successfully.") 466 # decrypt 467 return response.json().get('data') 468 else: 469 print("Failed to retrieve data:", response.text) 470 471 472 473###################################################### 474def get_data_ids_by_session(session_id): 475 """ 476 Retrieve a list of data IDs associated with a specific session from the server. 477 478 This function sends a GET request to fetch the IDs of all data entries linked to a specified session ID. 479 The IDs are returned as a list. The function assumes the data does not require decryption for usage. 480 481 Args: 482 **session_id** (*str*): The session identifier for which data IDs are to be retrieved. 483 484 Returns: 485 *list*: A list of data IDs if retrieval is successful, None otherwise. 486 487 Example: 488 ``` 489 session_id = "abcd1234" 490 data_ids = get_data_ids_by_session(session_id) 491 if data_ids: 492 print("Data IDs retrieved:", data_ids) 493 else: 494 print("Failed to retrieve data IDs.") 495 ``` 496 """ 497 get_url = f"{BASE_URL}/api/{VERSION}/sessions/{session_id}/ids" 498 499 response = requests.get(get_url) 500 501 if response.status_code == 200: 502 print("Retrieved ids successfully.") 503 # decrypt 504 return response.json().get('ids') 505 else: 506 print("Failed to retrieve ids:", response.text) 507 508 509 510###################################################### 511def upload_data(metadata, timestamps, data, ppg_red, ppg_ir): 512 """ 513 Uploads EEG and PPG data to the server along with associated metadata. 514 515 This function compiles different types of physiological data along with metadata into a single dictionary, 516 encrypts the data, and then uploads it via a POST request. Upon successful upload, the server returns a 517 unique identifier for the data which can then be used for future queries or operations. 518 519 Args: 520 **metadata** (*dict*): Information about the data such as subject details and session parameters. 521 **timestamps** (*list*): List of timestamps correlating with each data point. 522 **data** (*list*): EEG data points. 523 **ppg_red** (*list*): Red photoplethysmogram data points. 524 **ppg_ir** (*list*): Infrared photoplethysmogram data points. 525 526 Returns: 527 *tuple*: A tuple containing the data ID of the uploaded data if successful, and None otherwise. 528 529 Example: 530 ``` 531 metadata = {"session_id": "1234", "subject_id": "001"} 532 timestamps = [1597709184, 1597709185] 533 data = [0.1, 0.2] 534 ppg_red = [123, 124] 535 ppg_ir = [125, 126] 536 data_id, error = upload_data(metadata, timestamps, data, ppg_red, ppg_ir) 537 if data_id: 538 print("Data uploaded successfully. Data ID:", data_id) 539 else: 540 print("Upload failed with error:", error) 541 ``` 542 """ 543 url = f"{BASE_URL}/api/{VERSION}/rawdata" 544 _data = { 545 "metadata": metadata, 546 "timestamps": timestamps, 547 "data": data, 548 "ppg_red": ppg_red, 549 "ppg_ir": ppg_ir 550 } 551 _data = json.dumps(_data).encode('utf-8') 552 553 # response = requests.post(url, json=_data) 554 aes_key_b64 = os.environ.get('AES_KEY') 555 aes_key_bytes = base64.b64decode(aes_key_b64) 556 response = requests.post( 557 url, 558 data=encrypt_message(_data, aes_key_bytes), 559 headers={'Content-Type': 'application/octet-stream'} 560 ) 561 562 if response.status_code == 200: 563 print('.', end='', flush=True) 564 # Extract the unique identifier for the uploaded data 565 data_id = response.json().get('data_id') 566 return data_id, None 567 else: 568 print("Upload failed:", response.text) 569 return None 570 571 572 573###################################################### 574def acquire_send_raw(user_id, date, board, stream_duration, buffer_duration): 575 """ 576 Asynchronously acquires raw data from a specific EEG board and sends it to the server. 577 578 This function connects to an EEG board, initiates a data acquisition session, and sends the collected data 579 to the server in real-time or near real-time. It ensures that all the data handled during the session 580 is associated with a unique session ID and metadata that includes user and session details. The function 581 will validate the session metadata before proceeding with data acquisition and sending. 582 583 Args: 584 **user_id** (*str*): The unique identifier of the user for whom the data is being collected. 585 **date** (*str*): The date of the session, used for metadata purposes. 586 **board** (*int*): Identifier for the EEG board from which data will be acquired. 587 **stream_duration** (*int*): Duration in seconds for which data will be streamed from the board. 588 **buffer_duration** (*int*): Time in seconds for how often the data is buffered and sent. 589 590 Returns: 591 *str* or *bool*: The session ID if the operation is successful; False otherwise. 592 593 Raises: 594 **ConnectionError**: If the board connection fails. 595 **ValidationError**: If the metadata does not comply with the required schema. 596 597 Example: 598 ``` 599 session = acquire_send_raw('user123', '2021-06-01', 'MUSE_S', 300, 10) 600 if session: 601 print(f"Session successfully started with ID: {session}") 602 else: 603 print("Failed to start session") 604 ``` 605 """ 606 session_id = asyncio.run( _acquire_send_raw(user_id, date, board, stream_duration, buffer_duration) ) 607 return session_id 608 609async def _acquire_send_raw(user_id, date, board, stream_duration, buffer_duration): 610 # get board 611 board_manager = BoardManager(enable_logger=False, board_id=board) 612 board_manager.connect() 613 # set session for the data 614 # We set a session id for the current interaction with the API (even if we fail to get the board, it will be important to store the failure) 615 session_metadata = { 616 "user_id": user_id, # add user to the session for reference 617 "session_date": date 618 } 619 if validate_metadata(session_metadata, "sessionSchema"): 620 session_id = set_session(metadata={**session_metadata, **board_manager.metadata}) 621 board_manager.metadata['session_id'] = session_id # add session to the data for reference 622 623 # stream_duration sec, buffer_duration sec 624 await board_manager.data_acquisition_loop( 625 stream_duration=stream_duration, 626 buffer_duration=buffer_duration, 627 service=upload_data 628 ) 629 630 return session_id 631 else: 632 return False 633 634 635 636###################################################### 637def send_file(user_id, date, edf_file, ch_nrs=None, ch_names=None): 638 """ 639 Uploads EEG data from a file to the server along with associated metadata. 640 641 This function compiles EEG data from an [EDF file](https://www.edfplus.info/downloads/index.html) along with metadata into a single dictionary, 642 encrypts the data, and then uploads it via a POST request. Upon successful upload, the server returns a 643 unique identifier for the session which can then be used for future queries or operations. 644 645 Args: 646 **user_id** (*str*): The unique identifier of the user for whom the data is being collected. 647 **metadata** (*dict*): Information about the data such as subject details and session parameters. 648 **date** (*str*): The date of the session, used for metadata purposes. 649 **edf_file** (*str*): name of an EDF file. 650 **ch_nrs** (*list of int*, optional): The indices of the channels to read. The default is None. 651 **ch_names** (*list of str*, optional): The names of channels to read. The default is None. 652 653 Returns: 654 *tuple*: A tuple containing the session ID of the uploaded data if successful, and None otherwise. 655 656 Example: 657 ``` 658 session = send_file('user123', '2021-06-01', 'nameoffile.edf') 659 if session: 660 print(f"Session successfully started with ID: {session}") 661 else: 662 print("Failed to start session") 663 ``` 664 """ 665 666 try: 667 signals, signal_headers, header = highlevel.read_edf(edf_file, ch_nrs, ch_names) 668 669 max_time = signals.shape[1] / signal_headers[0]['sample_frequency'] 670 timestamps = np.linspace(header['startdate'].timestamp(), max_time, signals.shape[1]) 671 672 session_metadata = { 673 "user_id": user_id, # add user to the session for reference 674 "session_date": header['startdate'].strftime("%m/%d/%Y, %H:%M:%S") 675 } 676 if validate_metadata(session_metadata, "sessionSchema"): 677 session_id = set_session(metadata={**session_metadata}) 678 metadata = {'session_id':session_id, **session_metadata, **convert_datetime_in_dict(header), **convert_datetime_in_dict(signal_headers[0])} 679 680 chunks = ((signals.size * signals.itemsize)//300000)+1 681 timestamps_chunks = np.array_split(timestamps, chunks) 682 signals_chunks = np.array_split(signals, chunks, axis=1) 683 json_data = json.dumps(signals_chunks[0].tolist()) 684 size_in_bytes = sys.getsizeof(json_data) 685 print("%d total bytes will be sent into %d chunks of %d bytes" % (signals.size * signals.itemsize, chunks, size_in_bytes)) 686 687 for timestamps_chunk,signals_chunk in zip(timestamps_chunks, signals_chunks): 688 upload_data(metadata, timestamps_chunk.tolist(), signals_chunk.tolist(), [], []) 689 690 return session_id 691 else: 692 return False 693 except Exception as e: 694 print("A general error occurred:", e) 695 return False 696 697 698 699###################################################### 700###################################################### 701def set_pipe(metadata, pipeline, params): 702 """ 703 Configures and initiates a data processing pipeline for a session on the server. 704 705 This function sends metadata and processing parameters to a specified pipeline endpoint 706 to create a data processing session. It encrypts the session data before sending to ensure 707 security. The function checks the server response to confirm the session creation. 708 709 Args: 710 **metadata** (*dict*): A dictionary containing metadata about the session, typically including 711 details such as user ID and session date. 712 **pipeline** (*str*): The identifier for the processing pipeline to be used. 713 **params** (*dict*): Parameters specific to the processing pipeline, detailing how data should 714 be processed. 715 716 Returns: 717 *str* or *None*: The session ID if the session is successfully created, or None if the operation fails. 718 719 Raises: 720 **requests.exceptions.RequestException**: An error from the Requests library when an HTTP request fails. 721 **KeyError**: If necessary keys are missing in the environment variables. 722 723 Example: 724 ``` 725 session_metadata = {"user_id": "123", "session_date": "2024-06-03"} 726 processing_params = {"filter_type": "lowpass", "cutoff_freq": 30} 727 session_id = set_pipe(session_metadata, 'eeg_smoothing', processing_params) 728 if session_id: 729 print(f"Pipeline session created with ID: {session_id}") 730 else: 731 print("Failed to create pipeline session") 732 ``` 733 """ 734 url = f"{BASE_URL}/api/{VERSION}/sessions/pipe/{pipeline}" 735 _session = { 736 "metadata": metadata, 737 "processing_params": params, 738 } 739 _session = json.dumps(_session).encode('utf-8') 740 aes_key_b64 = os.environ.get('AES_KEY') 741 aes_key_bytes = base64.b64decode(aes_key_b64) 742 response = requests.post( 743 url, 744 data=encrypt_message(_session, aes_key_bytes), 745 headers={'Content-Type': 'application/octet-stream'} 746 ) 747 748 if response.status_code == 200: 749 print("Session successfully created.") 750 # Extract the unique identifier for the uploaded data 751 session_id = response.json().get('session_id') 752 # print(session_id) 753 return session_id 754 else: 755 print("Session failed:", response.text) 756 return None 757 758 759###################################################### 760def upload_pipedata(metadata, timestamps, data, ppg_red, ppg_ir): 761 """ 762 Uploads processed data to a specific session on the server. 763 764 This function is responsible for uploading various data streams associated with a session, including 765 timestamps and physiological measurements such as PPG (Photoplethysmogram). The data is encrypted before 766 sending to ensure confidentiality and integrity. 767 768 Args: 769 **metadata** (*dict*): Contains session-related metadata including the session ID. 770 **timestamps** (*list*): A list of timestamps corresponding to each data point. 771 **data** (*list*): The main data collected, e.g., EEG readings. 772 **ppg_red** (*list*): Red channel data from a PPG sensor. 773 **ppg_ir** (*list*): Infrared channel data from a PPG sensor. 774 775 Returns: 776 *tuple*: A tuple containing the data ID if the upload is successful and the processed data, or None if the upload fails. 777 778 Raises: 779 **requests.exceptions.RequestException: An error from the Requests library when an HTTP request fails. 780 **KeyError**: If necessary keys are missing in the environment variables. 781 782 Example: 783 ``` 784 session_metadata = {"session_id": "12345"} 785 timestamps = [1597709165, 1597709166, ...] 786 data = [0.1, 0.2, ...] 787 ppg_red = [12, 15, ...] 788 ppg_ir = [20, 22, ...] 789 data_id, processed_data = upload_pipedata(session_metadata, timestamps, data, ppg_red, ppg_ir) 790 if data_id: 791 print(f"Data uploaded successfully with ID: {data_id}") 792 else: 793 print("Failed to upload data") 794 ``` 795 """ 796 url = f"{BASE_URL}/api/{VERSION}/pipedata/{metadata['session_id']}" # the metadata contain session_id to consistently pass it with each upload 797 798 _data = { 799 "metadata": metadata, 800 "timestamps": timestamps, 801 "data": data, 802 "ppg_red": ppg_red, 803 "ppg_ir": ppg_ir 804 } 805 _data = json.dumps(_data).encode('utf-8') 806 aes_key_b64 = os.environ.get('AES_KEY') 807 aes_key_bytes = base64.b64decode(aes_key_b64) 808 response = requests.post( 809 url, 810 data=encrypt_message(_data, aes_key_bytes), 811 headers={'Content-Type': 'application/octet-stream'} 812 ) 813 814 if response.status_code == 200: 815 print('.', end='', flush=True) 816 # Extract the unique identifier for the uploaded data 817 data_id = response.json().get('data_id') 818 # Retrieve the processed data 819 data = response.json().get('pipeData') 820 return data_id, data 821 else: 822 print("Upload failed:", response.text) 823 return None 824 825 826###################################################### 827def acquire_send_pipe(pipeline, params, user_id, date, board, stream_duration, buffer_duration, callback=None): 828 """ 829 Acquires data from a board, processes it according to a specified pipeline, and sends it to a server. 830 This function handles setting up a session for data acquisition and processing, connects to a board, 831 and manages the data flow from acquisition through processing to uploading. It uses an asynchronous loop 832 to handle the operations efficiently, suitable for real-time data processing scenarios. 833 834 Args: 835 **pipeline** (*str*): Name of the processing pipeline to use. 836 **params** (*dict*): Parameters for the pipeline processing. 837 **user_id** (*str*): The user ID to which the session will be associated. 838 **date** (*str*): Date of the session for tracking purposes. 839 **board** (*int*): Identifier for the hardware board to use for data acquisition. 840 **stream_duration** (*int*): Duration in seconds to stream data from the board. 841 **buffer_duration** (*int*): Duration in seconds to buffer data before processing. 842 **callback** (*function*): Optional callback function to execute after data is sent. 843 844 Returns: 845 *str* or *bool*: The session ID if successful, False otherwise. 846 847 """ 848 session_id = asyncio.run( 849 _acquire_send_pipe(pipeline, params, user_id, date, board, stream_duration, buffer_duration, callback) 850 ) 851 return session_id 852 853async def _acquire_send_pipe(pipeline, params, user_id, date, board, stream_duration, buffer_duration, callback=None): 854 # get board 855 board_manager = BoardManager(enable_logger=False, board_id=board) 856 board_manager.connect() 857 858 # set session for the data 859 # We set a session id for the current interaction with the API (even if we fail to get the board, it will be important to store the failure) 860 session_metadata = { 861 "user_id": user_id, # add user to the session for reference 862 "session_date": date 863 } 864 if validate_metadata(session_metadata, "sessionSchema"): 865 session_id = set_pipe(metadata={**session_metadata, **board_manager.metadata}, pipeline=pipeline, params=params) 866 board_manager.metadata['session_id'] = session_id # add session to the data for reference 867 868 # stream_duration sec, buffer_duration sec 869 await board_manager.data_acquisition_loop( 870 stream_duration=stream_duration, 871 buffer_duration=buffer_duration, 872 service=upload_pipedata, 873 callback=callback 874 ) 875 876 return session_id 877 else: 878 return False 879 880 881###################################################### 882def train(session_id, params): 883 """ 884 Sends a request to the server to train a machine learning algorithm on the data from a specified session. 885 886 Args: 887 **session_id** (*str*): The unique identifier of the session containing the data to be used for training. 888 **params** (*dict*): The parameters for the training process. 889 890 Returns: 891 *str* or *None*: The task ID if the request is successful, None otherwise. 892 893 This function sends the training parameters and session ID to the server, which initiates the training process. 894 The response includes a task ID that can be used for future interactions related to the training task. 895 896 Example: 897 ``` 898 train("session_12345", {"param1": "value1", "param2": "value2"}) 899 ``` 900 """ 901 url = f"{BASE_URL}/api/{VERSION}/train/{session_id}" 902 _params = { 903 "params": params, 904 } 905 _params = json.dumps(_params).encode('utf-8') 906 aes_key_b64 = os.environ.get('AES_KEY') 907 aes_key_bytes = base64.b64decode(aes_key_b64) 908 response = requests.post( 909 url, 910 data=encrypt_message(_params, aes_key_bytes), 911 headers={'Content-Type': 'application/octet-stream'} 912 ) 913 914 if response.status_code == 200: 915 task_id = response.json().get('task_id') 916 print("Published. For future interactions, use task_id:",task_id) 917 return task_id 918 else: 919 print("Publish failed:", response.text) 920 return None 921 922 923###################################################### 924def infer(data_id, params): 925 """ 926 Sends a request to the server to perform machine learning inference based on a previously trained model, given the data ID. 927 928 Args: 929 **data_id** (*str*): The unique identifier of the data to be used for inference. 930 **params** (*dict*): The parameters for the inference process. 931 932 Returns: 933 *str* or *None*: The task ID if the request is successful, None otherwise. 934 935 This function sends the inference parameters and data ID to the server, which initiates the inference process. 936 The response includes a task ID that can be used for future interactions related to the inference task. 937 938 Example: 939 ``` 940 infer("data_12345", {"param1": "value1", "param2": "value2"}) 941 ``` 942 """ 943 url = f"{BASE_URL}/api/{VERSION}/infer/{data_id}" 944 _params = { 945 "params": params, 946 } 947 _params = json.dumps(_params).encode('utf-8') 948 # response = requests.post(url, json=_params) 949 aes_key_b64 = os.environ.get('AES_KEY') 950 aes_key_bytes = base64.b64decode(aes_key_b64) 951 response = requests.post( 952 url, 953 data=encrypt_message(_params, aes_key_bytes), 954 headers={'Content-Type': 'application/octet-stream'} 955 ) 956 957 if response.status_code == 200: 958 task_id = response.json().get('task_id') 959 print("Published. For future interactions, use task_id:",task_id) 960 return task_id 961 else: 962 print("Publish failed:", response.text) 963 return None 964 965 966 967############################ 968# BrainOS - HABSlib - TODO # 969############################ 970# 971# - Encryption: 972# - Client side: 973# 2. Client starts handshake (GET) 974# 4. Client receives the public RSA and stores it. 975# 5. Client encrypts the AES key using the server's RSA public key and sends it to the server. 976# 7. All subsequent communications use the AES key for encryption and decryption, ensuring fast and secure data exchange. 977# 978# - Unit Test: 979# - the file client.py is the base for testing habslib 980# - Low level unit test: for single-use function in this file, and BoradManager 981# - High level unit test: fixtures for creating the session, sending data and retrieving 982# - Tests are on demand, not necessarily for every commit. But for each major change yes. 983# 984# - Code coverage 985# 986# - Reviewer: 987# Human 988# Automated (unit test passing GitLab Ops)
70def validate_metadata(metadata, schema_name, schemafile='metadata.json'): 71 """ 72 Validate metadata against a given JSON schema. 73 74 Args: 75 **metadata** (*dict*): The metadata to be validated. 76 **schema_name** (*str*): The name of the schema to validate against. HABSlib currently supports the validation of Session metadata and User data. 77 **schemafile** (*str*, optional): The path to the JSON file containing the schemas. Defaults to 'metadata.json'. 78 79 Returns: 80 *bool*: True if validation is successful, False otherwise. 81 82 Raises: 83 **FileNotFoundError**: If the schema file does not exist. 84 **json.JSONDecodeError**: If there is an error decoding the JSON schema file. 85 **exceptions.ValidationError**: If the metadata does not conform to the schema. 86 **Exception**: For any other errors that occur during validation. 87 88 Example: 89 ``` 90 metadata = {"name": "example", "type": "data"} 91 schema_name = "example_schema" 92 is_valid = validate_metadata(metadata, schema_name) 93 if is_valid: 94 print("Metadata is valid.") 95 else: 96 print("Metadata is invalid.") 97 ``` 98 """ 99 100 try: 101 with open(os.path.join(os.path.dirname(__file__), schemafile), 'r') as file: 102 content = file.read() 103 schemas = json.loads(content) 104 schema = schemas[schema_name] 105 validate(instance=metadata, schema=schema) #, format_checker=FormatChecker()) 106 print("Metadata validation successful!") 107 return True 108 109 except json.JSONDecodeError as e: 110 print("Failed to decode JSON:", e) 111 return False 112 113 except exceptions.ValidationError as e: 114 print("Validation error:", e) 115 return False 116 117 except FileNotFoundError: 118 print(f"No such file: {schemafile}") 119 return False 120 121 except Exception as e: 122 print("A general error occurred:", e) 123 return False
Validate metadata against a given JSON schema.
Args:
metadata (dict): The metadata to be validated.
schema_name (str): The name of the schema to validate against. HABSlib currently supports the validation of Session metadata and User data.
schemafile (str, optional): The path to the JSON file containing the schemas. Defaults to 'metadata.json'.
Returns:
bool: True if validation is successful, False otherwise.
Raises:
FileNotFoundError: If the schema file does not exist.
json.JSONDecodeError: If there is an error decoding the JSON schema file.
exceptions.ValidationError: If the metadata does not conform to the schema.
Exception: For any other errors that occur during validation.
Example:
metadata = {"name": "example", "type": "data"}
schema_name = "example_schema"
is_valid = validate_metadata(metadata, schema_name)
if is_valid:
print("Metadata is valid.")
else:
print("Metadata is invalid.")
126def convert_datetime_in_dict(data): 127 """ 128 Recursively converts all datetime objects in a dictionary to strings in ISO format. 129 130 Args: 131 **data** (*dict*): The dictionary containing the data. 132 133 Returns: 134 *dict*: The dictionary with datetime objects converted to strings. 135 """ 136 for key, value in data.items(): 137 if isinstance(value, datetime): 138 data[key] = value.isoformat() 139 elif isinstance(value, dict): 140 data[key] = convert_datetime_in_dict(value) 141 return data
Recursively converts all datetime objects in a dictionary to strings in ISO format.
Args:
data (dict): The dictionary containing the data.
Returns:
dict: The dictionary with datetime objects converted to strings.
145def handshake(base_url): 146 """ 147 Perform a handshake with the server to exchange encryption keys for the current session. 148 149 This function performs the following steps: 150 0. Performs login to the HABS server. 151 1. Sends a GET request to the server to initiate an RSA handshake. 152 2. Retrieves the server's public RSA key from the response. 153 3. Generates a local AES key and stores it in the environment. 154 4. Encrypts the AES key with the server's RSA key. 155 5. Sends the encrypted AES key to the server to complete the AES handshake. 156 157 Args: 158 **base_url** (*str*): The base URL of the server's API. 159 160 Returns: 161 *bool*: True if the handshake is successful, None otherwise. 162 163 Raises: 164 **requests.RequestException**: If a request to the server fails. 165 166 Example: 167 ``` 168 success = handshake("https://example.com") 169 if success: 170 print("Handshake completed successfully.") 171 else: 172 print("Handshake failed.") 173 ``` 174 """ 175 global BASE_URL 176 BASE_URL = base_url 177 url = f"{BASE_URL}/api/{VERSION}/handshake_rsa" 178 response = requests.get(url) 179 180 if response.status_code == 200: 181 print("Handshake (RSA) successful.") 182 api_public_key_pem = response.json().get('api_public_key') 183 api_public_key = serialization.load_pem_public_key( 184 api_public_key_pem.encode(), 185 backend=default_backend() 186 ) 187 os.environ['API_PUBLIC_KEY'] = api_public_key_pem 188 189 # Then we generate and store the AES key 190 aes_key = generate_aes_key() 191 # print("aes_key", aes_key) 192 os.environ['AES_KEY'] = base64.b64encode( aes_key ).decode('utf-8') 193 194 encrypted_aes_key = encrypt_aes_key_with_rsa(aes_key, api_public_key) 195 encrypted_aes_key_b64 = base64.b64encode(encrypted_aes_key).decode('utf-8') 196 # print("encrypted_aes_key_b64",encrypted_aes_key_b64) 197 aes_key_payload = { 198 "encrypted_aes_key": encrypted_aes_key_b64 199 } 200 response = requests.post(f"{BASE_URL}/api/{VERSION}/handshake_aes", json=aes_key_payload) 201 202 if response.status_code == 200: 203 print("Handshake (AES) successful.") 204 return True 205 else: 206 print("Handshake (AES) failed:", response.text) 207 return None 208 else: 209 print("Handshake (RSA) failed:", response.text) 210 return None
Perform a handshake with the server to exchange encryption keys for the current session.
This function performs the following steps:
- Performs login to the HABS server.
- Sends a GET request to the server to initiate an RSA handshake.
- Retrieves the server's public RSA key from the response.
- Generates a local AES key and stores it in the environment.
- Encrypts the AES key with the server's RSA key.
- Sends the encrypted AES key to the server to complete the AES handshake.
Args:
base_url (str): The base URL of the server's API.
Returns:
bool: True if the handshake is successful, None otherwise.
Raises:
requests.RequestException: If a request to the server fails.
Example:
success = handshake("https://example.com")
if success:
print("Handshake completed successfully.")
else:
print("Handshake failed.")
215def set_user(first_name=None, last_name=None, email=None, age=None, weight=None, gender=None): 216 """ 217 Creates a user by sending user data to the server. 218 219 This function performs the following steps: 220 1. Constructs the user data dictionary. 221 2. Validates the user data against the "userSchema". 222 3. Encrypts the user data using the stored AES key. 223 4. Sends the encrypted user data to the server. 224 5. Handles the server's response. 225 226 Args: 227 **first_name** (*str*, optional): The user's first name. 228 **last_name** (*str*, optional): The user's last name. 229 **email** (*str*, optional): The user's email address. 230 **age** (*int*, optional): The user's age. 231 **weight** (*float*, optional): The user's weight. 232 **gender** (*str*, optional): The user's gender. 233 234 Returns: 235 *str*: The user ID if the user is successfully created/retrieved, None otherwise. 236 237 Example: 238 ``` 239 user_id = set_user(first_name="John", last_name="Doe", email="john.doe@example.com", age=30, weight=70.5, gender="X") 240 if user_id: 241 print(f"User created/retrieved with ID: {user_id}") 242 else: 243 print("User creation failed.") 244 ``` 245 """ 246 url = f"{BASE_URL}/api/{VERSION}/users" 247 user_data = { 248 "first_name": first_name, 249 "last_name": last_name, 250 "email": email, 251 "age": age, 252 "weight": weight, 253 "gender": gender 254 } 255 if validate_metadata(user_data, "userSchema", ): 256 _user = { 257 "user_data": user_data 258 } 259 _user = json.dumps(_user).encode('utf-8') 260 aes_key_b64 = os.environ.get('AES_KEY') 261 aes_key_bytes = base64.b64decode(aes_key_b64) 262 response = requests.post( 263 url, 264 data=encrypt_message(_user, aes_key_bytes), 265 headers={'Content-Type': 'application/octet-stream'} 266 ) 267 268 if response.status_code == 200: 269 print("User successfully created/retrieved.") 270 user_id = response.json().get('user_id') 271 return user_id 272 else: 273 print("User creation failed:", response.text) 274 return None 275 else: 276 print("User creation failed.")
Creates a user by sending user data to the server.
This function performs the following steps:
- Constructs the user data dictionary.
- Validates the user data against the "userSchema".
- Encrypts the user data using the stored AES key.
- Sends the encrypted user data to the server.
- Handles the server's response.
Args:
first_name (str, optional): The user's first name.
last_name (str, optional): The user's last name.
email (str, optional): The user's email address.
age (int, optional): The user's age.
weight (float, optional): The user's weight.
gender (str, optional): The user's gender.
Returns:
str: The user ID if the user is successfully created/retrieved, None otherwise.
Example:
user_id = set_user(first_name="John", last_name="Doe", email="john.doe@example.com", age=30, weight=70.5, gender="X")
if user_id:
print(f"User created/retrieved with ID: {user_id}")
else:
print("User creation failed.")
280def search_user_by_mail(email): 281 """ 282 Search for a user by email. 283 284 This function sends a GET request to the server to search for a user by the provided email address. 285 286 Args: 287 **email** (*str*): The email address of the user to search for. 288 289 Returns: 290 *str*: The user ID if the user is found, None otherwise. 291 292 Example: 293 ``` 294 user_id = search_user_by_mail("john.doe@example.com") 295 if user_id: 296 print(f"User found with ID: {user_id}") 297 else: 298 print("User not found.") 299 ``` 300 """ 301 url = f"{BASE_URL}/api/{VERSION}/users?email={email}" 302 303 response = requests.get(url) 304 305 if response.status_code == 200: 306 user_id = response.json().get('user_id') 307 print("User found:", user_id) 308 return user_id 309 else: 310 print("User not found.", response.text) 311 return None
Search for a user by email.
This function sends a GET request to the server to search for a user by the provided email address.
Args:
email (str): The email address of the user to search for.
Returns:
str: The user ID if the user is found, None otherwise.
Example:
user_id = search_user_by_mail("john.doe@example.com")
if user_id:
print(f"User found with ID: {user_id}")
else:
print("User not found.")
315def get_user_by_id(user_id): 316 """ 317 Retrieve user data by user ID. 318 319 This function sends a GET request to the server to retrieve user data for the specified user ID. 320 The response data is decrypted using AES before returning the user data. 321 322 Args: 323 **user_id** (*str*): The unique identifier of the user to retrieve. 324 325 Returns: 326 *dict*: The user data if the user is found, None otherwise. 327 328 Example: 329 ``` 330 user_data = get_user_by_id("1234567890") 331 if user_data: 332 print(f"User data: {user_data}") 333 else: 334 print("User not found.") 335 ``` 336 """ 337 url = f"{BASE_URL}/api/{VERSION}/users/{user_id}" 338 339 response = requests.get(url) 340 341 if response.status_code == 200: 342 print("User found.") 343 encrypted_data = response.content 344 aes_key_b64 = os.environ.get('AES_KEY') 345 aes_key_bytes = base64.b64decode(aes_key_b64) 346 decrypted_json_string = decrypt_message(encrypted_data, aes_key_bytes) 347 user_data = json.loads(decrypted_json_string)['user_data'] 348 return user_data 349 else: 350 print("User not found:", response.text) 351 return None
Retrieve user data by user ID.
This function sends a GET request to the server to retrieve user data for the specified user ID.
The response data is decrypted using AES before returning the user data.
Args:
user_id (str): The unique identifier of the user to retrieve.
Returns:
dict: The user data if the user is found, None otherwise.
Example:
user_data = get_user_by_id("1234567890")
if user_data:
print(f"User data: {user_data}")
else:
print("User not found.")
355def set_session(metadata): 356 """ 357 Create a new simple session. 358 359 This function sends a POST request to the server to create a new simple session using the provided metadata. 360 The metadata is encrypted using AES before being sent to the server. 361 362 Args: 363 **metadata** (*dict*): A dictionary containing the session metadata. The only required metadata for the simple session are the user_id and a date. 364 365 Returns: 366 *str*: The unique identifier of the created session if successful, None otherwise. 367 368 Example: 369 ``` 370 session_metadata = { 371 "user_id": "1076203852085", 372 "session_date": "2024-05-30T12:00:00Z" 373 } 374 session_id = set_session(session_metadata) 375 if session_id: 376 print(f"Session created with ID: {session_id}") 377 else: 378 print("Failed to create session.") 379 ``` 380 """ 381 url = f"{BASE_URL}/api/{VERSION}/sessions" 382 _session = metadata 383 _session = json.dumps(_session).encode('utf-8') 384 aes_key_b64 = os.environ.get('AES_KEY') 385 aes_key_bytes = base64.b64decode(aes_key_b64) 386 response = requests.post( 387 url, 388 data=encrypt_message(_session, aes_key_bytes), 389 headers={'Content-Type': 'application/octet-stream'} 390 ) 391 392 if response.status_code == 200: 393 print("Session successfully created.") 394 # Extract the unique identifier for the uploaded data 395 session_id = response.json().get('session_id') 396 397 # print(session_id) 398 return session_id 399 else: 400 print("Session failed:", response.text) 401 return None
Create a new simple session.
This function sends a POST request to the server to create a new simple session using the provided metadata. The metadata is encrypted using AES before being sent to the server.
Args:
metadata (dict): A dictionary containing the session metadata. The only required metadata for the simple session are the user_id and a date.
Returns:
str: The unique identifier of the created session if successful, None otherwise.
Example:
session_metadata = {
"user_id": "1076203852085",
"session_date": "2024-05-30T12:00:00Z"
}
session_id = set_session(session_metadata)
if session_id:
print(f"Session created with ID: {session_id}")
else:
print("Failed to create session.")
405def get_data_by_id(data_id): 406 """ 407 Retrieve raw data by its unique identifier from the server. 408 409 This function sends a GET request to fetch raw data associated with a specific identifier. It 410 assumes that the data, if retrieved successfully, does not require decryption and is directly accessible. 411 412 Args: 413 **data_id** (*str*): The unique identifier for the data to be retrieved. 414 415 Returns: 416 **dict**: The raw data if retrieval is successful, None otherwise. 417 418 Example: 419 ``` 420 data_id = "1234" 421 raw_data = get_data_by_id(data_id) 422 ... use the data 423 ``` 424 """ 425 get_url = f"{BASE_URL}/api/{VERSION}/rawdata/{data_id}" 426 427 response = requests.get(get_url) 428 429 if response.status_code == 200: 430 print("Retrieved data successfully.") 431 # decrypt 432 return response.json().get('rawData') 433 else: 434 print("Failed to retrieve data:", response.text)
Retrieve raw data by its unique identifier from the server.
This function sends a GET request to fetch raw data associated with a specific identifier. It assumes that the data, if retrieved successfully, does not require decryption and is directly accessible.
Args:
data_id (str): The unique identifier for the data to be retrieved.
Returns:
dict: The raw data if retrieval is successful, None otherwise.
Example:
data_id = "1234"
raw_data = get_data_by_id(data_id)
... use the data
438def get_data_by_session(session_id): 439 """ 440 Retrieve raw data associated with a specific session identifier from the server. 441 442 This function sends a GET request to fetch all raw data linked to the given session ID. The data 443 is returned in its raw form assuming it does not require decryption for usage. 444 445 Args: 446 **session_id** (*str*): The session identifier whose associated data is to be retrieved. 447 448 Returns: 449 *dict*: The raw data linked to the session if retrieval is successful, None otherwise. 450 451 Example: 452 ``` 453 session_id = "abcd1234" 454 session_data = get_data_by_session(session_id) 455 if session_data: 456 print("Data retrieved:", session_data) 457 else: 458 print("Failed to retrieve data.") 459 ``` 460 """ 461 get_url = f"{BASE_URL}/api/{VERSION}/sessions/{session_id}/rawdata" 462 463 response = requests.get(get_url) 464 465 if response.status_code == 200: 466 print("Retrieved data successfully.") 467 # decrypt 468 return response.json().get('data') 469 else: 470 print("Failed to retrieve data:", response.text)
Retrieve raw data associated with a specific session identifier from the server.
This function sends a GET request to fetch all raw data linked to the given session ID. The data is returned in its raw form assuming it does not require decryption for usage.
Args:
session_id (str): The session identifier whose associated data is to be retrieved.
Returns:
dict: The raw data linked to the session if retrieval is successful, None otherwise.
Example:
session_id = "abcd1234"
session_data = get_data_by_session(session_id)
if session_data:
print("Data retrieved:", session_data)
else:
print("Failed to retrieve data.")
475def get_data_ids_by_session(session_id): 476 """ 477 Retrieve a list of data IDs associated with a specific session from the server. 478 479 This function sends a GET request to fetch the IDs of all data entries linked to a specified session ID. 480 The IDs are returned as a list. The function assumes the data does not require decryption for usage. 481 482 Args: 483 **session_id** (*str*): The session identifier for which data IDs are to be retrieved. 484 485 Returns: 486 *list*: A list of data IDs if retrieval is successful, None otherwise. 487 488 Example: 489 ``` 490 session_id = "abcd1234" 491 data_ids = get_data_ids_by_session(session_id) 492 if data_ids: 493 print("Data IDs retrieved:", data_ids) 494 else: 495 print("Failed to retrieve data IDs.") 496 ``` 497 """ 498 get_url = f"{BASE_URL}/api/{VERSION}/sessions/{session_id}/ids" 499 500 response = requests.get(get_url) 501 502 if response.status_code == 200: 503 print("Retrieved ids successfully.") 504 # decrypt 505 return response.json().get('ids') 506 else: 507 print("Failed to retrieve ids:", response.text)
Retrieve a list of data IDs associated with a specific session from the server.
This function sends a GET request to fetch the IDs of all data entries linked to a specified session ID. The IDs are returned as a list. The function assumes the data does not require decryption for usage.
Args:
session_id (str): The session identifier for which data IDs are to be retrieved.
Returns:
list: A list of data IDs if retrieval is successful, None otherwise.
Example:
session_id = "abcd1234"
data_ids = get_data_ids_by_session(session_id)
if data_ids:
print("Data IDs retrieved:", data_ids)
else:
print("Failed to retrieve data IDs.")
512def upload_data(metadata, timestamps, data, ppg_red, ppg_ir): 513 """ 514 Uploads EEG and PPG data to the server along with associated metadata. 515 516 This function compiles different types of physiological data along with metadata into a single dictionary, 517 encrypts the data, and then uploads it via a POST request. Upon successful upload, the server returns a 518 unique identifier for the data which can then be used for future queries or operations. 519 520 Args: 521 **metadata** (*dict*): Information about the data such as subject details and session parameters. 522 **timestamps** (*list*): List of timestamps correlating with each data point. 523 **data** (*list*): EEG data points. 524 **ppg_red** (*list*): Red photoplethysmogram data points. 525 **ppg_ir** (*list*): Infrared photoplethysmogram data points. 526 527 Returns: 528 *tuple*: A tuple containing the data ID of the uploaded data if successful, and None otherwise. 529 530 Example: 531 ``` 532 metadata = {"session_id": "1234", "subject_id": "001"} 533 timestamps = [1597709184, 1597709185] 534 data = [0.1, 0.2] 535 ppg_red = [123, 124] 536 ppg_ir = [125, 126] 537 data_id, error = upload_data(metadata, timestamps, data, ppg_red, ppg_ir) 538 if data_id: 539 print("Data uploaded successfully. Data ID:", data_id) 540 else: 541 print("Upload failed with error:", error) 542 ``` 543 """ 544 url = f"{BASE_URL}/api/{VERSION}/rawdata" 545 _data = { 546 "metadata": metadata, 547 "timestamps": timestamps, 548 "data": data, 549 "ppg_red": ppg_red, 550 "ppg_ir": ppg_ir 551 } 552 _data = json.dumps(_data).encode('utf-8') 553 554 # response = requests.post(url, json=_data) 555 aes_key_b64 = os.environ.get('AES_KEY') 556 aes_key_bytes = base64.b64decode(aes_key_b64) 557 response = requests.post( 558 url, 559 data=encrypt_message(_data, aes_key_bytes), 560 headers={'Content-Type': 'application/octet-stream'} 561 ) 562 563 if response.status_code == 200: 564 print('.', end='', flush=True) 565 # Extract the unique identifier for the uploaded data 566 data_id = response.json().get('data_id') 567 return data_id, None 568 else: 569 print("Upload failed:", response.text) 570 return None
Uploads EEG and PPG data to the server along with associated metadata.
This function compiles different types of physiological data along with metadata into a single dictionary, encrypts the data, and then uploads it via a POST request. Upon successful upload, the server returns a unique identifier for the data which can then be used for future queries or operations.
Args:
metadata (dict): Information about the data such as subject details and session parameters.
timestamps (list): List of timestamps correlating with each data point.
data (list): EEG data points.
ppg_red (list): Red photoplethysmogram data points.
ppg_ir (list): Infrared photoplethysmogram data points.
Returns:
tuple: A tuple containing the data ID of the uploaded data if successful, and None otherwise.
Example:
metadata = {"session_id": "1234", "subject_id": "001"}
timestamps = [1597709184, 1597709185]
data = [0.1, 0.2]
ppg_red = [123, 124]
ppg_ir = [125, 126]
data_id, error = upload_data(metadata, timestamps, data, ppg_red, ppg_ir)
if data_id:
print("Data uploaded successfully. Data ID:", data_id)
else:
print("Upload failed with error:", error)
575def acquire_send_raw(user_id, date, board, stream_duration, buffer_duration): 576 """ 577 Asynchronously acquires raw data from a specific EEG board and sends it to the server. 578 579 This function connects to an EEG board, initiates a data acquisition session, and sends the collected data 580 to the server in real-time or near real-time. It ensures that all the data handled during the session 581 is associated with a unique session ID and metadata that includes user and session details. The function 582 will validate the session metadata before proceeding with data acquisition and sending. 583 584 Args: 585 **user_id** (*str*): The unique identifier of the user for whom the data is being collected. 586 **date** (*str*): The date of the session, used for metadata purposes. 587 **board** (*int*): Identifier for the EEG board from which data will be acquired. 588 **stream_duration** (*int*): Duration in seconds for which data will be streamed from the board. 589 **buffer_duration** (*int*): Time in seconds for how often the data is buffered and sent. 590 591 Returns: 592 *str* or *bool*: The session ID if the operation is successful; False otherwise. 593 594 Raises: 595 **ConnectionError**: If the board connection fails. 596 **ValidationError**: If the metadata does not comply with the required schema. 597 598 Example: 599 ``` 600 session = acquire_send_raw('user123', '2021-06-01', 'MUSE_S', 300, 10) 601 if session: 602 print(f"Session successfully started with ID: {session}") 603 else: 604 print("Failed to start session") 605 ``` 606 """ 607 session_id = asyncio.run( _acquire_send_raw(user_id, date, board, stream_duration, buffer_duration) ) 608 return session_id
Asynchronously acquires raw data from a specific EEG board and sends it to the server.
This function connects to an EEG board, initiates a data acquisition session, and sends the collected data to the server in real-time or near real-time. It ensures that all the data handled during the session is associated with a unique session ID and metadata that includes user and session details. The function will validate the session metadata before proceeding with data acquisition and sending.
Args:
user_id (str): The unique identifier of the user for whom the data is being collected.
date (str): The date of the session, used for metadata purposes.
board (int): Identifier for the EEG board from which data will be acquired.
stream_duration (int): Duration in seconds for which data will be streamed from the board.
buffer_duration (int): Time in seconds for how often the data is buffered and sent.
Returns:
str or bool: The session ID if the operation is successful; False otherwise.
Raises:
ConnectionError: If the board connection fails.
ValidationError: If the metadata does not comply with the required schema.
Example:
session = acquire_send_raw('user123', '2021-06-01', 'MUSE_S', 300, 10)
if session:
print(f"Session successfully started with ID: {session}")
else:
print("Failed to start session")
638def send_file(user_id, date, edf_file, ch_nrs=None, ch_names=None): 639 """ 640 Uploads EEG data from a file to the server along with associated metadata. 641 642 This function compiles EEG data from an [EDF file](https://www.edfplus.info/downloads/index.html) along with metadata into a single dictionary, 643 encrypts the data, and then uploads it via a POST request. Upon successful upload, the server returns a 644 unique identifier for the session which can then be used for future queries or operations. 645 646 Args: 647 **user_id** (*str*): The unique identifier of the user for whom the data is being collected. 648 **metadata** (*dict*): Information about the data such as subject details and session parameters. 649 **date** (*str*): The date of the session, used for metadata purposes. 650 **edf_file** (*str*): name of an EDF file. 651 **ch_nrs** (*list of int*, optional): The indices of the channels to read. The default is None. 652 **ch_names** (*list of str*, optional): The names of channels to read. The default is None. 653 654 Returns: 655 *tuple*: A tuple containing the session ID of the uploaded data if successful, and None otherwise. 656 657 Example: 658 ``` 659 session = send_file('user123', '2021-06-01', 'nameoffile.edf') 660 if session: 661 print(f"Session successfully started with ID: {session}") 662 else: 663 print("Failed to start session") 664 ``` 665 """ 666 667 try: 668 signals, signal_headers, header = highlevel.read_edf(edf_file, ch_nrs, ch_names) 669 670 max_time = signals.shape[1] / signal_headers[0]['sample_frequency'] 671 timestamps = np.linspace(header['startdate'].timestamp(), max_time, signals.shape[1]) 672 673 session_metadata = { 674 "user_id": user_id, # add user to the session for reference 675 "session_date": header['startdate'].strftime("%m/%d/%Y, %H:%M:%S") 676 } 677 if validate_metadata(session_metadata, "sessionSchema"): 678 session_id = set_session(metadata={**session_metadata}) 679 metadata = {'session_id':session_id, **session_metadata, **convert_datetime_in_dict(header), **convert_datetime_in_dict(signal_headers[0])} 680 681 chunks = ((signals.size * signals.itemsize)//300000)+1 682 timestamps_chunks = np.array_split(timestamps, chunks) 683 signals_chunks = np.array_split(signals, chunks, axis=1) 684 json_data = json.dumps(signals_chunks[0].tolist()) 685 size_in_bytes = sys.getsizeof(json_data) 686 print("%d total bytes will be sent into %d chunks of %d bytes" % (signals.size * signals.itemsize, chunks, size_in_bytes)) 687 688 for timestamps_chunk,signals_chunk in zip(timestamps_chunks, signals_chunks): 689 upload_data(metadata, timestamps_chunk.tolist(), signals_chunk.tolist(), [], []) 690 691 return session_id 692 else: 693 return False 694 except Exception as e: 695 print("A general error occurred:", e) 696 return False
Uploads EEG data from a file to the server along with associated metadata.
This function compiles EEG data from an EDF file along with metadata into a single dictionary, encrypts the data, and then uploads it via a POST request. Upon successful upload, the server returns a unique identifier for the session which can then be used for future queries or operations.
Args:
user_id (str): The unique identifier of the user for whom the data is being collected.
metadata (dict): Information about the data such as subject details and session parameters.
date (str): The date of the session, used for metadata purposes.
edf_file (str): name of an EDF file.
ch_nrs (list of int, optional): The indices of the channels to read. The default is None.
ch_names (list of str, optional): The names of channels to read. The default is None.
Returns:
tuple: A tuple containing the session ID of the uploaded data if successful, and None otherwise.
Example:
session = send_file('user123', '2021-06-01', 'nameoffile.edf')
if session:
print(f"Session successfully started with ID: {session}")
else:
print("Failed to start session")
702def set_pipe(metadata, pipeline, params): 703 """ 704 Configures and initiates a data processing pipeline for a session on the server. 705 706 This function sends metadata and processing parameters to a specified pipeline endpoint 707 to create a data processing session. It encrypts the session data before sending to ensure 708 security. The function checks the server response to confirm the session creation. 709 710 Args: 711 **metadata** (*dict*): A dictionary containing metadata about the session, typically including 712 details such as user ID and session date. 713 **pipeline** (*str*): The identifier for the processing pipeline to be used. 714 **params** (*dict*): Parameters specific to the processing pipeline, detailing how data should 715 be processed. 716 717 Returns: 718 *str* or *None*: The session ID if the session is successfully created, or None if the operation fails. 719 720 Raises: 721 **requests.exceptions.RequestException**: An error from the Requests library when an HTTP request fails. 722 **KeyError**: If necessary keys are missing in the environment variables. 723 724 Example: 725 ``` 726 session_metadata = {"user_id": "123", "session_date": "2024-06-03"} 727 processing_params = {"filter_type": "lowpass", "cutoff_freq": 30} 728 session_id = set_pipe(session_metadata, 'eeg_smoothing', processing_params) 729 if session_id: 730 print(f"Pipeline session created with ID: {session_id}") 731 else: 732 print("Failed to create pipeline session") 733 ``` 734 """ 735 url = f"{BASE_URL}/api/{VERSION}/sessions/pipe/{pipeline}" 736 _session = { 737 "metadata": metadata, 738 "processing_params": params, 739 } 740 _session = json.dumps(_session).encode('utf-8') 741 aes_key_b64 = os.environ.get('AES_KEY') 742 aes_key_bytes = base64.b64decode(aes_key_b64) 743 response = requests.post( 744 url, 745 data=encrypt_message(_session, aes_key_bytes), 746 headers={'Content-Type': 'application/octet-stream'} 747 ) 748 749 if response.status_code == 200: 750 print("Session successfully created.") 751 # Extract the unique identifier for the uploaded data 752 session_id = response.json().get('session_id') 753 # print(session_id) 754 return session_id 755 else: 756 print("Session failed:", response.text) 757 return None
Configures and initiates a data processing pipeline for a session on the server.
This function sends metadata and processing parameters to a specified pipeline endpoint to create a data processing session. It encrypts the session data before sending to ensure security. The function checks the server response to confirm the session creation.
Args:
metadata (dict): A dictionary containing metadata about the session, typically including
details such as user ID and session date.
pipeline (str): The identifier for the processing pipeline to be used.
params (dict): Parameters specific to the processing pipeline, detailing how data should
be processed.
Returns:
str or None: The session ID if the session is successfully created, or None if the operation fails.
Raises:
requests.exceptions.RequestException: An error from the Requests library when an HTTP request fails.
KeyError: If necessary keys are missing in the environment variables.
Example:
session_metadata = {"user_id": "123", "session_date": "2024-06-03"}
processing_params = {"filter_type": "lowpass", "cutoff_freq": 30}
session_id = set_pipe(session_metadata, 'eeg_smoothing', processing_params)
if session_id:
print(f"Pipeline session created with ID: {session_id}")
else:
print("Failed to create pipeline session")
761def upload_pipedata(metadata, timestamps, data, ppg_red, ppg_ir): 762 """ 763 Uploads processed data to a specific session on the server. 764 765 This function is responsible for uploading various data streams associated with a session, including 766 timestamps and physiological measurements such as PPG (Photoplethysmogram). The data is encrypted before 767 sending to ensure confidentiality and integrity. 768 769 Args: 770 **metadata** (*dict*): Contains session-related metadata including the session ID. 771 **timestamps** (*list*): A list of timestamps corresponding to each data point. 772 **data** (*list*): The main data collected, e.g., EEG readings. 773 **ppg_red** (*list*): Red channel data from a PPG sensor. 774 **ppg_ir** (*list*): Infrared channel data from a PPG sensor. 775 776 Returns: 777 *tuple*: A tuple containing the data ID if the upload is successful and the processed data, or None if the upload fails. 778 779 Raises: 780 **requests.exceptions.RequestException: An error from the Requests library when an HTTP request fails. 781 **KeyError**: If necessary keys are missing in the environment variables. 782 783 Example: 784 ``` 785 session_metadata = {"session_id": "12345"} 786 timestamps = [1597709165, 1597709166, ...] 787 data = [0.1, 0.2, ...] 788 ppg_red = [12, 15, ...] 789 ppg_ir = [20, 22, ...] 790 data_id, processed_data = upload_pipedata(session_metadata, timestamps, data, ppg_red, ppg_ir) 791 if data_id: 792 print(f"Data uploaded successfully with ID: {data_id}") 793 else: 794 print("Failed to upload data") 795 ``` 796 """ 797 url = f"{BASE_URL}/api/{VERSION}/pipedata/{metadata['session_id']}" # the metadata contain session_id to consistently pass it with each upload 798 799 _data = { 800 "metadata": metadata, 801 "timestamps": timestamps, 802 "data": data, 803 "ppg_red": ppg_red, 804 "ppg_ir": ppg_ir 805 } 806 _data = json.dumps(_data).encode('utf-8') 807 aes_key_b64 = os.environ.get('AES_KEY') 808 aes_key_bytes = base64.b64decode(aes_key_b64) 809 response = requests.post( 810 url, 811 data=encrypt_message(_data, aes_key_bytes), 812 headers={'Content-Type': 'application/octet-stream'} 813 ) 814 815 if response.status_code == 200: 816 print('.', end='', flush=True) 817 # Extract the unique identifier for the uploaded data 818 data_id = response.json().get('data_id') 819 # Retrieve the processed data 820 data = response.json().get('pipeData') 821 return data_id, data 822 else: 823 print("Upload failed:", response.text) 824 return None
Uploads processed data to a specific session on the server.
This function is responsible for uploading various data streams associated with a session, including timestamps and physiological measurements such as PPG (Photoplethysmogram). The data is encrypted before sending to ensure confidentiality and integrity.
Args:
metadata (dict): Contains session-related metadata including the session ID.
timestamps (list): A list of timestamps corresponding to each data point.
data (list): The main data collected, e.g., EEG readings.
ppg_red (list): Red channel data from a PPG sensor.
ppg_ir (list): Infrared channel data from a PPG sensor.
Returns:
tuple: A tuple containing the data ID if the upload is successful and the processed data, or None if the upload fails.
Raises:
requests.exceptions.RequestException: An error from the Requests library when an HTTP request fails.
**KeyError: If necessary keys are missing in the environment variables.
Example:
session_metadata = {"session_id": "12345"}
timestamps = [1597709165, 1597709166, ...]
data = [0.1, 0.2, ...]
ppg_red = [12, 15, ...]
ppg_ir = [20, 22, ...]
data_id, processed_data = upload_pipedata(session_metadata, timestamps, data, ppg_red, ppg_ir)
if data_id:
print(f"Data uploaded successfully with ID: {data_id}")
else:
print("Failed to upload data")
828def acquire_send_pipe(pipeline, params, user_id, date, board, stream_duration, buffer_duration, callback=None): 829 """ 830 Acquires data from a board, processes it according to a specified pipeline, and sends it to a server. 831 This function handles setting up a session for data acquisition and processing, connects to a board, 832 and manages the data flow from acquisition through processing to uploading. It uses an asynchronous loop 833 to handle the operations efficiently, suitable for real-time data processing scenarios. 834 835 Args: 836 **pipeline** (*str*): Name of the processing pipeline to use. 837 **params** (*dict*): Parameters for the pipeline processing. 838 **user_id** (*str*): The user ID to which the session will be associated. 839 **date** (*str*): Date of the session for tracking purposes. 840 **board** (*int*): Identifier for the hardware board to use for data acquisition. 841 **stream_duration** (*int*): Duration in seconds to stream data from the board. 842 **buffer_duration** (*int*): Duration in seconds to buffer data before processing. 843 **callback** (*function*): Optional callback function to execute after data is sent. 844 845 Returns: 846 *str* or *bool*: The session ID if successful, False otherwise. 847 848 """ 849 session_id = asyncio.run( 850 _acquire_send_pipe(pipeline, params, user_id, date, board, stream_duration, buffer_duration, callback) 851 ) 852 return session_id
Acquires data from a board, processes it according to a specified pipeline, and sends it to a server. This function handles setting up a session for data acquisition and processing, connects to a board, and manages the data flow from acquisition through processing to uploading. It uses an asynchronous loop to handle the operations efficiently, suitable for real-time data processing scenarios.
Args:
pipeline (str): Name of the processing pipeline to use.
params (dict): Parameters for the pipeline processing.
user_id (str): The user ID to which the session will be associated.
date (str): Date of the session for tracking purposes.
board (int): Identifier for the hardware board to use for data acquisition.
stream_duration (int): Duration in seconds to stream data from the board.
buffer_duration (int): Duration in seconds to buffer data before processing.
callback (function): Optional callback function to execute after data is sent.
Returns:
str or bool: The session ID if successful, False otherwise.
883def train(session_id, params): 884 """ 885 Sends a request to the server to train a machine learning algorithm on the data from a specified session. 886 887 Args: 888 **session_id** (*str*): The unique identifier of the session containing the data to be used for training. 889 **params** (*dict*): The parameters for the training process. 890 891 Returns: 892 *str* or *None*: The task ID if the request is successful, None otherwise. 893 894 This function sends the training parameters and session ID to the server, which initiates the training process. 895 The response includes a task ID that can be used for future interactions related to the training task. 896 897 Example: 898 ``` 899 train("session_12345", {"param1": "value1", "param2": "value2"}) 900 ``` 901 """ 902 url = f"{BASE_URL}/api/{VERSION}/train/{session_id}" 903 _params = { 904 "params": params, 905 } 906 _params = json.dumps(_params).encode('utf-8') 907 aes_key_b64 = os.environ.get('AES_KEY') 908 aes_key_bytes = base64.b64decode(aes_key_b64) 909 response = requests.post( 910 url, 911 data=encrypt_message(_params, aes_key_bytes), 912 headers={'Content-Type': 'application/octet-stream'} 913 ) 914 915 if response.status_code == 200: 916 task_id = response.json().get('task_id') 917 print("Published. For future interactions, use task_id:",task_id) 918 return task_id 919 else: 920 print("Publish failed:", response.text) 921 return None
Sends a request to the server to train a machine learning algorithm on the data from a specified session.
Args:
session_id (str): The unique identifier of the session containing the data to be used for training.
params (dict): The parameters for the training process.
Returns:
str or None: The task ID if the request is successful, None otherwise.
This function sends the training parameters and session ID to the server, which initiates the training process. The response includes a task ID that can be used for future interactions related to the training task.
Example:
train("session_12345", {"param1": "value1", "param2": "value2"})
925def infer(data_id, params): 926 """ 927 Sends a request to the server to perform machine learning inference based on a previously trained model, given the data ID. 928 929 Args: 930 **data_id** (*str*): The unique identifier of the data to be used for inference. 931 **params** (*dict*): The parameters for the inference process. 932 933 Returns: 934 *str* or *None*: The task ID if the request is successful, None otherwise. 935 936 This function sends the inference parameters and data ID to the server, which initiates the inference process. 937 The response includes a task ID that can be used for future interactions related to the inference task. 938 939 Example: 940 ``` 941 infer("data_12345", {"param1": "value1", "param2": "value2"}) 942 ``` 943 """ 944 url = f"{BASE_URL}/api/{VERSION}/infer/{data_id}" 945 _params = { 946 "params": params, 947 } 948 _params = json.dumps(_params).encode('utf-8') 949 # response = requests.post(url, json=_params) 950 aes_key_b64 = os.environ.get('AES_KEY') 951 aes_key_bytes = base64.b64decode(aes_key_b64) 952 response = requests.post( 953 url, 954 data=encrypt_message(_params, aes_key_bytes), 955 headers={'Content-Type': 'application/octet-stream'} 956 ) 957 958 if response.status_code == 200: 959 task_id = response.json().get('task_id') 960 print("Published. For future interactions, use task_id:",task_id) 961 return task_id 962 else: 963 print("Publish failed:", response.text) 964 return None
Sends a request to the server to perform machine learning inference based on a previously trained model, given the data ID.
Args:
data_id (str): The unique identifier of the data to be used for inference.
params (dict): The parameters for the inference process.
Returns: str or None: The task ID if the request is successful, None otherwise.
This function sends the inference parameters and data ID to the server, which initiates the inference process. The response includes a task ID that can be used for future interactions related to the inference task.
Example:
infer("data_12345", {"param1": "value1", "param2": "value2"})