Skip to content

TFLite

edgemark.models.platforms.TFLite.TFLite_converter

basic_convert

basic_convert(model_path)

Convert the model without any optimizations.

Parameters:

Name Type Description Default
model_path str

The path to the TensorFlow model.

required

Returns:

Type Description
bytes

The TFLite model.

Source code in edgemark/models/platforms/TFLite/TFLite_converter.py
175
176
177
178
179
180
181
182
183
184
185
186
187
def basic_convert(model_path):
    """
    Convert the model without any optimizations.

    Args:
        model_path (str): The path to the TensorFlow model.

    Returns:
        bytes: The TFLite model.
    """
    # Plain SavedModel -> TFLite conversion with the converter's defaults.
    converter = tf.lite.TFLiteConverter.from_saved_model(model_path)
    return converter.convert()

cluster

cluster(model_path, n_clusters)

Cluster the model.

Parameters:

Name Type Description Default
model_path str

The path to the TensorFlow model.

required
n_clusters int

The number of clusters.

required

Returns:

Type Description
Model

The clustered model.

Source code in edgemark/models/platforms/TFLite/TFLite_converter.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def cluster(model_path, n_clusters):
    """
    Cluster the model.

    Args:
        model_path (str): The path to the TensorFlow model.
        n_clusters (int): The number of clusters.

    Returns:
        tf.keras.Model: The clustered model.
    """
    keras_model = tf.keras.models.load_model(model_path)

    # Wrap the weights with clustering, then strip the wrappers so the
    # returned model is a plain Keras model with clustered weights.
    wrapped = tfmot.clustering.keras.cluster_weights(keras_model, number_of_clusters=n_clusters)
    return tfmot.clustering.keras.strip_clustering(wrapped)

main

main(cfg_path=config_file_path, **kwargs)

Convert the TensorFlow models to TFLite models with the specified optimizations.

Parameters:

Name Type Description Default
cfg_path str

The path to the configuration file. The configuration file that this path points to should contain the following keys: - model_base_dir (str): A placeholder for the model base directory. This will be populated by the target directory. - linkers_dir (str): Path to the directory where the generated models list is loaded from and the converted models list will be saved. - tf_model_path (str): Path to the TensorFlow model. - representative_data_path (str): Path to the representative data for quantization. The file should be a numpy file. - tflite_save_dir (str): Path to the directory to save the converted models and their assets. - conversion_timeout (float): The timeout for each conversion in seconds. - optimizations (list): The optimizations to apply during conversion. In each element/group, the optimizations should be separated by '+'.

config_file_path
**kwargs dict

Keyword arguments to be passed to the configuration file.

{}

Returns:

Type Description
list

A list of dictionaries containing the following keys for each target model: - dir (str): The directory of the target model. - flavors (list): A list of dictionaries containing the following keys for each optimization: - flavor (str): The optimization flavor. - result (str): The result of the conversion. It can be either "success" or "failed". - error (str): The error message in case of failure. - traceback (str): The traceback in case of failure. Either this or 'exception_file' will be present. - exception_file (str): The path to the exception file in case of failure. Either this or 'traceback' will be present.

Source code in edgemark/models/platforms/TFLite/TFLite_converter.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
def main(cfg_path=config_file_path, **kwargs):
    """
    Convert the TensorFlow models to TFLite models with the specified optimizations.

    Args:
        cfg_path (str): The path to the configuration file.
            The configuration file that this path points to should contain the following keys:
                - model_base_dir (str): A placeholder for the model base directory. This will be populated by the target directory.
                - linkers_dir (str): Path to the directory where the generated models list is loaded from and the converted models list will be saved.
                - tf_model_path (str): Path to the TensorFlow model.
                - representative_data_path (str): Path to the representative data for quantization. The file should be a numpy file.
                - tflite_save_dir (str): Path to the directory to save the converted models and their assets.
                - conversion_timeout (float): The timeout for each conversion in seconds.
                - optimizations (list): The optimizations to apply during conversion. In each element/group, the optimizations should be separated by '+'.
        **kwargs (dict): Keyword arguments to be passed to the configuration file.

    Returns:
        list: A list of dictionaries containing the following keys for each target model:
            - dir (str): The directory of the target model.
            - flavors (list): A list of dictionaries containing the following keys for each optimization:
                - flavor (str): The optimization flavor.
                - result (str): The result of the conversion. It can be either "success" or "failed".
                - error (str): The error message in case of failure.
                - traceback (str): The traceback in case of failure. Either this or 'exception_file' will be present.
                - exception_file (str): The path to the exception file in case of failure. Either this or 'traceback' will be present.
    """
    # OmegaConf.load accepts only the file path; kwargs are merged afterwards.
    # (Previously kwargs were also passed to load(), which raises TypeError
    # whenever kwargs is non-empty.)
    cfg = OmegaConf.load(cfg_path)
    cfg.update(OmegaConf.create(kwargs))

    targets = OmegaConf.load(os.path.join(cfg.linkers_dir, "tf_generated_models_list.yaml"))
    converted_models_list = []
    output = [{"dir": target_dir, "flavors": []} for target_dir in targets]

    for i, target_dir in enumerate(targets):
        cfg.model_base_dir = target_dir
        print("Converting the model in: {} ({}/{})".format(cfg.tf_model_path, i+1, len(targets)))
        print("Saving to: {}".format(cfg.tflite_save_dir))

        conversions_list = []
        model_path = cfg.tf_model_path
        saving_root = cfg.tflite_save_dir
        opt_groups = cfg.optimizations
        representative_data_path = cfg.representative_data_path
        representative_data = None

        # Optimizations that produce the final TFLite bytes and must therefore
        # terminate their group.
        finishers = ["basic", "q_dynamic", "q_full_int", "q_full_int_only", "q_16x8", "q_16x8_int_only", "q_float16"]

        # Sanity checks. Iterate over a snapshot because failing groups are
        # removed from opt_groups (removing while iterating the same list
        # would silently skip the element after each removed one).
        for opt_group in list(opt_groups):
            try:
                opts = [opt.strip() for opt in opt_group.split('+')]

                if "q_full_int" in opts or "q_full_int_only" in opts or "q_16x8" in opts or "q_16x8_int_only" in opts:
                    if not os.path.exists(representative_data_path):
                        raise FileNotFoundError("representative_dataset_path is required for full_int, full_int_only, 16x8, and 16x8_int_only optimizations, but {} does not exist.".format(representative_data_path))
                    representative_data = np.load(representative_data_path)

                for opt in opts[:-1]:
                    if opt in finishers:
                        raise ValueError("The {} optimization should be the last one in its group ({}).".format(opt, opt_group))

            except Exception as e:
                output[i]["flavors"].append({
                    "flavor": opt_group,
                    "result": "failed",
                    "error": type(e).__name__,
                    "traceback": traceback.format_exc()
                })
                print("Error:")
                print(traceback.format_exc())
                opt_groups.remove(opt_group)

        progress_bar = tqdm(total=len(opt_groups), desc="TFLite conversion", colour='green', bar_format=progress_bar_format, leave=True)
        errors = []
        conversion_messages = ""

        for opt_group in opt_groups:
            try:
                opts = [opt.strip() for opt in opt_group.split('+')]

                # Groups that do not end with a finisher implicitly end with a
                # basic conversion.
                if opts[-1] not in finishers:
                    opts.append("basic")

                conversion_funcs = []
                # NOTE: this loop must not rebind `i` — `i` is the outer
                # target index and is used below to fill output[i].
                for opt in opts:
                    if opt == "basic":
                        conversion_funcs.append({"func": basic_convert, "args": ()})
                    elif opt == "q_dynamic":
                        conversion_funcs.append({"func": quantize, "args": ()})
                    elif opt == "q_full_int":
                        conversion_funcs.append({"func": quantize, "args": (representative_data,)})
                    elif opt == "q_full_int_only":
                        conversion_funcs.append({"func": quantize, "args": (representative_data,), "kwargs": {"int_only": True}})
                    elif opt == "q_16x8":
                        conversion_funcs.append({"func": quantize, "args": (representative_data,), "kwargs": {"a16_w8": True}})
                    elif opt == "q_16x8_int_only":
                        conversion_funcs.append({"func": quantize, "args": (representative_data,), "kwargs": {"a16_w8": True, "int_only": True}})
                    elif opt == "q_float16":
                        conversion_funcs.append({"func": quantize, "args": (), "kwargs": {"float16": True}})
                    elif opt.startswith("p_") and opt[2:].isdigit():
                        target_sparsity = float(opt[2:]) / 100
                        assert 0 <= target_sparsity <= 1, "Sparsity should be between 0 and 1."
                        conversion_funcs.append({"func": prune, "args": (target_sparsity,)})
                    elif opt.startswith("c_") and opt[2:].isdigit():
                        n_clusters = int(opt[2:])
                        assert n_clusters > 0, "Number of clusters should be greater than 0."
                        conversion_funcs.append({"func": cluster, "args": (n_clusters,)})
                    else:
                        raise ValueError("Unknown optimization: {}".format(opt))

                progress_bar.set_postfix_str("{}".format(opt_group))
                save_dir = os.path.join(saving_root, opt_group)
                convert_and_save_args = (save_dir, model_path, conversion_funcs)

                # Run the conversion in a separate, silenced process so a
                # crash or hang in the TFLite converter cannot take down the
                # whole run.
                process_success, process_result = _run_as_process(cfg.conversion_timeout, _run_in_silence, _convert_and_save, *convert_and_save_args)

                if process_success:
                    conversion_success, _ = process_result.result
                else:
                    conversion_success = False
                    failure_report = "Process failed.\nResult: {}\nException: {}\nTimed out: {}\n".format(process_result.result, process_result.exception, process_result.timed_out)
                    if process_result.result is None and process_result.exception is None and process_result.timed_out is False:
                        failure_report += "Possible crash of the process\n"
                    _save_tflite_exception(failure_report, os.path.join(saving_root, opt_group), delete_model=True)

                if conversion_success:
                    conversions_list.append(opt_group)
                    _, message = process_result.result
                    conversion_messages += "="*80 + "\n" + "{}:\n".format(opt_group) + message + "\n\n"

                    output[i]["flavors"].append({
                        "flavor": opt_group,
                        "result": "success"
                    })

                else:
                    conversion_messages += "="*80 + "\n" + "{}:\n".format(opt_group) + "Conversion failed\n\n"
                    errors.append(opt_group)

                    output[i]["flavors"].append({
                        "flavor": opt_group,
                        "result": "failed",
                        "error": "Conversion failed",
                        "exception_file": os.path.join(saving_root, opt_group, 'exception.txt')
                    })

                progress_bar.update(1)

            except Exception as e:
                errors.append(opt_group)
                conversion_messages += "="*80 + "\n" + "{}:\n".format(opt_group) + traceback.format_exc() + "\n\n"

                output[i]["flavors"].append({
                    "flavor": opt_group,
                    "result": "failed",
                    "error": type(e).__name__,
                    "traceback": traceback.format_exc()
                })

                progress_bar.update(1)

        progress_bar.set_postfix_str("Done")
        progress_bar.close()

        converted_models_element = {
            "model_base_dir": cfg.model_base_dir,
            "conversions": conversions_list
        }
        converted_models_list.append(converted_models_element)

        # Save the conversion messages
        with open(os.path.join(saving_root, 'conversion_messages.txt'), 'w') as f:
            f.write(conversion_messages)

        if len(errors) > 0:
            print("Errors:")
            for error in errors:
                print("    " + error)
            print("Please check the exception files for details.")

        print()

    # Save the list of converted models
    os.makedirs(cfg.linkers_dir, exist_ok=True)
    with open(os.path.join(cfg.linkers_dir, 'tflite_converted_models_list.yaml'), 'w') as f:
        yaml.dump(converted_models_list, f, indent=4, sort_keys=False)

    return output

prune

prune(model_path, target_sparsity)

Prune the model.

Parameters:

Name Type Description Default
model_path str

The path to the TensorFlow model.

required
target_sparsity float

The target sparsity.

required

Returns:

Type Description
Model

The pruned model.

Source code in edgemark/models/platforms/TFLite/TFLite_converter.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def prune(model_path, target_sparsity):
    """
    Prune the model.

    Args:
        model_path (str): The path to the TensorFlow model.
        target_sparsity (float): The target sparsity.

    Returns:
        tf.keras.Model: The pruned model.
    """
    base_model = tf.keras.models.load_model(model_path)

    schedule = tfmot.sparsity.keras.ConstantSparsity(target_sparsity=target_sparsity, begin_step=0, frequency=1)
    pruned = tfmot.sparsity.keras.prune_low_magnitude(base_model, pruning_schedule=schedule)

    # One dummy fit step with learning rate 0: the UpdatePruningStep callback
    # applies the sparsity masks without changing the surviving weights.
    dummy_x = np.random.rand(1, *base_model.input.shape[1:])
    dummy_y = np.random.rand(1, *base_model.output.shape[1:])

    pruned.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0), loss='mse')
    pruned.fit(dummy_x, dummy_y, epochs=1, callbacks=[tfmot.sparsity.keras.UpdatePruningStep()], verbose=0)

    # Strip the pruning wrappers so the result is a plain Keras model.
    return tfmot.sparsity.keras.strip_pruning(pruned)

quantize

quantize(model_path, representative_data=None, int_only=False, a16_w8=False, float16=False)

Quantize the model.

Parameters:

Name Type Description Default
model_path str

The path to the TensorFlow model.

required
representative_data ndarray

The representative data for quantization.

None
int_only bool

Whether to force the model to use integer quantization only.

False
a16_w8 bool

Whether to quantize the model to use 16-bit activations and 8-bit weights.

False
float16 bool

Whether to quantize the model to use 16-bit floating point numbers.

False

Returns:

Type Description
bytes

The TFLite model.

Source code in edgemark/models/platforms/TFLite/TFLite_converter.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def quantize(model_path, representative_data=None, int_only=False, a16_w8=False, float16=False):
    """
    Quantize the model.

    Args:
        model_path (str): The path to the TensorFlow model.
        representative_data (numpy.ndarray): The representative data for quantization.
        int_only (bool): Whether to force the model to use integer quantization only.
        a16_w8 (bool): Whether to quantize the model to use 16-bit activations and 8-bit weights.
        float16 (bool): Whether to quantize the model to use 16-bit floating point numbers.

    Returns:
        bytes: The TFLite model.
    """
    # Warn (but do not fail) on unusual flag combinations.
    if float16 and ((representative_data is not None) or int_only or a16_w8):
        print("Warning: float16 came with int_only, a16_w8, or representative_data. Are you sure you want to do this?")
    if int_only and (representative_data is None):
        print("Warning: int_only requires representative_data. You will receive an error.")
    if a16_w8 and representative_data is None and int_only is False and float16 is False:
        print("Warning: Just setting a16_w8 is possible but not common. Are you sure you want to do this?")

    converter = tf.lite.TFLiteConverter.from_saved_model(model_path)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    if representative_data is not None:
        def representative_dataset():
            for sample in representative_data:
                # batch_size = 1; the sample is the model's only input
                yield [np.expand_dims(sample, axis=0)]
        converter.representative_dataset = representative_dataset

    if int_only:
        # Integer-only models expose integer I/O tensors as well.
        io_dtype = tf.int16 if a16_w8 else tf.int8
        converter.inference_input_type = io_dtype
        converter.inference_output_type = io_dtype

    if a16_w8:
        supported = [tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8]
        if not int_only:
            # Allow float fallback for ops without a 16x8 kernel.
            supported.append(tf.lite.OpsSet.TFLITE_BUILTINS)
        converter.target_spec.supported_ops = supported
    elif int_only:
        converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

    if float16:
        converter.target_spec.supported_types = [tf.float16]

    return converter.convert()

save_random_eqcheck_data

save_random_eqcheck_data(model_path, n_samples, save_path)

Saves a random eqcheck data as {"data_x", "data_y_pred"}.

Parameters:

Name Type Description Default
model_path str

The path to the TFLite model.

required
n_samples int

The number of samples to generate.

required
save_path str

The npz file path to save the data.

required
Source code in edgemark/models/platforms/TFLite/TFLite_converter.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def save_random_eqcheck_data(model_path, n_samples, save_path):
    """
    Saves a random eqcheck data as {"data_x", "data_y_pred"}.

    Args:
        model_path (str): The path to the TFLite model.
        n_samples (int): The number of samples to generate.
        save_path (str): The npz file path to save the data.

    Raises:
        ValueError: If the model's input/output dtype pair is not one of the
            supported combinations (float32, int16, int8, or uint8).
    """
    interpreter = tf.lite.Interpreter(model_path=model_path)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    in_dtype = input_details[0]['dtype']
    out_dtype = output_details[0]['dtype']

    in_scale, in_zero_point = input_details[0]['quantization']
    out_scale, out_zero_point = output_details[0]['quantization']

    # Fixed seed so the eqcheck data is reproducible across runs.
    rng = np.random.RandomState(42)
    data_x = rng.rand(n_samples, *input_details[0]['shape'][1:]).astype(np.float32)

    # Clipping range for each supported quantized dtype.
    int_ranges = {np.int16: (-32768, 32767), np.int8: (-128, 127), np.uint8: (0, 255)}

    # Dispatch on dtype once, using equality instead of identity (`is` only
    # happens to work because the details dict stores the numpy type class).
    if in_dtype == np.float32 and out_dtype == np.float32:
        quantized = False
    elif in_dtype == out_dtype and in_dtype in int_ranges:
        quantized = True
        low, high = int_ranges[in_dtype]
        data_x = (data_x / in_scale) + in_zero_point
        data_x = np.clip(data_x, low, high).astype(in_dtype)
    else:
        raise ValueError("Unknown input and output types: {} and {}".format(in_dtype, out_dtype))

    data_y_pred = []
    for i in range(n_samples):
        interpreter.set_tensor(input_details[0]['index'], data_x[i:i+1])
        interpreter.invoke()
        sample_y_pred = interpreter.get_tensor(output_details[0]['index'])

        if quantized:
            # De-quantize the output back to float32.
            sample_y_pred = (sample_y_pred.astype(np.float32) - out_zero_point) * out_scale

        data_y_pred.append(sample_y_pred)

    data_y_pred = np.array(data_y_pred)

    # `or "."` keeps makedirs from failing when save_path has no directory part.
    os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
    np.savez(save_path, data_x=data_x, data_y_pred=data_y_pred)