Skip to content

VNSconnect API

VNSconnect - Python library with different methods to call for communication with Skaaltec SmartVNS devices

VNSconnect

SDK wrapper providing convenient methods to connect to SmartVNS devices (Tracker / Stimulator) via Bleak and to read/write protobuf configurations as well as receive streamed notifications. Upper cased attributes are general constant variables.

Attributes:

Name Type Description
TRACKER_NAME str

Advertised BLE name of the motion tracker.

STIMULATOR_NAME str

Advertised BLE name of the stimulator.

TRACKER_ADDRESS str

Known BLE address/identifier of the motion tracker

STIMULATOR_ADDRESS str

Known BLE address/identifier of the stimulator (platform-specific).

SYS_CHAR str

Characteristic UUID of the system (stimulator and motion tracker consist of a system)

STIM_CHAR str

Characteristic UUID of the stimulator

DATA_UUID str

Characteristic UUID of the notification

BATTERY_UUID str

Characteristic UUID of the battery information

address str

Address of the desired device

connect_retries (int, optional)

Number of connection attempts before giving up (default: 3).

connect_retry_delay_s (float, optional)

Delay in seconds between connection attempts (default: 1.0).

timeout float

Time to scan for in seconds (default: 5.0).

uuid_key str

Logical key referring to the UUID (e.g., 'sys_config').

config_msg Union[SysConfig, StimConfig]

Configuration messages which should be send

sender Any

Argument for callback function with sender UUID

data bytearray

Raw received data from device

Methods:

Name Description
scanner

Scan for all nearby BLE devices for a given duration and list them.

find_device

Scan for a specific BLE device by address.

connect_to_device

Attempt to connect to the BLE device using its address.

_connect_to_device_by_name

Internal helper to scan for a BLE device by name and connect to it.

connect_to_tracker

Directly connect to the motion tracker with the known name

connect_to_stimulator

Directly connect to the stimulator directly with the known name

disconnect

Disconnection from the device

get_address

Return the address of the connected device.

get_mtu_size

Return the MTU size of the active connection.

write_config

Serialize a protobuf configuration and write it to a given characteristic.

read_config

Read configuration from the device and create a protobuf message.

create_stimconfig

Create a StimConfig protobuf message with user-defined parameters.

create_sysconfig

Create a SysConfig protobuf message with user-defined parameters.

_debug_list_notify_characteristics

Helper function to see all the channels (different UUIDS) from a device. Print all characteristics that support notifications.

notification_callback

Callback for the notification function which handles the received data arrays.

_get_bleak_GATT_Characteristic

Internal helper function to get the right target UUID (visible in _debug_list_notify_characteristics)

start_notification

Start the notification function

stop_notification

Stop the notification function

decode_data

Data decoder to get usable data.

_ensure_client

Ensure that a BLE client connection exists and is active.

Source code in sdk/vnsconnect.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
class VNSconnect:
    """
    SDK wrapper providing convenient methods to connect to SmartVNS
    devices (Tracker / Stimulator) via Bleak and to read/write protobuf
    configurations as well as receive streamed notifications. 
    Upper cased attributes are general constant variables.

    Attributes
    -------------------
    TRACKER_NAME: str
        Advertised BLE name of the motion tracker.
    STIMULATOR_NAME: str
        Advertised BLE name of the stimulator.
    TRACKER_ADDRESS: str
        Known BLE address/identifier of the motion tracker 
    STIMULATOR_ADDRESS: str
        Known BLE address/identifier of the stimulator (platform-specific).
    SYS_CHAR: str
        Characteristic UUID of the system (stimulator and motion tracker consist of a system)
    STIM_CHAR: str
        Characteristic UUID of the stimulator
    DATA_UUID: str 
        Characteristic UUID of the notification
    BATTERY_UUID: str
        Characteristic UUID of the battery information


    address: str
        Address of the desired device
    connect_retries: int, optional
        Number of connection attempts before giving up (default: 3).
    connect_retry_delay_s: float, optional
        Delay in seconds between connection attempts (default: 1.0).
    timeout: float
        Time to scan for in seconds (default: 5.0).
    uuid_key: str
        Logical key referring to the UUID (e.g., 'sys_config').
    config_msg: Union[SysConfig, StimConfig]
        Configuration messages which should be send
    sender: Any
        Argument for callback function with sender UUID
    data: bytearray
        Raw received data from device



    Methods
    -------
    scanner()
        Scan for all nearby BLE devices for a given duration and list them.
    find_device()
        Scan for a specific BLE device by address.
    connect_to_device()
        Attempt to connect to the BLE device using its address.
    _connect_to_device_by_name()
        Internal helper to scan for a BLE device by name and connect to it.
    connect_to_tracker()
        Directly connect to the motion tracker with the known name
    connect_to_stimulator()
        Directly connect to the stimulator directly with the known name
    disconnect()
        Disconnection from the device
    get_address(): 
        Return the address of the connected device.
    get_mtu_size():
        Return the MTU size of the active connection.
    write_config()
        Serialize a protobuf configuration and write it to a given characteristic.
    read_config()
        Read configuration from the device and create a protobuf message.
    create_stimconfig()
        Create a StimConfig protobuf message with user-defined parameters.
    create_sysconfig()
        Create a SysConfig protobuf message with user-defined parameters.
    _debug_list_notify_characteristics()
        Helper function to see all the channels (different UUIDS) from a device. Print all characteristics that support notifications.
    notification_callback()
        Callback for the notification function which handles the received data arrays.
    _get_bleak_GATT_Characteristic()
        Internal helper function to get the right target UUID (visible in _debug_list_notify_characteristics)
    start_notification()
        Start the notification function 
    stop_notification()
        Stop the notification function
    decode_data()
        Data decoder to get usable data.

    _ensure_client()
        Ensure that a BLE client connection exists and is active.



    """
    TRACKER_NAME = "SmartVNS Tracker"
    STIMULATOR_NAME = "SmartVNS Stimulator"

    # TRACKER_ADDRESS = "865325AD-2821-7483-BF53-AAF04BD741D2"
    # STIMULATOR_ADDRESS = ""

    SYS_CHAR = "CE60014D-AE91-11e1-4496-9FC5DD4AFF01"  # UUID aus configure.py
    STIM_CHAR = "CE60014E-AE91-11e1-4496-9FC5DD4AFF01"

    DATA_UUID = "ce60014d-ae91-11e1-4495-9fc5dd4aff08"
    BATTERY_UUID = "2a19"

    def __init__(
            self,
            address: str = "",
            # target: Literal["tracker", "stim"],
            connect_retries: int = 3,
            connect_retry_delay_s: float = 1.0,
    ):
        """
        Parameters
        ----------
        target: Literal[str, str]
            General target if connection applies to motion tracker or stimulator
        connect_retries: int
            Number of connection attempts before timeout (default: 3).
        connect_retry_delay_s: float
            Delay in seconds between connection attempts (default: 1.0).

        """

        self.address = address
        self._client: Optional[BleakClient] = None
        self._connect_retries = connect_retries
        self._connect_retry_delay_s = connect_retry_delay_s

    """Scan and list devices"""

    async def scanner(self, timeout: float = 5.0) -> dict[str, tuple[BLEDevice, Any]]:
        """
        Scan for all nearby BLE devices for a given duration and include RSSI data.

        Returns
        -------
        dict[str, tuple[BLEDevice, AdvertisementData]]
            Dictionary of discovered devices and their advertisement data.
        """
        devices_dict = await BleakScanner.discover(timeout=timeout, return_adv=True)

        wanted = {"SmartVNS Tracker", "SmartVNS Stimulator", "Zephyr"}
        result: dict[str, tuple[BLEDevice, Any]] = {}

        for addr, (device, adv) in devices_dict.items():
            name = device.name  # or getattr(adv, "local_name", "Unknown")
            if name in wanted:
                rssi = adv.rssi
                print(f"- {name} ({device.address}) RSSI: {rssi} dBm")
                result[addr] = (device, adv)

        return result

    """Discover"""

    async def find_device(self, timeout: float = 5.0) -> Optional[BLEDevice]:
        """
        Scan for a specific BLE device for a given duration by address.

        Parameters
        ----------
        timeout: flaot
            Time to scan for in seconds (default: 5.0).

        Returns
        -------
        Optional[BLEDevice]
            BLEDevice object if found, None otherwise.
        """
        print(f"Scanning for device with address {self.address}...")
        device = await BleakScanner.find_device_by_address(self.address, timeout=timeout)

        if device is None:
            print(f"Device with address {self.address} not found")
            return None

        print(f"Found device: {device.name or 'Unknown'} ({device.address})")
        return device

    """Connection"""

    async def connect_to_device(self) -> None:
        """
        Attempt to connect to the BLE device using its address.

        Raises
        ------
        RuntimeError
            If the client is not connected.


        """
        last_err = None

        for attempt in range(self._connect_retries):
            try:
                self._client = BleakClient(self.address)
                await self._client.connect()
                print(f"Connected to {self.address}")
                return

            except Exception as e:
                print(
                    f"Connection attempt {attempt}/{self._connect_retries} failed: {e}")
                last_err = e
                await asyncio.sleep(self._connect_retry_delay_s)

        raise RuntimeError(
            f"Failed to connect to {self.address}") from last_err

    async def _connect_to_device_by_name(self, target_name: str, timeout: float = 5.0) -> None:
        """
        Internal helper to scan for a BLE device by name and connect to it.

        Parameters
        ----------
        target_name : str
            The advertised name of the BLE device (e.g., "SmartVNS Tracker").
        timeout : float
            How long to scan for the device, in seconds.

        Raises
        ------
        RuntimeError
            If the device cannot be found or connection fails.
        """
        print(f"Scanning for device with name '{target_name}'...")
        device = await BleakScanner.find_device_by_name(target_name, timeout=timeout)

        if not device:
            raise RuntimeError(f"Device with name '{target_name}' not found.")

        print(f"Found device: {device.name} ({device.address})")

        last_err = None
        for attempt in range(self._connect_retries):
            try:
                self._client = BleakClient(device)
                await self._client.connect()
                print(f"Connected to {device.name}")
                return
            except Exception as e:
                print(
                    f"Connection attempt {attempt+1}/{self._connect_retries} failed: {e}")
                last_err = e
                await asyncio.sleep(self._connect_retry_delay_s)

        raise RuntimeError(f"Failed to connect to {target_name}") from last_err

    async def connect_to_tracker(self, timeout: float = 5.0) -> None:
        """
        Directly connect to the motion tracker.

        Parameters
        ----------
        timeout: float
            How long to scan for the device, in seconds.
        """

        await self._connect_to_device_by_name(self.TRACKER_NAME, timeout)

    async def connect_to_stimulator(self, timeout: float = 5.0) -> None:
        """
        Directly connect to the stimulator.

        Parameters
        ----------
        timeout: float
            How long to scan for the device, in seconds.       


        """
        await self._connect_to_device_by_name(self.STIMULATOR_NAME, timeout)

    async def disconnect(self) -> None:
        """
        Disconnect from the BLE device if currently connected.

        """
        if self._client and self._client.is_connected:
            await self._client.disconnect()
            print("Disconnected")

    """Helpers"""

    def _ensure_client(self) -> BleakClient:
        """
        Helper to ensure Client is there?

        Returns
        -------
        BleakClient
            The currently connected BLE client.

        Raises
        ------
        RuntimeError
            If the client is not connected.
        """
        if not self._client or not self._client.is_connected:
            raise RuntimeError("Not connected")
        return self._client

    """Device informati"""

    def get_address(self) -> str:
        """
        Return the address of the connected device.

        Returns
        -------
        client.address: str
            Device address which is already connected.
        """

        client = self._ensure_client()
        return client.address

    def get_mtu_size(self) -> int:
        """
        Return the MTU size of the active connection.

        Returns
        -------
        client.mtu_size: int
            MTU size in bytes.

        """

        client = self._ensure_client()
        return client.mtu_size

    """Protobuf Config"""
    # Union: SysConfig or StimConfig

    async def write_config(
            self,
            uuid_key: str,
            config_msg: Union[SysConfig, StimConf]) -> None:
        """
        Serialize the configuration to a string and write it to a given characteristic.

        Parameters
        ----------
        uuid_key: str
            The key used to look up the correct characteristic UUID.
        config_msg: Union[SysConfig, StimConf]
            The protobuf message to serialize and send.
        """

        client = self._ensure_client()

        char_uuid = self.STIM_CHAR if uuid_key == "stim" else self.SYS_CHAR
        data = config_msg.SerializeToString()
        print(f"data: {data.hex()}")

        print(f"Writing {len(data)} bytes to {char_uuid}")
        await client.write_gatt_char(char_uuid, data, response=True)
        print("Write OK")

    """READ Configuration"""

    async def _debug_list_read_characteristics(self):
        """
        Helper function to see all the channels (different UUIDS) from a device. Print all characteristics that support read.

        Notes
        -----
        Useful for quickly identifying read characteristics
        """
        client = self._ensure_client()
        services = client.services  # Kein await nötig!

        if not services:
            print("No GATT services found. Are you connected?")
            return

        print("\n--- GATT Characteristics with 'read' property ---")
        for service in services:
            for char in service.characteristics:
                if "read" in char.properties:
                    print(
                        f"Service {service.uuid}: {char.uuid} ({char.properties})")
        print("---------------------------------------------------\n")

    async def read_config(
            self,
            uuid_key: str) -> None:
        """
        Read configuration from the device and create a protobuf message.

        Parameters
        ----------
        uuid_key: str
            The key used to look up the correct characteristic UUID.
        """

        client = self._ensure_client()
        await self._debug_list_read_characteristics()

        char_uuid = self.STIM_CHAR if uuid_key == "stim" else self.SYS_CHAR

        data = await client.read_gatt_char(char_uuid)

        # 1) Sichtprüfung
        print(f"[DBG] Read {len(data)} bytes from {char_uuid}")
        print(f"[DBG] First 40 bytes (hex): {bytes(data)[:40].hex()}")

        payload = data[:40]
        print(f"Actual payload size: {len(payload)}")
        print(f"Actual payload {bytes(payload).hex()}")

        cls = StimConf if uuid_key == "stim" else SysConfig

        msg = cls()

        try:
            # safety: in manchen Backends kommt memoryview – nach bytes casten
            msg.ParseFromString(payload)
        except DecodeError as e:
            raise RuntimeError(
                "Could not parse protobuf from characteristic.\n"
                f"  uuid_key={uuid_key}\n"
                f"  char={char_uuid}\n"
                f"  len(data)={len(data)}\n"
                f"  head={bytes(data)[:16].hex()} ...\n"
                "Hints:\n"
                "  • Stimmt die Characteristic (read, nicht notify)?\n"
                "  • Erwartet die Firmware einen Längenprefix? (z.B. u32 + payload)\n"
                "  • Sind .proto-Versionen identisch?\n"
                "  • Hast du zuvor write_config() erfolgreich ausgeführt?\n"
            ) from e

       # msg.ParseFromString(data)

        print(msg)

    """Create StimConfig"""

    def create_stimconfig(
            self,
            trigger_ms: int = 1000,
            forward_us: int = 250,
            deadband_us: int = 100,
            period_us: int = 400000,
            intensity_uA: int = 100,
            retain_cfg: bool = False) -> StimConf:
        """
        Create a sample StimConfig Protocol Buffer message (with default values).

        Parameters
        ----------
        trigger_ms : int
            Trigger delay in milliseconds.
        forward_us : int
            Forward pulse width in microseconds.
        deadband_us : int
            Deadband duration in microseconds.
        period_us : int
            Stimulation period in microseconds.
        intensity_uA : int
            Current intensity in microamperes.
        retain_cfg : bool
            If ``True``, persist configuration on device (default False).

        Returns
        -------
        StimConf
            Populated StimConfig object
        """
        args = {
            "trigger_ms": trigger_ms,
            "forward_us": forward_us,
            "deadband_us": deadband_us,
            "period_us": period_us,
            "intensity_uA": intensity_uA,
            "retain_cfg": retain_cfg,
        }

        return StimConf(**args)

    """Create SysConfig"""

    def create_sysconfig(
            self,
            retain_cfg: bool = False,
            # IMU
            acc_fs: AccFS = AccFS.FS_4G,
            gyro_fs: GyroFS = GyroFS.FS_500DPS,
            imu_odr: int = 0,
            # MAG
            mag_odr: int = 0,
            # Dispatch
            to_ble_imu: bool = False,
            to_ble_mag: bool = False,
            to_ble_quat: bool = False,
            to_ble_log: bool = False,
            to_ble_vnsdata: bool = False,
            to_mem_imu: bool = False,
            to_mem_mag: bool = False,
            to_mem_quat: bool = False,
            to_mem_log: bool = False,
            to_mem_vnsdata: bool = False,
    ) -> SysConfig:
        """
        Create a sample SysConfig Protocol Buffer message

        Parameters
        ----------
        retain_cfg : bool, optional
            If ``True``, persist configuration on device (default: False).
        acc_fs : AccFS, optional
            Accelerometer full-scale setting.
        gyro_fs : GyroFS, optional
            Gyroscope full-scale setting.
        imu_odr : int, optional
            IMU output data rate (device-specific units).
        mag_odr : int, optional
            Magnetometer output data rate (device-specific units).
        to_ble_imu, to_ble_mag, to_ble_quat, to_ble_log, to_ble_vnsdata : bool
            Enable streaming of corresponding data over BLE.
        to_mem_imu, to_mem_mag, to_mem_quat, to_mem_log, to_mem_vnsdata : bool
            Enable logging of corresponding data to device memory.

        Returns
        -------
        SysConfig
            Populated SysConfig object

        """
        args = {
            "retain_cfg": retain_cfg,
            "imu": IMUConf(
                acc_fs=acc_fs,
                gyro_fs=gyro_fs,
                odr=imu_odr,),
            "mag": MAGConf(
                odr=mag_odr),
            "dispatch": {
                "to_ble": Dispatcher.Stream(
                    imu=to_ble_imu,
                    mag=to_ble_mag,
                    quat=to_ble_quat,
                    log=to_ble_log,
                    vnsdata=to_ble_vnsdata),
                "to_mem": Dispatcher.Stream(
                    imu=to_mem_imu,
                    mag=to_mem_mag,
                    quat=to_mem_quat,
                    log=to_mem_log,
                    vnsdata=to_mem_vnsdata),
            }
        }
        return SysConfig(**args)

    """Notification"""

    async def _debug_list_notify_characteristics(self):
        """
        Helper function to see all the channels (different UUIDS) from a device. Print all characteristics that support notifications.

        Notes
        -----
        Useful for quickly identifying streaming characteristics
        """
        client = self._ensure_client()
        services = client.services  # Kein await nötig!

        if not services:
            print("No GATT services found. Are you connected?")
            return

        print("\n--- GATT Characteristics with 'notify' property ---")
        for service in services:
            for char in service.characteristics:
                if "notify" in char.properties:
                    print(
                        f"Service {service.uuid}: {char.uuid} ({char.properties})")
        print("---------------------------------------------------\n")

    def notification_callback(self, sender, data: bytearray) -> None:
        """
        Callback for the notification function which handles the received data arrays.

        Parameters
        ----------
        sender: Any
            Argument for callback function with sender UUID
        data: bytearray
            Raw received data from device
        """

        uuid = getattr(sender, "uuid", str(sender))
        decoded = self.decode_data(data)
        print(f"[notify {uuid}] {decoded}")

    def _get_bleak_GATT_Characteristic(self) -> BleakGATTCharacteristic:
        """
        Internal helper function to get the right target UUID (visible in _debug_list_notify_characteristics)

        Returns
        -------
        ch: str
            Found UUID which matches with the target UUID

        """
        client = self._ensure_client()

        ch = next(
            (c for s in client.services for c in s.characteristics
             if c.uuid.lower() == self.DATA_UUID.lower() and "notify" in c.properties),
            None,
        )

        return ch

    async def start_notification(self) -> None:
        """
        Start the notification function
        """
        client = self._ensure_client()

        await self._debug_list_notify_characteristics()

        ch = self._get_bleak_GATT_Characteristic()
        if ch is None:
            raise ValueError(
                f"Notify characteristic {self.DATA_UUID} not found.")

        await client.start_notify(ch, self.notification_callback)
        print(f"Listening on {ch.uuid}. Press SPACE to stop…")

    async def stop_notification(self) -> None:
        """
        Stop the notification function
        """
        client = self._ensure_client()

        ch = self._get_bleak_GATT_Characteristic()
        if ch is None:
            raise ValueError(
                f"Notify characteristic {self.DATA_UUID} not found.")
        await client.stop_notify(ch)
        print("Notification has been stopped")

    """Decoder"""

    def decode_data(
            self,
            data: bytearray) -> List[List[Any]]:
        """
        Data decoder to get usable data.

        Parameters
        ----------
        data: bytearray
            Raw received data from device

        Returns
        -------
        List[List[Any]]

        """
        # Decode
        parser_imu = struct.Struct('<I6h')
        parser_mag = struct.Struct('<I3h')
        parser_quat = struct.Struct('<I4h')
        parser_vns_data = struct.Struct('<I4h')
        parser_imu_quat = struct.Struct('<I6h4h')
        parser_text_preamble = struct.Struct('<I')

        d = []
        try:
            while len(data) > 0:
                line = [None]*18  # 6 imu, 4 quat, 3 mag, 1 timestamp, 4 vns
                line[0] = 0

                sample_type = data[0]
                data = data[1:]
                if sample_type == 0:
                    sample_len = parser_imu.size
                    sample = parser_imu.unpack(data[:sample_len])
                    line[0:7] = sample
                elif sample_type == 1:
                    sample_len = parser_quat.size
                    sample = parser_quat.unpack(data[:sample_len])
                    line[0] = sample[0]
                    line[7:11] = sample[1:]
                elif sample_type == 2:
                    sample_len = parser_mag.size
                    sample = parser_mag.unpack(data[:sample_len])
                    line[0] = sample[0]
                    line[11:14] = sample[1:]
                elif sample_type == 3:
                    sample_len = parser_vns_data.size
                    sample = parser_vns_data.unpack(data[:sample_len])
                    line[0] = sample[0]
                    line[14:] = sample[1:]
                elif sample_type == 4:
                    sample_len = parser_imu_quat.size
                    sample = parser_imu_quat.unpack(data[:sample_len])
                    line[0:7] = sample[0:7]
                    # Convert quaternion to float
                    line[7:11] = map(lambda x: x / 32768.0, sample[7:11])

                elif sample_type == 66:
                    sample_len = data[0]
                    string = data[1:1+sample_len].decode('utf-8')
                d.append(line)
                data = data[sample_len:]
        except struct.error as e:
            print(f"Error unpacking data: {e}")
            print(f"Remaining data length: {len(data)}")
            if len(data) > 0:
                print(f"First few bytes of remaining data: {data[:10]}")
        finally:
            return d

__init__

__init__(address='', connect_retries=3, connect_retry_delay_s=1.0)

Parameters:

Name Type Description Default
target

General target if connection applies to motion tracker or stimulator

required
connect_retries int

Number of connection attempts before timeout (default: 3).

3
connect_retry_delay_s float

Delay in seconds between connection attempts (default: 1.0).

1.0
Source code in sdk/vnsconnect.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def __init__(
        self,
        address: str = "",
        # target: Literal["tracker", "stim"],
        connect_retries: int = 3,
        connect_retry_delay_s: float = 1.0,
):
    """
    Parameters
    ----------
    target: Literal[str, str]
        General target if connection applies to motion tracker or stimulator
    connect_retries: int
        Number of connection attempts before timeout (default: 3).
    connect_retry_delay_s: float
        Delay in seconds between connection attempts (default: 1.0).

    """

    self.address = address
    self._client: Optional[BleakClient] = None
    self._connect_retries = connect_retries
    self._connect_retry_delay_s = connect_retry_delay_s

scanner async

scanner(timeout=5.0)

Scan for all nearby BLE devices for a given duration and include RSSI data.

Returns:

Type Description
dict[str, tuple[BLEDevice, AdvertisementData]]

Dictionary of discovered devices and their advertisement data.

Source code in sdk/vnsconnect.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
async def scanner(self, timeout: float = 5.0) -> dict[str, tuple[BLEDevice, Any]]:
    """
    Scan for all nearby BLE devices for a given duration and include RSSI data.

    Returns
    -------
    dict[str, tuple[BLEDevice, AdvertisementData]]
        Dictionary of discovered devices and their advertisement data.
    """
    devices_dict = await BleakScanner.discover(timeout=timeout, return_adv=True)

    wanted = {"SmartVNS Tracker", "SmartVNS Stimulator", "Zephyr"}
    result: dict[str, tuple[BLEDevice, Any]] = {}

    for addr, (device, adv) in devices_dict.items():
        name = device.name  # or getattr(adv, "local_name", "Unknown")
        if name in wanted:
            rssi = adv.rssi
            print(f"- {name} ({device.address}) RSSI: {rssi} dBm")
            result[addr] = (device, adv)

    return result

find_device async

find_device(timeout=5.0)

Scan for a specific BLE device for a given duration by address.

Parameters:

Name Type Description Default
timeout float

Time to scan for in seconds (default: 5.0).

5.0

Returns:

Type Description
Optional[BLEDevice]

BLEDevice object if found, None otherwise.

Source code in sdk/vnsconnect.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
async def find_device(self, timeout: float = 5.0) -> Optional[BLEDevice]:
    """
    Scan for a specific BLE device for a given duration by address.

    Parameters
    ----------
    timeout: flaot
        Time to scan for in seconds (default: 5.0).

    Returns
    -------
    Optional[BLEDevice]
        BLEDevice object if found, None otherwise.
    """
    print(f"Scanning for device with address {self.address}...")
    device = await BleakScanner.find_device_by_address(self.address, timeout=timeout)

    if device is None:
        print(f"Device with address {self.address} not found")
        return None

    print(f"Found device: {device.name or 'Unknown'} ({device.address})")
    return device

connect_to_device async

connect_to_device()

Attempt to connect to the BLE device using its address.

Raises:

Type Description
RuntimeError

If the client is not connected.

Source code in sdk/vnsconnect.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
async def connect_to_device(self) -> None:
    """
    Attempt to connect to the BLE device using its address.

    Raises
    ------
    RuntimeError
        If the client is not connected.


    """
    last_err = None

    for attempt in range(self._connect_retries):
        try:
            self._client = BleakClient(self.address)
            await self._client.connect()
            print(f"Connected to {self.address}")
            return

        except Exception as e:
            print(
                f"Connection attempt {attempt}/{self._connect_retries} failed: {e}")
            last_err = e
            await asyncio.sleep(self._connect_retry_delay_s)

    raise RuntimeError(
        f"Failed to connect to {self.address}") from last_err

connect_to_tracker async

connect_to_tracker(timeout=5.0)

Directly connect to the motion tracker.

Parameters:

Name Type Description Default
timeout float

How long to scan for the device, in seconds.

5.0
Source code in sdk/vnsconnect.py
269
270
271
272
273
274
275
276
277
278
279
async def connect_to_tracker(self, timeout: float = 5.0) -> None:
    """
    Directly connect to the motion tracker.

    Parameters
    ----------
    timeout: float
        How long to scan for the device, in seconds.
    """

    await self._connect_to_device_by_name(self.TRACKER_NAME, timeout)

connect_to_stimulator async

connect_to_stimulator(timeout=5.0)

Directly connect to the stimulator.

Parameters:

Name Type Description Default
timeout float

How long to scan for the device, in seconds.

5.0
Source code in sdk/vnsconnect.py
281
282
283
284
285
286
287
288
289
290
291
292
async def connect_to_stimulator(self, timeout: float = 5.0) -> None:
    """
    Directly connect to the stimulator.

    Parameters
    ----------
    timeout: float
        How long to scan for the device, in seconds.       


    """
    await self._connect_to_device_by_name(self.STIMULATOR_NAME, timeout)

disconnect async

disconnect()

Disconnect from the BLE device if currently connected.

Source code in sdk/vnsconnect.py
294
295
296
297
298
299
300
301
async def disconnect(self) -> None:
    """
    Disconnect from the BLE device if currently connected.

    """
    if self._client and self._client.is_connected:
        await self._client.disconnect()
        print("Disconnected")

get_address

get_address()

Return the address of the connected device.

Returns:

Type Description
client.address: str

Device address which is already connected.

Source code in sdk/vnsconnect.py
325
326
327
328
329
330
331
332
333
334
335
336
def get_address(self) -> str:
    """
    Return the address of the connected device.

    Returns
    -------
    client.address: str
        Device address which is already connected.
    """

    client = self._ensure_client()
    return client.address

get_mtu_size

get_mtu_size()

Return the MTU size of the active connection.

Returns:

Type Description
client.mtu_size: int

MTU size in bytes.

Source code in sdk/vnsconnect.py
338
339
340
341
342
343
344
345
346
347
348
349
350
def get_mtu_size(self) -> int:
    """
    Return the MTU size of the active connection.

    Returns
    -------
    client.mtu_size: int
        MTU size in bytes.

    """

    client = self._ensure_client()
    return client.mtu_size

write_config async

write_config(uuid_key, config_msg)

Serialize the configuration to a string and write it to a given characteristic.

Parameters:

Name Type Description Default
uuid_key str

The key used to look up the correct characteristic UUID.

required
config_msg Union[SysConfig, StimConf]

The protobuf message to serialize and send.

required
Source code in sdk/vnsconnect.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
async def write_config(
        self,
        uuid_key: str,
        config_msg: Union[SysConfig, StimConf]) -> None:
    """
    Serialize the configuration to a string and write it to a given characteristic.

    Parameters
    ----------
    uuid_key: str
        The key used to look up the correct characteristic UUID.
    config_msg: Union[SysConfig, StimConf]
        The protobuf message to serialize and send.
    """

    client = self._ensure_client()

    char_uuid = self.STIM_CHAR if uuid_key == "stim" else self.SYS_CHAR
    data = config_msg.SerializeToString()
    print(f"data: {data.hex()}")

    print(f"Writing {len(data)} bytes to {char_uuid}")
    await client.write_gatt_char(char_uuid, data, response=True)
    print("Write OK")

read_config async

read_config(uuid_key)

Read configuration from the device and create a protobuf message.

Parameters:

Name Type Description Default
uuid_key str

The key used to look up the correct characteristic UUID.

required
Source code in sdk/vnsconnect.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
async def read_config(
        self,
        uuid_key: str) -> None:
    """
    Read configuration from the device and create a protobuf message.

    Parameters
    ----------
    uuid_key: str
        The key used to look up the correct characteristic UUID.
    """

    client = self._ensure_client()
    await self._debug_list_read_characteristics()

    char_uuid = self.STIM_CHAR if uuid_key == "stim" else self.SYS_CHAR

    data = await client.read_gatt_char(char_uuid)

    # 1) Sichtprüfung
    print(f"[DBG] Read {len(data)} bytes from {char_uuid}")
    print(f"[DBG] First 40 bytes (hex): {bytes(data)[:40].hex()}")

    payload = data[:40]
    print(f"Actual payload size: {len(payload)}")
    print(f"Actual payload {bytes(payload).hex()}")

    cls = StimConf if uuid_key == "stim" else SysConfig

    msg = cls()

    try:
        # safety: in manchen Backends kommt memoryview – nach bytes casten
        msg.ParseFromString(payload)
    except DecodeError as e:
        raise RuntimeError(
            "Could not parse protobuf from characteristic.\n"
            f"  uuid_key={uuid_key}\n"
            f"  char={char_uuid}\n"
            f"  len(data)={len(data)}\n"
            f"  head={bytes(data)[:16].hex()} ...\n"
            "Hints:\n"
            "  • Stimmt die Characteristic (read, nicht notify)?\n"
            "  • Erwartet die Firmware einen Längenprefix? (z.B. u32 + payload)\n"
            "  • Sind .proto-Versionen identisch?\n"
            "  • Hast du zuvor write_config() erfolgreich ausgeführt?\n"
        ) from e

   # msg.ParseFromString(data)

    print(msg)

create_stimconfig

create_stimconfig(trigger_ms=1000, forward_us=250, deadband_us=100, period_us=400000, intensity_uA=100, retain_cfg=False)

Create a sample StimConfig Protocol Buffer message (with default values).

Parameters:

Name Type Description Default
trigger_ms int

Trigger delay in milliseconds.

1000
forward_us int

Forward pulse width in microseconds.

250
deadband_us int

Deadband duration in microseconds.

100
period_us int

Stimulation period in microseconds.

400000
intensity_uA int

Current intensity in microamperes.

100
retain_cfg bool

If True, persist configuration on device (default False).

False

Returns:

Type Description
StimConf

Populated StimConfig object

Source code in sdk/vnsconnect.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
def create_stimconfig(
        self,
        trigger_ms: int = 1000,
        forward_us: int = 250,
        deadband_us: int = 100,
        period_us: int = 400000,
        intensity_uA: int = 100,
        retain_cfg: bool = False) -> StimConf:
    """
    Create a sample StimConfig Protocol Buffer message (with default values).

    Parameters
    ----------
    trigger_ms : int
        Trigger delay in milliseconds.
    forward_us : int
        Forward pulse width in microseconds.
    deadband_us : int
        Deadband duration in microseconds.
    period_us : int
        Stimulation period in microseconds.
    intensity_uA : int
        Current intensity in microamperes.
    retain_cfg : bool
        If ``True``, persist configuration on device (default False).

    Returns
    -------
    StimConf
        Populated StimConfig object
    """
    args = {
        "trigger_ms": trigger_ms,
        "forward_us": forward_us,
        "deadband_us": deadband_us,
        "period_us": period_us,
        "intensity_uA": intensity_uA,
        "retain_cfg": retain_cfg,
    }

    return StimConf(**args)

create_sysconfig

create_sysconfig(retain_cfg=False, acc_fs=AccFS.FS_4G, gyro_fs=GyroFS.FS_500DPS, imu_odr=0, mag_odr=0, to_ble_imu=False, to_ble_mag=False, to_ble_quat=False, to_ble_log=False, to_ble_vnsdata=False, to_mem_imu=False, to_mem_mag=False, to_mem_quat=False, to_mem_log=False, to_mem_vnsdata=False)

Create a sample SysConfig Protocol Buffer message

Parameters:

Name Type Description Default
retain_cfg bool

If True, persist configuration on device (default: False).

False
acc_fs AccFS

Accelerometer full-scale setting.

FS_4G
gyro_fs GyroFS

Gyroscope full-scale setting.

FS_500DPS
imu_odr int

IMU output data rate (device-specific units).

0
mag_odr int

Magnetometer output data rate (device-specific units).

0
to_ble_imu bool

Enable streaming of corresponding data over BLE.

False
to_ble_mag bool

Enable streaming of corresponding data over BLE.

False
to_ble_quat bool

Enable streaming of corresponding data over BLE.

False
to_ble_log bool

Enable streaming of corresponding data over BLE.

False
to_ble_vnsdata bool

Enable streaming of corresponding data over BLE.

False
to_mem_imu bool

Enable logging of corresponding data to device memory.

False
to_mem_mag bool

Enable logging of corresponding data to device memory.

False
to_mem_quat bool

Enable logging of corresponding data to device memory.

False
to_mem_log bool

Enable logging of corresponding data to device memory.

False
to_mem_vnsdata bool

Enable logging of corresponding data to device memory.

False

Returns:

Type Description
SysConfig

Populated SysConfig object

Source code in sdk/vnsconnect.py
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
def create_sysconfig(
        self,
        retain_cfg: bool = False,
        # IMU
        acc_fs: AccFS = AccFS.FS_4G,
        gyro_fs: GyroFS = GyroFS.FS_500DPS,
        imu_odr: int = 0,
        # MAG
        mag_odr: int = 0,
        # Dispatch
        to_ble_imu: bool = False,
        to_ble_mag: bool = False,
        to_ble_quat: bool = False,
        to_ble_log: bool = False,
        to_ble_vnsdata: bool = False,
        to_mem_imu: bool = False,
        to_mem_mag: bool = False,
        to_mem_quat: bool = False,
        to_mem_log: bool = False,
        to_mem_vnsdata: bool = False,
) -> SysConfig:
    """
    Create a sample SysConfig Protocol Buffer message

    Parameters
    ----------
    retain_cfg : bool, optional
        If ``True``, persist configuration on device (default: False).
    acc_fs : AccFS, optional
        Accelerometer full-scale setting.
    gyro_fs : GyroFS, optional
        Gyroscope full-scale setting.
    imu_odr : int, optional
        IMU output data rate (device-specific units).
    mag_odr : int, optional
        Magnetometer output data rate (device-specific units).
    to_ble_imu, to_ble_mag, to_ble_quat, to_ble_log, to_ble_vnsdata : bool
        Enable streaming of corresponding data over BLE.
    to_mem_imu, to_mem_mag, to_mem_quat, to_mem_log, to_mem_vnsdata : bool
        Enable logging of corresponding data to device memory.

    Returns
    -------
    SysConfig
        Populated SysConfig object

    """
    args = {
        "retain_cfg": retain_cfg,
        "imu": IMUConf(
            acc_fs=acc_fs,
            gyro_fs=gyro_fs,
            odr=imu_odr,),
        "mag": MAGConf(
            odr=mag_odr),
        "dispatch": {
            "to_ble": Dispatcher.Stream(
                imu=to_ble_imu,
                mag=to_ble_mag,
                quat=to_ble_quat,
                log=to_ble_log,
                vnsdata=to_ble_vnsdata),
            "to_mem": Dispatcher.Stream(
                imu=to_mem_imu,
                mag=to_mem_mag,
                quat=to_mem_quat,
                log=to_mem_log,
                vnsdata=to_mem_vnsdata),
        }
    }
    return SysConfig(**args)

notification_callback

notification_callback(sender, data)

Callback for the notification function which handles the received data arrays.

Parameters:

Name Type Description Default
sender

Argument for callback function with sender UUID

required
data bytearray

Raw received data from device

required
Source code in sdk/vnsconnect.py
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
def notification_callback(self, sender, data: bytearray) -> None:
    """
    Callback for the notification function which handles the received data arrays.

    Parameters
    ----------
    sender: Any
        Argument for callback function with sender UUID
    data: bytearray
        Raw received data from device
    """

    uuid = getattr(sender, "uuid", str(sender))
    decoded = self.decode_data(data)
    print(f"[notify {uuid}] {decoded}")

start_notification async

start_notification()

Start the notification function

Source code in sdk/vnsconnect.py
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
async def start_notification(self) -> None:
    """
    Start the notification function
    """
    client = self._ensure_client()

    await self._debug_list_notify_characteristics()

    ch = self._get_bleak_GATT_Characteristic()
    if ch is None:
        raise ValueError(
            f"Notify characteristic {self.DATA_UUID} not found.")

    await client.start_notify(ch, self.notification_callback)
    print(f"Listening on {ch.uuid}. Press SPACE to stop…")

stop_notification async

stop_notification()

Stop the notification function

Source code in sdk/vnsconnect.py
652
653
654
655
656
657
658
659
660
661
662
663
async def stop_notification(self) -> None:
    """
    Stop the notification function
    """
    client = self._ensure_client()

    ch = self._get_bleak_GATT_Characteristic()
    if ch is None:
        raise ValueError(
            f"Notify characteristic {self.DATA_UUID} not found.")
    await client.stop_notify(ch)
    print("Notification has been stopped")

decode_data

decode_data(data)

Data decoder to get usable data.

Parameters:

Name Type Description Default
data bytearray

Raw received data from device

required

Returns:

Type Description
List[List[Any]]
Source code in sdk/vnsconnect.py
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
def decode_data(
        self,
        data: bytearray) -> List[List[Any]]:
    """
    Data decoder to get usable data.

    Parameters
    ----------
    data: bytearray
        Raw received data from device

    Returns
    -------
    List[List[Any]]

    """
    # Decode
    parser_imu = struct.Struct('<I6h')
    parser_mag = struct.Struct('<I3h')
    parser_quat = struct.Struct('<I4h')
    parser_vns_data = struct.Struct('<I4h')
    parser_imu_quat = struct.Struct('<I6h4h')
    parser_text_preamble = struct.Struct('<I')

    d = []
    try:
        while len(data) > 0:
            line = [None]*18  # 6 imu, 4 quat, 3 mag, 1 timestamp, 4 vns
            line[0] = 0

            sample_type = data[0]
            data = data[1:]
            if sample_type == 0:
                sample_len = parser_imu.size
                sample = parser_imu.unpack(data[:sample_len])
                line[0:7] = sample
            elif sample_type == 1:
                sample_len = parser_quat.size
                sample = parser_quat.unpack(data[:sample_len])
                line[0] = sample[0]
                line[7:11] = sample[1:]
            elif sample_type == 2:
                sample_len = parser_mag.size
                sample = parser_mag.unpack(data[:sample_len])
                line[0] = sample[0]
                line[11:14] = sample[1:]
            elif sample_type == 3:
                sample_len = parser_vns_data.size
                sample = parser_vns_data.unpack(data[:sample_len])
                line[0] = sample[0]
                line[14:] = sample[1:]
            elif sample_type == 4:
                sample_len = parser_imu_quat.size
                sample = parser_imu_quat.unpack(data[:sample_len])
                line[0:7] = sample[0:7]
                # Convert quaternion to float
                line[7:11] = map(lambda x: x / 32768.0, sample[7:11])

            elif sample_type == 66:
                sample_len = data[0]
                string = data[1:1+sample_len].decode('utf-8')
            d.append(line)
            data = data[sample_len:]
    except struct.error as e:
        print(f"Error unpacking data: {e}")
        print(f"Remaining data length: {len(data)}")
        if len(data) > 0:
            print(f"First few bytes of remaining data: {data[:10]}")
    finally:
        return d