Introduction
IOTA devices provide a RESTful API which allows direct access to the data stored on the device. This can be useful for integrating into various scenarios. In this case, to filter for blacklisted IPs in the currently captured content. Use cases may be to analyze if certain internal IPs are not propagated beyond a specific access level, or if certain devices are being accessed from an area where they should not be accessed from.
Prerequisites
-
Basic knowledge of Python programming.
-
Access to an IOTA device from Profitap.
Setting up the IOTA device
1. Connect the IOTA device
-
Connect the IOTA device to the network segment you want to monitor.
-
Identifying the correct segment is critical here. Ensure, with a visual inspection of the traffic on the IOTA device, that the correct segment is being used during capture.
2. Access the IOTA web interface
-
Open a web browser and enter the IP address of the IOTA device.
-
Example: https://<iota_device_ip_address> (Replace with the actual IP address of your IOTA device).
-
-
Log in using the correct credentials. This script uses a username/password combination that should allow it to see the necessary information.
3. Configure capture settings
-
Set up the network interfaces for packet capture.
-
Choose the interfaces you want to monitor from the device's web interface.
-
-
Configure filters if needed to focus on specific traffic types.
-
You can set filters to capture only specific types of traffic, such as HTTP or FTP, depending on your needs.
-
Using the IOTA database query engine
The query engine allows any user to interact with the IOTA device in a machine-readable fashion to query the metadata database. This is done using a RESTful API access with a username/password combination.
To ensure that sensitive information is not propagated and that no information on the device can be manipulated by the user, it is recommended to create a new special user. In this case, a user has been created with the username “apitest”. Note that this user has been automatically added to the organization as a “Viewer”. Nothing else needs to be configured, and this role will prevent any miscommunication by only allowing read-only access.
Security considerations
The solution used here is for creating a dedicated user/password combination for using the script. As it is recommended to protect direct access to the IOTA from any outside interference and to use a HTTPS connection, this is fine. However, please note that it is a good practice to ensure that the user/password combination is not used in any other context and the user has only a Viewer role (which a new user has by default).
Python script for blacklisted IP detection
The script:
- iota_ip_blacklist_check.py
-
import requests
import re
import time
import urllib.parse
import json
import argparse
from urllib3.exceptions import InsecureRequestWarning
def main():
# CLI options
parser = argparse.ArgumentParser(description='Compare IP blacklist files against IOTA metadata. Blacklist files must contain a list of single IPv4/IPv6 addresses or IP subnet in CIDR notation.')
required_args = parser.add_argument_group('required')
required_args.add_argument('-d', '--device_ip', help='Device IP address')
required_args.add_argument('-i', '--infile', nargs='+', help = 'Blacklist file(s)')
parser.add_argument('-u', '--device_username', default='admin', help='Device IP username (default: admin)')
parser.add_argument('-p', '--device_password', default='admin', help='Device IP password (default: admin)')
parser.add_argument('-l', '--query_time_window_s', type=int, default=600, help='Number of seconds from the current time to query IOTA metadata (default: 600)')
args = parser.parse_args()
if args.device_ip:
iota_ip_address = args.device_ip
if args.device_username:
iota_username = args.device_username
if args.device_password:
iota_password = args.device_password
if args.query_time_window_s:
query_time_window_s = args.query_time_window_s
if args.infile:
blacklist_files = args.infile
query_time_window_end = int(time.time())
query_time_window_start = query_time_window_end - query_time_window_s
results = {}
session = requests.Session()
session.auth = (iota_username, iota_password)
# Suppress warnings from urllib3
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
# Open blacklist files, iterate over them
for blacklist_file in blacklist_files:
count_fail = 0
print(f' - {blacklist_file}')
with open(blacklist_file) as f:
for line in f:
# Skip comments or empty lines
if re.match(r"^[;#\s]", line):
continue
# Current IP address or subnet
ip = line.split()[0]
print(f' - {ip:45}', end='')
# Handle CIDR subnet query
if re.match(r"^(?:(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2})|(?:[0-9a-fA-F\:]{2,39}/[0-9]{1,2}))$", ip):
query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (isIPAddressInRange(IP_SRC, '{ip}') OR isIPAddressInRange(IP_DST, '{ip}')) AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*')
response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False)
matching_flows = json.loads(response.content)['data'][0]['matching_flows']
if matching_flows == "0":
print('Ok')
else:
count_fail += 1
print(f'Match (flows: {matching_flows})')
# Handle single IP address query
elif re.match(r"^(?:(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F\:]{2,39}))$", ip):
query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (IP_SRC = '{ip}' OR IP_DST = '{ip}') AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*')
response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False)
matching_flows = json.loads(response.content)['data'][0]['matching_flows']
if matching_flows == "0":
print('Ok')
else:
count_fail += 1
print(f'Match (flows: {matching_flows})')
# Some unknown case
else:
print('Unhandled case')
# Record results after completing file
results[blacklist_file] = count_fail
# Print results
print('\nResults\n===')
for file in results:
print(f' - {file:47}{str(results[file])} matches')
if __name__ == '__main__':
main()
Step-by-step script explanation
1. Import necessary libraries
import requests
import re
import time
import urllib.parse
import json
import argparse
from urllib3.exceptions import InsecureRequestWarning
2. Set up configuration
# CLI options
parser = argparse.ArgumentParser(description='Compare IP blacklist files against IOTA metadata. Blacklist files must contain a list of single IPv4/IPv6 addresses or IP subnet in CIDR notation.')
required_args = parser.add_argument_group('required')
required_args.add_argument('-d', '--device_ip', help='Device IP address')
required_args.add_argument('-i', '--infile', nargs='+', help = 'Blacklist file(s)')
parser.add_argument('-u', '--device_username', default='apitest', help='Device IP username (default: apitest)')
parser.add_argument('-p', '--device_password', default='apitest', help='Device IP password (default: apitest)')
parser.add_argument('-l', '--query_time_window_s', type=int, default=600, help='Number of seconds from the current time to query IOTA metadata (default: 600)')
-
-d / –device_ip
The IP of the IOTA device. -
-i / –infile
A text file with IP addresses to look for. CIDR subnet queries for address ranges may also be provided. The file may contain comments prefixed with either a ; or #. Any empty line or lines started with spaces will be omitted. By providing this parameter multiple times, multiple input files may be handed over to this script. -
-u / –device_username
The username defined in the step above. -
-p / –device_password
The password defined in the step above. -
-l / –query_time_window_s
How much time in the past should the query occur. The default is 600 seconds, which corresponds to 10 minutes.
The input file:
As an example, here is an input file, which will search for a specific subnet as well as all the IP addresses for the internal DNS server.
- infile.txt
-
# IP Address of DNS server
192.168.1.250
# Host range for internal testing
10.40.0.0/24
3. Main logic to detect blacklisted IPs
The main logic opens the blacklist file, parses it step by step, and queries the internal database for either a corresponding subnet query or a direct IP address query.
query = urllib.parse.quote(f"SELECT COUNT(DISTINCT FLOW_ID) AS matching_flows FROM flows.base WHERE (isIPAddressInRange(IP_SRC, '{ip}') OR isIPAddressInRange(IP_DST, '{ip}')) AND DATE_PKT_MS_FIRST >= toDateTime64({query_time_window_start}, 3) AND DATE_PKT_MS_FIRST <= toDateTime64({query_time_window_end}, 3) FORMAT JSON", safe='()*')
response = session.get('https://'+iota_ip_address+'/api/datasources/proxy/3/?query='+query, verify=False)
matching_flows = json.loads(response.content)['data'][0]['matching_flows']
First, the query is set up. In this case, we only need to look for the number of entries found, which is why we are performing a COUNT operation. The database is queried in SQL format.
After the query has been created, a session is set up, which will use an HTTP GET operation to query the information. The result is presented as JSON data, and the first entry is being read.
For each IP address found, the number of matching entries is printed, and for each file, the total number of matching entries is printed.
Running the script
It is recommended that the script is run in a Python environment. This can be done with the following commands:
python3 -m venv ./Development/iota
source ./Development/iota/bin/activate
python3 -m pip install argparse requests
This will create a Python3 venv environment (see here for more information) and install the required libraries that are not present in a default environment.
Then, assuming the Python script, as well as the infile file, is in the current directory, the following will run the script:
In this case, the IOTA was positioned to capture on the outbound WAN connection. In the last 10 minutes, no outbound DNS query was done from the internal DNS server, but 4 times, the internal test network sent data over the WAN connection.
In this example, the script is being used as a whitelist test, but it can easily be configured as a blacklist test as well.