ICMP通信

今回は、以前作成しました【チェックサム】のモジュールを用いて、簡易的な「エコー要求 / エコー応答」を作成しました。



ICMPとは

ネットワークを経由して、サーバ、ネットワーク機器、パソコンとの通信状況を確認するために用いられるプロトコルです。
ping コマンド や traceroute(tracert) コマンドに用いられています。
※メッセージフォーマットについては、RFC792を参照

通信イメージ図

メッセージ

◇ICMP

ICMPメッセージ

・Type:ICMPメッセージ機能
(ICMP要求:8 、ICMP応答:0)

・Code:ICMPメッセージ機能
(ICMP 要求 / 応答:0)

・チェックサム:ICMPメッセージの改ざん検知
(「チェックサム 計算式」を参照)

・ID (identifier):ICMP 要求 / 応答の識別番号
・Sequence Number:ICMP 要求 / 応答のシーケンス番号

機器情報とソースコード

◇機器

・ノートPC(Windows10 & Python3.11.5を導入済み)

◇ソースコード(Python)

https://github.com/tango3304/tping/tree/main

検証

◇実行するソースコード

※2023/10/29(日) 改修:送受信時間を追加

# Coding: UTF-8
from tping.tping import PingSocket
from time import sleep
from itertools import repeat
from scapy.layers.inet import IP
from traceback import format_exception_only
from sys import exc_info


# ReceivePacket Anarysis Function[受信パケット解析関数]
def receivepacket_analysis(ipaddress, timedout):
    try:
    # Get ReceivePacket [受信パケット取得]
        receive_ping_packet, timestamp = PingSocket(ipaddress, timedout).ping_socket()

        if receive_ping_packet == 'Timeout' and timestamp == None:
            print(f'Request Timed out [要求がタイムアウトしました]')
        else:
        # ReceivePacket Analysis [受信パケット解析]
            ip_packet_data = IP(receive_ping_packet)
            source = ip_packet_data.src
            ttl = ip_packet_data.ttl
            ping_data = len(ip_packet_data.load)
            sleep(1)
            print(f'{source} Response: Byte ={ping_data} Time ={timestamp}s TTL={ttl}')
    
    except KeyboardInterrupt:
    # (Ctrl + c) Process [(Ctrl + c) の処理]
        print(f'\nProcess Interrupted [処理を中断しました]')
        exit(1)
    except:
    # Get ErrorMessage [エラーメッセージ取得]
        error_type, error_message, _ = exc_info()
        error_message = ''.join(i for i in format_exception_only(error_type, error_message))
        print(f'{error_message}')
        exit(1)


# Main Process [メイン処理]
if __name__ == "__main__":
# Initialization [初期化]
    ipaddress = None
    default_timedout = 3
    repeat_count = 4
    flag = 0
    send_timestamp = None
    receive_timestamp = None

# PING Setting [PING設定]
try:
    ipaddress = str(input('\n\n IPaddress: '))
    repeat_count = str(input(' CommnicationCount(def 4): '))
    if repeat_count == '':
        repeat_count = 4
    if repeat_count == 't':
        flag = 1
    else:
        repeat_count = int(repeat_count)
except KeyboardInterrupt:
    print(f'\nProcess Interrupted[処理を中断しました]')
    exit(1)
except ValueError:
    print(f'数字 or 半角t を指定してください。')
    exit(1)

# PING Communication START [PING疎通開始]
print(f'{ipaddress}に ping を送信しています :')


if flag == 1:
# Communication Confirmation Permanently[永続的に疎通確認]    
    try:
        for i in repeat(None):
            receivepacket_analysis(ipaddress, default_timedout)
    except KeyboardInterrupt:
        print(f'\nProcess Interrupted [処理を中断しました]')
        exit(1)
else:
# Communication Confirmation Designation Number of times [指定回数分疎通確認]
    try:
        for i in repeat(None, repeat_count):
            receivepacket_analysis(ipaddress, default_timedout)
    except KeyboardInterrupt:
        print(f'\nProcess Interrupted[処理を中断しました]')
        exit(1)


◇実行結果

「IPadress:」:宛先IPアドレス
「CommnicationCount(def 4):」:疎通回数(デフォルトで4回)

・通常の疎通

疎通テスト1

・疎通回数を指定

疎通テスト2

・全角で入力

疎通テスト3

・永続的に疎通

疎通テスト4

・疎通できない

疎通テスト5


概要欄

・チェックサム 計算式
https://note.com/tango9512357/n/nb8916ed1e0c3