#!/usr/bin/env python3 """ MeshCore Network Scanner Scans the local network for MeshCore nodes by testing TCP connections on port 5000 and attempting to connect to each discovered host. Usage: python scan_meshcore_network.py [network] [port] python scan_meshcore_network.py # Scan 192.168.1.0/24 python scan_meshcore_network.py 192.168.50.0/24 # Scan specific network python scan_meshcore_network.py 192.168.1.0/24 5000 # Scan with custom port """ import asyncio import sys import ipaddress import socket import time from datetime import datetime from concurrent.futures import ThreadPoolExecutor import subprocess # Import meshcore from PyPI import meshcore from meshcore import EventType class MeshCoreNetworkScanner: def __init__(self, network: str = "192.168.1.0/24", port: int = 5000, timeout: float = 2.0): self.network = network self.port = port self.timeout = timeout self.found_nodes = [] def get_local_network(self): """Try to detect the local network automatically""" try: # Get default gateway and infer network result = subprocess.run(['route', '-n', 'get', 'default'], capture_output=True, text=True, timeout=5) if result.returncode == 0: for line in result.stdout.split('\n'): if 'gateway:' in line: gateway = line.split(':')[1].strip() # Convert gateway to network (assume /24) ip = ipaddress.IPv4Address(gateway) network = ipaddress.IPv4Network(f"{ip}/24", strict=False) return str(network) except: pass # Fallback to common networks return "192.168.1.0/24" def scan_port(self, host: str) -> bool: """Check if port is open on host (synchronous)""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout) result = sock.connect_ex((host, self.port)) sock.close() return result == 0 except: return False async def test_meshcore_node(self, host: str) -> dict: """Test if a host is a real MeshCore node""" try: meshcore_client = await asyncio.wait_for( meshcore.MeshCore.create_tcp(host, self.port, debug=False), timeout=5.0 ) if meshcore_client.is_connected: device_info = meshcore_client.self_info or {} # Test device responsiveness try: result = await asyncio.wait_for( meshcore_client.commands.send_device_query(), timeout=3.0 ) responsive = result and hasattr(result, 'type') and result.type != EventType.ERROR except: responsive = False await meshcore_client.disconnect() return { 'host': host, 'port': self.port, 'is_meshcore': True, 'device_info': device_info, 'responsive': responsive, 'name': device_info.get('name', 'Unknown'), 'public_key': device_info.get('public_key', 'Unknown') } else: return {'host': host, 'port': self.port, 'is_meshcore': False} except asyncio.TimeoutError: return {'host': host, 'port': self.port, 'is_meshcore': False, 'error': 'timeout'} except Exception as e: return {'host': host, 'port': self.port, 'is_meshcore': False, 'error': str(e)} async def scan_network(self): """Scan the network for MeshCore nodes""" print(f"๐Ÿ” Scanning network: {self.network}") print(f"๐ŸŽฏ Target port: {self.port}") print(f"โฑ๏ธ Timeout: {self.timeout}s per host") print("=" * 60) # Generate list of IPs to scan try: network = ipaddress.IPv4Network(self.network, strict=False) hosts = [str(ip) for ip in network.hosts()] except ValueError as e: print(f"โŒ Invalid network: {e}") return [] print(f"๐Ÿ“ก Scanning {len(hosts)} hosts...") print() # First pass: Quick port scan to find open ports print("๐Ÿ” Phase 1: Port scanning...") open_hosts = [] with ThreadPoolExecutor(max_workers=50) as executor: futures = [executor.submit(self.scan_port, host) for host in hosts] for i, future in enumerate(futures): host = hosts[i] if future.result(): open_hosts.append(host) print(f" โœ… {host}:{self.port} - Port open") if not open_hosts: print("โŒ No open ports found") return [] print(f"\n๐Ÿ” Phase 2: Testing {len(open_hosts)} hosts for MeshCore nodes...") print() # Second pass: Test each open host for MeshCore meshcore_nodes = [] for host in open_hosts: print(f"๐Ÿงช Testing {host}:{self.port}...", end=" ") result = await self.test_meshcore_node(host) if result.get('is_meshcore'): meshcore_nodes.append(result) name = result.get('name', 'Unknown') responsive = "โœ…" if result.get('responsive') else "โš ๏ธ" print(f"โœ… MeshCore node found! {responsive} {name}") else: error = result.get('error', 'not meshcore') print(f"โŒ {error}") return meshcore_nodes def print_results(self, nodes): """Print scan results""" print("\n" + "=" * 60) print("๐Ÿ“Š SCAN RESULTS") print("=" * 60) if not nodes: print("โŒ No MeshCore nodes found on the network") return print(f"๐ŸŽ‰ Found {len(nodes)} MeshCore node(s):") print() for i, node in enumerate(nodes, 1): print(f"๐Ÿ“ก Node {i}: {node['host']}:{node['port']}") print(f" Name: {node.get('name', 'Unknown')}") print(f" Public Key: {node.get('public_key', 'Unknown')[:16]}...") print(f" Responsive: {'โœ… Yes' if node.get('responsive') else 'โš ๏ธ No'}") device_info = node.get('device_info', {}) if device_info: print(f" Device Info:") for key, value in device_info.items(): if key not in ['name', 'public_key']: print(f" {key}: {value}") print() async def main(): """Main scanner function""" # Parse command line arguments network = "192.168.1.0/24" # Default port = 5000 # Default MeshCore port if len(sys.argv) > 1: network = sys.argv[1] if len(sys.argv) > 2: port = int(sys.argv[2]) # Auto-detect network if not specified if network == "192.168.1.0/24" and len(sys.argv) == 1: scanner = MeshCoreNetworkScanner() detected_network = scanner.get_local_network() if detected_network != "192.168.1.0/24": print(f"๐Ÿ” Auto-detected network: {detected_network}") network = detected_network print(f"MeshCore Network Scanner") print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print() # Create scanner and run scanner = MeshCoreNetworkScanner(network, port) nodes = await scanner.scan_network() scanner.print_results(nodes) return len(nodes) if __name__ == "__main__": try: node_count = asyncio.run(main()) sys.exit(0 if node_count > 0 else 1) except KeyboardInterrupt: print("\n\nโน๏ธ Scan interrupted by user") sys.exit(1) except Exception as e: print(f"\nโŒ Scanner error: {e}") sys.exit(1)