FT601を使った通信システムを設計してみよう(その2)

前回「FT601を使った通信システムを設計してみよう(その1)」の続きです。今回はこのFPGAで作ったUSBデバイスを制御するホストPCのソフトウェアについて解説します。ハードとソフトの両方が揃わないと通信システムの開発評価、機能拡張はできません。簡単で小規模のもので良いのでハードもソフトも触れると開発意欲がわきますよ。

FT600/601を制御するデバイスドライバと開発リソース
C++、C#やPythonを使ったサンプルプログラム、そしてFT600/601にアクセするためのDLL(D3XX DRIVER)は、こちらのFTDI社のWebサイト(FT600/601 Software Examples)に公開されています。
 D3XX ドライバーのページを参照して、自身のOS環境にあったデバイスドライバーをダウンロードして自身のPCにインストールします。    

FT600/601コンフィグレーションプログラマ
手に入れたFT600またはFT601のコンフィグレーションが自分の意図した状態ではないかもしれません。しかしこのツールを使えばこのデバイスのコンフィグレーション(=動作モードですね。)を自分の意図する状態に変更できます。FT600やFT601のファームウェアを書き換えるツールだと思ってください。
ここからダウンロードできます。(FT60X Series Configuration Toolというセクションを見てください。)

PythonでFT600/601と通信してみるには
Windows 上で FT600 関連の Python プログラムを実行するには上述のデバイスドライバーを用いることができます。FT600/601が正しくWindowsに認識されていればデバイスマネージャーでは次のように確認できます。

Python3がインストールされいないならば、Python3をインストールします。Python3がインストールされているならば、Python用のftd3xxライブラリをインストールします。コマンドプロンプトやPower Shellを開いて、次のコマンド実行します。

python -m pip install ftd3xx

続いて先にダウンロードしたFTD3XX.DLLをPythonの実行環境にコピーします。筆者のPython環境はAnacondaを用いていおり、Python.exeは、C:\Users\****\anaconda3 にあるので(****:ユーザー名です)おこにFTD3XX.DLLを置きます。みなさんの環境に合わせてDLLを配置してください。
続いて、Pythonのコードでは、冒頭でライブラリをインポートします。

import ftd3xx

これでDLLやライブラリを使用する準備が整いました。ライブラリの使い方を示した資料は見つかりませんでしたが、サンプルプログラムや、C++/C#のプログラミングガイドを参照することで、理解できます。見よう見まねでやってみましょう。サンプルプログラムはそれなりのコード量になっていて、全部理解しようとすると心が折れそうになるので、Open(またはCreate)やClose, WriteやRead を名前に含む関数(だいたいそんな名前になってますよね)を見つけて、概要をつかみましょう。プログラムが持つべき最低限の機能が実現できれば良いのですから。特に今回のシステムは固定長データを送信する、固定長データを受信することができれば良いですね。あとは送信する固定長データのどこにどんな数値を入れればUSBデバイス側の特定のハードにアクセスできるか、受信した固定長データの中のどこを見れば良いか、それはアドレスマップを定義したみなさんが良く知っているはずですので、その仕様に基づき、固定長データの中を確認しましょう。

不細工な設計ですが、例を示します。これでも十分USBデバイス(すなわちFPGAのコード)側が正しく動作しているかのチェックを行うことができます。今回作ったUSBデバイスは数個のLEDとスイッチが接続されているだけですが、狙ったLEDを点灯させることができますし、所定のスイッチがON/OFFされる様子を受信データの中をのぞき見することで確認できます。コードは適当に改造してください。

import ftd3xx
import sys
import numpy as np
from random import randint
import logging
import time

FT_OPEN_BY_SERIAL_NUMBER = 0x00000001
FT_OPEN_BY_DESCRIPTION = 0x00000002
FT_OPEN_BY_LOCATION = 0x00000004
FT_OPEN_BY_GUID = 0x00000008
FT_OPEN_BY_INDEX = 0x00000010


def information_fifo_mode(num):
    # The following defines refer to "FTD3XX.h"
    fifo_mode_list = ["CONFIGURATION_FIFO_MODE_245", "CONFIGURATION_FIFO_MODE_600",
                      "CONFIGURATION_MODE_COUNT"]
    return fifo_mode_list[num]


def information_channel_config(num):
    # The following defines refer to "FTD3XX.h"
    channel_config_list = ["CONFIGURATION_CHANNEL_CONFIG_4", "CONFIGURATION_CHANNEL_CONFIG_2",
                           "CONFIGURATION_CHANNEL_CONFIG_1", "CONFIGURATION_CHANNEL_CONFIG_1_OUTPIPE",
                           "CONFIGURATION_CHANNEL_CONFIG_1_INPIPE", "CONFIGURATION_CHANNEL_CONFIG_COUNT"]
    return channel_config_list[num]


def open_ft_usb_device(device_type, device_name):
    ascii_device_name = bytes(device_name, encoding="ASCII")
    for device_id in range(16):
        usb_object = ftd3xx.create(device_id, FT_OPEN_BY_INDEX)
        if sys.platform == 'win32' and usb_object.getDriverVersion() < 0x01020006:
            usb_object.close()
            return None, 'D3XX driver version is old. Please update driver!'
        if usb_object.getDeviceInfo()['Description'] != ascii_device_name:
            usb_object.close()
            continue
        return usb_object, 'Successfully opened %s USB device: %s' % (device_type, device_name)
    return None, 'open usb device failed!'


def close_ft_usb_device(usb_object):
    usb_object.close()


def get_random_data(data_length):
    data_array = [randint(0, 255) for i in range(data_length)]
    return bytes(data_array), len(data_array)


def get_incremental_data(data_length):
    # repeat = int(data_length / 256)
    data_array = []
    # for i in range(repeat):
    #     for j in range(256):
    #         data_array.append(j)
    # data_array = [j for i in range(repeat) for j in range(256)]
    # return bytes(data_array), len(data_array)
    size_of_int_data = int(data_length / 4)
    data_array = np.arange(0, size_of_int_data, dtype=np.int32)
    return bytes(data_array), len(data_array)


def get_register_test_data(data_length, index, write_data_hex):
    size_of_int_data = int(data_length / 4)
    data_array = np.zeros(size_of_int_data, dtype=np.int32)
    data_array[index] = write_data_hex
    return bytes(data_array), len(data_array)


def WritePipe(D3XX, pipe, buffer, size):
    return D3XX.writePipe(pipe, buffer, size)


def ReadPipe(D3XX, pipe, size):
    received = D3XX.readPipeEx(pipe, size, True)
    return received['bytesTransferred'], received['bytes']


def upstream_test(usb_handle, option, inv):
    send_data_length = 4096
    if inv:
        data = 0xAA
    else:
        data = 0x55

    if option == 3:
        send_data, length = get_incremental_data(send_data_length)
    else:
        send_data, length = get_register_test_data(send_data_length, 1, data)

    transferred_value = WritePipe(usb_handle, 0x02, send_data, send_data_length)
    if transferred_value != send_data_length:
        # close_ft_usb_device(usb)
        print("WritePipe failed")
        return
    print("WritePipe Successful")
    return


def downstream_test(usb_handle):
    receive_data_length = 4096
    value, receive_data = ReadPipe(usb_handle, 0x82, receive_data_length)
    if value == receive_data_length:
        f_receive = open("receive_data.dat", "wb")
        f_receive.write(receive_data)
        print("ReadPipe Successful, receive data saved")
    return


def downstream_test_3x(usb_handle, turn):
    receive_data_length = 4096
    value_list = []
    receive_data_list = []
    result = [False, False, False]
    for i in range(turn):
        value, receive_data = ReadPipe(usb_handle, 0x82, receive_data_length)
        value_list.append(value)
        receive_data_list.append(receive_data)
        filename_prefix = 'receive_data_'
        filename_number = str(i)
        filename_suffix = '.dat'
        filename = filename_prefix + filename_number + filename_suffix
        if value_list[i] == receive_data_length:
            f_receive = open(filename, "wb")
            f_receive.write(receive_data_list[i])
            result[i] = True
        else:
            print("ReadPipe Failed")
            return
    if all(result):
        print("ReadPipe Successful, receive data saved")
    else:
        print("ReadPipe Failed")
    return


def loop_back_test(usb_handle, option):
    send_data_length = 4096
    receive_data_length = 4096
    flg = False
    if option == 1:
        data_type = "random"
        send_data, length = get_random_data(send_data_length)
    else:
        data_type = "incremental"
        send_data, length = get_incremental_data(send_data_length)
    # start_write = time.perf_counter()
    transferred_value = WritePipe(usb_handle, 0x02, send_data, send_data_length)
    # end_write = time.perf_counter() - start_write
    # print(f'{end_write}秒!')
    if transferred_value != send_data_length:
        close_ft_usb_device(usb)
        print("WritePipe failed")
        return
    else:
        # start_read = time.perf_counter()
        value, receive_data = ReadPipe(usb_handle, 0x82, receive_data_length)
        # end_read = time.perf_counter() - start_read
        # print(f'{end_read}秒!')
        if value == receive_data_length:
            flg = True
        if send_data != receive_data:
            print(" ** send_data and receive_data mismatch")
            return
        else:
            print(" send_data and receive_data match, loopback test passed **")

    if flg:
        f_send = open("send_data_" + data_type + ".dat", "wb")
        f_send.write(send_data)
        f_receive = open("receive_data_" + data_type + ".dat", "wb")
        f_receive.write(receive_data)


def main(handle):
    inversion = True
    while True:
        try:
            print('***** 1: loopback_test_random')
            print('***** 2: loopback_test_incremental')
            print('***** 3: upstream_test_incremental')
            print('***** 4: upstream_test_FD0004')
            print('***** 5: downstream_test_4K')
            print('***** 6: downstream_test_4K x 3 turn')
            print('***** 0: end test')
            cmd = input('command input >> ')
        except KeyboardInterrupt:
            break

        cmd = int(cmd)
        if cmd == 0:
            close_ft_usb_device(handle)
            break
        elif cmd == 1 or cmd == 2:
            loop_back_test(handle, cmd)
        elif cmd == 3 or cmd == 4:
            upstream_test(handle, cmd, inversion)
        elif cmd == 5:
            downstream_test(handle)
        elif cmd == 6:
            downstream_test_3x(handle, 3)
        else:
            print("invalid command")

        inversion = not inversion


if __name__ == '__main__':
    usb, message = open_ft_usb_device("FT60X", "FTDI SuperSpeed-FIFO Bridge")
    print(message)
    print("VendorID : 0x%X" % usb.getChipConfiguration().VendorID)
    print("ProductID : 0x%X" % usb.getChipConfiguration().ProductID)
    print("ChannelConfig : %s" % information_channel_config(usb.getChipConfiguration().ChannelConfig))
    print("FIFOMode : %s" % information_fifo_mode(usb.getChipConfiguration().FIFOMode))
    print("--------------------------------------------------------------------------")
    print("------------------------  Loop back test    ------------------------------")
    main(usb)
    # loop_back_test(1)

Visual C#でFT600/601と通信してみるには
任意のレジスタの値をもうちょっと楽に変えたい、USBデバイス側の様子(レジスタの中身)をリアルタイムに参照したい、つまりGUIを作りたいとなりました。PythonにもGUIを作る開発ツールが存在しますが、全く使ったことがないので、てっとり早くC#で作ることにしました。

使用するデバイスドライバは上述のと同じです。FTD3XX.DLLも同じ物を使います。(置き場所は異なります。C#で作ったソフトウェアの実行環境に配置してください。)ただし、.NET環境のプログラミングとなるため、FTD3XX_NET.dllというラッパークラスのライブラリも同じ場所に配置します。
Visutal C#でプロジェクトを作ったら、そのプロジェクトの参照マネージャーによりFTD3XX_NET.dllを参照するように設定します。これでみなさんのC#アプリからFTD3XX_NET.dllを通じてFTD3XX.DLLにアクセスし、デバドラを通じてFT600/601を使ったUSBデバイスを通信が行えるようになります。

このようなGUIのアプリとしました。

FT601のデバドラのOpen/Closeが行え、USBデバイスとして認識された際に取得するデバイスディスクリプターやコンフィグレーションデータを取得して表示します。画面左下はUSBデバイスへの送信データを設定するエリアでDataGridViewを用いて作っています。これによりアドレスマップの仕様どおりにFPGA側が作れているか確認することができます。値を編集した後に Send Dataボタンを押すことで、この値を反映した4KB固定データがUSBデバイスへ1回送信されます。画面右下はUSBデバイスからの受信データを表示できます。こちらもDataGridViewを使って作成しています。先に述べたとおり12KBデータを受信して先頭の4KBは捨てて残り8KBの中身を表示します。Receiving Dataボタンを押して1回受信する機能と、画面最下部に設けた Periodic receive Start / Stop ボタンによって、定周期の繰り返し受信する・止める機能を設けました。受信周期は100m、200ms、500ms、1000msの4種選べるようにしました。
プロジェクトの構成は次のとおりです。

  • FT601_TEST.cs   ーー GUIを担います。
  • CData.cs      ーー GUIとのデータ送受、データのシリアライズやデシリアライズを行います。
  • CAddress.cs    ーー DataGridViewのアドレスの列に表示するアドレスデータを定義しています。 
  • CWorkerThread.cs  ーー FTD3XX_NET.dllのAPIを扱い、FT601とやりとりするデータ通信本体です。

単純な作りでコード量は少ないですが、解説するとすればCWorkerThread.csでしょうか。CWorkerThread.csでは、定的な受信処理を行うメソッドをスレッド化し、受信したデータをメインスレッドのフォーム(FT601_TEST)が持つグローバル変数にコピーした後で、PostMessageによりメインスレッドを伝えます。それを受けてメインスレッド側では受信下データを受信データ用のDataGridViewに反映します。

プロジェクト一式を掲載します。開発環境はVisual Studio 2017 です。

PCとPC、あるいはPCの接続先にはOSが搭載されているような場合は、前回「FT601を使った通信システムを設計してみよう(その1)」から始まるのような低レイヤーの設計を行う機会は無いかもしれません。今回紹介した設計は、筆者がこれまでに関わったCPUやFPGAを持つ装置間でバイナリデータをやりとりする通信システムで使われている設計です。それを単純化した仕様にして例を示しました。(シンプルな仕様を考え、自宅でゼロから再設計してみた。自由にハード・ソフトの仕様決めて良いってストレスフリーで楽ちんですねー)