AFT AK COE β€” Vendor Integration Reference

TAK Ecosystem Integration Guide

Complete reference for vendors building on ATAK, WinTAK, iTAK, WebTAK, and TAK Server. Use the interactive decision tree to find your integration path, then follow the full guide below to implement it.

14
Guide sections
30+
Integration paths
50+
Code examples
4
TAK clients covered
🌳

Interactive Integration Decision Tree

Answer two questions β€” what are you building, and what does it need to do β€” and get the exact integration method, complexity, and guide section references. Then scroll down to that section for full implementation details.

Integration Decision Tree

1

TAK Ecosystem Overview

The Team Awareness Kit (TAK) is a geospatial situational awareness platform originally developed by AFRL and now managed by the TAK Product Center, Department of Defense. It provides a Common Operating Picture (COP) across Android (ATAK), iOS (iTAK), Windows (WinTAK), and browser (WebTAK) clients synchronized through TAK Server.

Modern TAK deployments are sophisticated data pipelines where AI models, IoT sensors, external databases, video analytics engines, and enterprise systems must not only push data into TAK but also receive events, decisions, and triggers back from the TAK ecosystem. This bidirectional architecture is the focus of this guide.

1.1 Core Platform Components

ComponentDescription
ATAKAndroid Team Awareness Kit β€” primary mobile client; supports plugins, CoT, video, data packages
iTAKiOS Team Awareness Kit β€” Apple device client with core ATAK feature parity
WinTAKWindows TAK client β€” desktop C2 platform with .NET plugin architecture
WebTAKBrowser-based TAK client β€” JavaScript/OpenLayers; accessible from any device
TAK ServerCentral CoT broker, MARTI data services, certificate authority, mission management
FreeTAKServerOpen-source community TAK Server (Python); REST API, easier for dev/test
ATAK SDKOfficial Java/Android SDK for building ATAK plugins; distributed via tak.gov
WinTAK SDKC#/.NET SDK for building WinTAK plugins
TAK Video ServerRTSP/RTMP proxy and transcoder purpose-built for TAK video feeds

1.2 Data Flow Patterns

PatternDescriptionCommon Use Cases
Inbound OnlyExternal system sends data to TAK. TAK displays it.GPS trackers, weather feeds, AIS ship data, static overlays
Outbound OnlyApplication reads current TAK state.Dashboards, reporting, logging, asset monitoring
BidirectionalExternal system sends data to TAK AND receives events/triggers back.AI/ML pipelines, camera analytics, IoT command-and-control
Event-DrivenTAK generates a specific event that wakes up an external system.Automated alerts, AI inference requests, workflow triggers

1.3 Integration Method Selection Matrix

Use CaseBest MethodBidirectional?
Real-time asset trackingCoT TCP/UDP streamOptional
Live camera feed displayRTSP via Data Package or Video ServerYes β€” analytics back to TAK
Static boundary/zone overlayKML/KMZ file importNo
Updating overlay (zones, weather)KML Network LinkNo
Custom operator UI/toolATAK Plugin (APK)Yes β€” full API access
Enterprise data (ArcGIS, SQL, API)Middleware bridge β†’ CoT or MARTIYes
AI/LLM inference on TAK eventsPlugin + Webhook/REST callbackYes β€” critical use case
IoT sensor command and controlMQTT + CoT + REST callbackYes
Offline basemapGeoPackage / MBTilesNo
Hardware device (GPS, radio)Serial-to-CoT bridgeOptional
Video analytics with TAK markersVideo Server + AI + CoT dispatchYes β€” full pipeline
Multi-server federationTAK Server federation linkYes β€” bidirectional sync
2

Cursor on Target (CoT) β€” Complete Reference

Cursor on Target is the lingua franca of the TAK ecosystem. Every position report, alert, shape, tasking, and sensor reading travels as a CoT XML event. Mastering CoT is the foundation of every integration path in this guide.

2.1 Complete CoT XML Structure

XML β€” fully annotated CoT event with all common fields
<?xml version="1.0" encoding="UTF-8"?>
<event version="2.0"
       uid="org.example.vehicle-unit-42"
       type="a-f-G-E-V"
       how="m-g"
       time="2026-01-01T14:30:00.000Z"
       start="2026-01-01T14:30:00.000Z"
       stale="2026-01-01T14:35:00.000Z">
  <!-- WGS-84: hae=height above ellipsoid(m), ce=circular error(m) -->
  <point lat="38.8977" lon="-77.0365" hae="25.4" ce="5" le="10"/>
  <detail>
    <contact callsign="BRAVO-7" endpoint="*:-1:stcp"/>
    <__group name="Blue" role="Team Member"/>
    <status battery="87"/>
    <track speed="12.5" course="270.0"/>    <!-- m/s, degrees true -->
    <remarks>Proceeding to checkpoint DELTA. ETA 14:45Z.</remarks>
    <sensor type="optical" status="active" zoom="4x"/>
    <color argb="-65536"/>   <!-- red -->
    <link uid="org.example.company-alpha" type="a-f-G-U-C"
          relation="p-p" remarks="Parent unit"/>
  </detail>
</event>

2.2 CoT Type System

The type attribute hierarchy: a-[affiliation]-[battle dimension]-[function]

CoT TypeMeaning
a-f-G-U-CFriendly Ground Unit Combat
a-f-G-E-VFriendly Ground Equipment Vehicle
a-f-A-C-FFriendly Air Fixed-wing Fighter
a-f-A-M-HFriendly Air Rotary-wing Helicopter
a-f-S-S-SFriendly Sea Surface Submarine
a-h-GHostile Ground (generic)
a-u-GUnknown Ground (generic)
a-n-GNeutral Ground
b-m-p-s-mBit Map Point Sensor Marker
b-t-fGeoChat Message
u-d-rDrawing Rectangle
u-d-c-cDrawing Circle
a-u-G-E-WUnknown Ground Equipment Weather sensor
t-b-aTasking Broadcast Alert

2.3 Transport Protocols

2.3.1 TCP Streaming (Port 8087 / 8089 TLS) β€” Primary Production Protocol

Python β€” TLS TCP connection with exponential backoff
import socket, ssl, time

def connect_tls(host, port, certfile, keyfile, cafile):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.load_cert_chain(certfile, keyfile)
    ctx.load_verify_locations(cafile)
    raw = socket.create_connection((host, port))
    return ctx.wrap_socket(raw, server_hostname=host)

def resilient_stream(host, port, cot_generator):
    delay = 1
    while True:
        try:
            sock = connect_tls(host, port,
                'client.pem', 'client.key', 'ca.pem')
            delay = 1
            for cot in cot_generator():
                sock.sendall(cot.encode('utf-8'))
        except (socket.error, ssl.SSLError) as e:
            print(f'Connection lost: {e}. Retry in {delay}s')
            time.sleep(delay)
            delay = min(delay * 2, 60)

2.3.2 UDP Multicast (LAN / Field Networks)

Python β€” UDP multicast (no TAK Server required)
import socket

MCAST_GRP  = '239.2.3.1'
MCAST_PORT = 6969

def send_multicast(cot_xml):
    sock = socket.socket(socket.AF_INET,
                         socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
    sock.sendto(cot_xml.encode(), (MCAST_GRP, MCAST_PORT))
    sock.close()

2.3.3 WebSocket (WebTAK / Custom Dashboards)

JavaScript β€” WebSocket subscribe and send to TAK Server
const ws = new WebSocket('wss://takserver:8443/takproto/1');

ws.onmessage = (ev) => {
    const xml = new DOMParser()
        .parseFromString(ev.data, 'application/xml');
    const uid = xml.querySelector('event').getAttribute('uid');
    const lat = xml.querySelector('point').getAttribute('lat');
    console.log(`${uid} is at ${lat}`);
};

ws.send(buildCotXml('my-asset', 38.8977, -77.0365));

2.4 Converting JSON API Data to CoT β€” Full Pipeline

Python β€” full JSON API to CoT streaming pipeline with TLS + retry
import requests, socket, ssl, datetime, time, logging

log = logging.getLogger('json2cot')
API_URL   = 'https://fleet.example.com/api/v2/vehicles'
API_KEY   = 'YOUR_API_KEY'
TAK_HOST  = 'takserver.example.com'
TAK_PORT  = 8089
POLL_SEC  = 15
STALE_MIN = 2

def iso(dt): return dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')

def vehicle_to_cot(v):
    now   = datetime.datetime.utcnow()
    stale = now + datetime.timedelta(minutes=STALE_MIN)
    uid   = f'fleet-{v["id"]}'
    return f'''<?xml version="1.0"?>
    <event version="2.0" uid="{uid}" type="a-f-G-E-V"
           how="m-g" time="{iso(now)}"
           start="{iso(now)}" stale="{iso(stale)}">
      <point lat="{v['location']['lat']}"
             lon="{v['location']['lng']}" hae="0" ce="10" le="10"/>
      <detail>
        <contact callsign="{v.get('name', uid)}"/>
        <track speed="{v.get('speed_ms',0)}"
               course="{v.get('heading',0)}"/>
        <remarks>Dept: {v.get('department','Unknown')}</remarks>
      </detail>
    </event>'''

def get_conn():
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode    = ssl.CERT_NONE   # use CERT_REQUIRED + CA in prod
    return ctx.wrap_socket(socket.create_connection((TAK_HOST, TAK_PORT)))

conn = get_conn()
while True:
    try:
        resp = requests.get(API_URL,
                            headers={'X-Api-Key': API_KEY}, timeout=8)
        for v in resp.json().get('vehicles', []):
            conn.sendall(vehicle_to_cot(v).encode())
        time.sleep(POLL_SEC)
    except Exception as e:
        log.error(f'Error: {e}. Reconnecting...')
        try: conn.close()
        except: pass
        time.sleep(5)
        conn = get_conn()

2.5 TAK Server Data Feed (HTTPS Polling Endpoint)

Python / Flask β€” CoT endpoint for TAK Server to poll
from flask import Flask, Response
from your_db import get_active_assets

app = Flask(__name__)

@app.route('/tak/feed.xml')
def feed():
    events = ''.join(asset_to_cot(a) for a in get_active_assets())
    xml = f'<?xml version="1.0"?><events>{events}</events>'
    return Response(xml, mimetype='application/xml')

# In TAK Server admin: Data Feeds > Add Feed
# Protocol: HTTPS, URL: https://yourserver/tak/feed.xml
# Polling interval: 15-60 seconds
3

Bidirectional Data Flows β€” The Critical Architecture

WHY THIS MATTERS
A camera server uses AI to detect an unauthorized vehicle. TAK receives the detection and creates an alert marker. Once a TAK operator dispatches a unit, that decision needs to flow back to the camera server and AI system so they can track the response and update their models. Without bidirectional flow, TAK becomes a passive display board instead of a C2 hub.

3.1 Architecture Patterns

3.1.1 Webhook / Callback Pattern (Most Common)

Python / Flask β€” webhook receiver on your external server
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/tak/callback', methods=['POST'])
def tak_callback():
    payload    = request.json
    event_type = payload.get('event_type')
    unit_uid   = payload.get('uid')
    lat        = payload.get('lat')
    lon        = payload.get('lon')
    if event_type == 'unit_dispatched':
        update_dispatch_record(unit_uid, lat, lon)
        trigger_camera_ptz(lat, lon)   # swing PTZ camera to target
        notify_supervisor(unit_uid)
    return jsonify({'status': 'received'}), 200

3.1.2 Event Bus Pattern (TAK as Message Router)

Python β€” TAK stream β†’ Kafka producer; Kafka consumer β†’ CoT
import socket, kafka, xml.etree.ElementTree as ET

producer = kafka.KafkaProducer(bootstrap_servers='kafka:9092')

def cot_to_dict(xml_str):
    root = ET.fromstring(xml_str)
    pt   = root.find('point')
    return {'uid': root.get('uid'), 'type': root.get('type'),
            'lat': pt.get('lat'), 'lon': pt.get('lon')}

with socket.create_connection(('takserver', 8087)) as s:
    buf = ''
    while True:
        data = s.recv(4096).decode()
        buf += data
        while '</event>' in buf:
            end = buf.index('</event>') + 8
            try:
                event = cot_to_dict(buf[:end])
                producer.send('tak-events', str(event).encode())
            except ET.ParseError: pass
            buf = buf[end:]

# Return path: Kafka consumer β†’ TAK
consumer = kafka.KafkaConsumer('tak-commands',
                               bootstrap_servers='kafka:9092')
tak_out  = socket.create_connection(('takserver', 8087))
for msg in consumer:
    tak_out.sendall(build_cot_from_command(msg.value).encode())

3.1.3 ATAK Plugin as Bidirectional Bridge

Java (Android) β€” ATAK plugin WebSocket bridge
public class BidirectionalBridge
    implements MapEventDispatcher.MapEventDispatchListener {
    private WebSocket ws;

    public void start() {
        mapView.getMapEventDispatcher()
               .addMapItemEventListener(MapEvent.ITEM_ADDED, this);

        ws = http.newWebSocket(
            new Request.Builder()
                .url("wss://ai-server.example.com/tak-bridge").build(),
            new WebSocketListener() {
                @Override
                public void onMessage(WebSocket ws, String text) {
                    // External server sent command β†’ dispatch as CoT
                    CotDispatcher.getExternalDispatcher()
                                 .dispatchToBroadcast(buildCotFromCommand(text));
                }
            });
    }

    @Override
    public void onMapEvent(MapEvent event) {
        if (event.getType().equals(MapEvent.ITEM_ADDED)) {
            MapItem item = event.getItem();
            JSONObject p = new JSONObject();
            p.put("uid", item.getUID());
            p.put("lat", item.getGeoPointMetaData().get().getLatitude());
            p.put("lon", item.getGeoPointMetaData().get().getLongitude());
            ws.send(p.toString());  // forward map item to external server
        }
    }
}

3.2 Receiving Data FROM TAK

3.2.1 TCP Subscribe (Real-Time Event Stream)

Python β€” subscribe to all CoT events from TAK Server
import socket, threading, xml.etree.ElementTree as ET

def listen_to_tak(host='takserver', port=8087):
    with socket.create_connection((host, port)) as s:
        buf = b''
        while True:
            chunk = s.recv(4096)
            if not chunk: break
            buf += chunk
            while b'</event>' in buf:
                end = buf.index(b'</event>') + 8
                xml = buf[:end].decode('utf-8', errors='replace')
                buf = buf[end:]
                process_incoming_cot(xml)   # AI model, DB, alerting

threading.Thread(target=listen_to_tak, daemon=True).start()

3.2.2 MARTI REST API β€” Query Current State

Python β€” MARTI REST API client queries
import requests

SERVER = 'https://takserver:8443'
CERT   = ('client.pem', 'client.key')
CA     = 'ta-ca.pem'

# All currently connected clients
resp = requests.get(f'{SERVER}/Marti/api/clients', cert=CERT, verify=CA)
for c in resp.json()['data']:
    print(c['uid'], c['callsign'], c['lastEventPt'])

# Latest CoT for a specific UID
uid  = 'vehicle-unit-42'
resp = requests.get(f'{SERVER}/Marti/api/cot/xml/{uid}',
                    cert=CERT, verify=CA)

# POST a CoT event without a persistent socket
resp = requests.post(f'{SERVER}/Marti/api/cot/xml',
                     data=cot_xml, cert=CERT, verify=CA,
                     headers={'Content-Type': 'application/xml'})

3.3 TAK Server Federation (Multi-Server Bidirectional)

Federation allows two or more TAK Server instances to share CoT events bidirectionally without merging security domains. Each server controls exactly what it shares via group policies.

USE CASE
Organization A runs emergency dispatch with ATAK. Organization B is a neighboring county with their own TAK Server. Federation lets both see shared resources (mutual aid units, incident markers) without merging their servers or security domains.
  1. In TAK Server admin: Configuration > Federation
  2. Add Federation Outgoing to remote server (hostname + port 9000)
  3. Remote server adds your server as Federation Incoming
  4. Exchange and trust each other's CA certificates
  5. Configure group filtering β€” which groups and data each side shares
  6. Test: send a CoT event from one side and verify it appears on the other

3.4 TAK Server Plugins (Server-Side Java)

Server-side plugins run inside the TAK Server process and intercept ALL CoT traffic. Deploy as JAR files via the TAK Server Plugin SDK from tak.gov. Use for centralized logging, filtering, data retention enforcement, or triggering external systems from every CoT event.

4

AI and LLM Integration β€” Intelligent TAK Pipelines

AI does not sit inside TAK β€” it sits beside TAK, connected through bidirectional mechanisms. TAK provides the geospatial operating picture; AI provides inference and intelligence that enhances that picture.

4.1 Computer Vision + Camera Feed Pipeline

Cameras feed a CV model; detections become CoT events on the TAK map; operator responses flow back to the camera system. Fully bidirectional pipeline.

Python β€” YOLOv8 β†’ CoT pipeline with bidirectional callbacks
from ultralytics import YOLO
import cv2, socket, datetime, threading, requests

model = YOLO('yolov8n.pt')
tak   = socket.create_connection(('takserver', 8087))

CAMERA_ID    = 'gate-cam-01'
CAMERA_LAT   = 38.8977
CAMERA_LON   = -77.0365
CALLBACK_URL = 'https://cameraserver.example.com/tak/callback'

def detection_to_cot(n, cls, conf, cam_lat, cam_lon):
    now   = datetime.datetime.utcnow()
    stale = now + datetime.timedelta(minutes=3)
    fmt   = '%Y-%m-%dT%H:%M:%S.000Z'
    ctype = 'a-u-G-E-V' if cls == 'car' else 'a-u-G'
    return f'''<?xml version="1.0"?>
    <event version="2.0" uid="cv-{n}" type="{ctype}"
           how="m-g" time="{now.strftime(fmt)}"
           start="{now.strftime(fmt)}" stale="{stale.strftime(fmt)}">
      <point lat="{cam_lat}" lon="{cam_lon}" hae="0" ce="50" le="50"/>
      <detail>
        <contact callsign="AI-DET:{cls.upper()}"/>
        <remarks>Conf:{conf:.2f} | Cam:{CAMERA_ID}</remarks>
        <color argb="-65536"/>
      </detail>
    </event>'''

cap = cv2.VideoCapture('rtsp://camera.local:554/stream1')
n   = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    for r in model(frame, conf=0.6):
        for box in r.boxes:
            cls, conf = model.names[int(box.cls)], float(box.conf)
            n += 1
            tak.sendall(detection_to_cot(n, cls, conf,
                                         CAMERA_LAT, CAMERA_LON).encode())
            threading.Thread(target=requests.post, kwargs={
                'url': CALLBACK_URL,
                'json': {'event':'detection','class':cls,'conf':conf}
            }).start()

4.2 LLM Integration Patterns

4.2.1 Radio Transcript β†’ CoT via LLM

Python β€” STT β†’ LLM β†’ CoT pipeline
import openai, json, socket, datetime

client   = openai.OpenAI(api_key='YOUR_KEY')
tak_sock = socket.create_connection(('takserver', 8087))

SYSTEM_PROMPT = '''You are a military/emergency communications parser.
Extract: unit_id, action, lat, lon, remarks from the transcript.
Respond ONLY as JSON. Use null for unknown fields.'''

def parse_radio(transcript):
    resp = client.chat.completions.create(
        model='gpt-4o',
        messages=[{'role':'system','content':SYSTEM_PROMPT},
                  {'role':'user','content':transcript}],
        response_format={'type':'json_object'})
    return json.loads(resp.choices[0].message.content)

def parsed_to_cot(p):
    if not p.get('lat') or not p.get('lon'): return None
    now   = datetime.datetime.utcnow()
    stale = now + datetime.timedelta(minutes=10)
    fmt   = '%Y-%m-%dT%H:%M:%S.000Z'
    return f'''<?xml version="1.0"?>
    <event version="2.0" uid="llm-{p['unit_id'].replace(' ','-')}"
           type="a-f-G-U-C" how="h-e"
           time="{now.strftime(fmt)}" start="{now.strftime(fmt)}"
           stale="{stale.strftime(fmt)}">
      <point lat="{p['lat']}" lon="{p['lon']}"
             hae="0" ce="500" le="500"/>
      <detail>
        <contact callsign="{p['unit_id']}"/>
        <remarks>AI-parsed: {p.get('remarks','')}</remarks>
      </detail>
    </event>'''

4.2.2 LLM SITREP Generation from TAK Events

Python β€” TAK subscribe β†’ LLM β†’ GeoChat SITREP dispatch
import openai, collections, time, threading

client       = openai.OpenAI(api_key='YOUR_KEY')
event_buffer = collections.deque(maxlen=200)
# Background thread fills buffer from TAK TCP stream (see Section 3.2.1)

def generate_sitrep():
    events  = list(event_buffer)
    if not events: return 'No events recorded.'
    summary = '\n'.join(
        f"{e['uid']} ({e['type']}) at {e['lat']:.4f},{e['lon']:.4f}"
        for e in events[-50:])
    resp = client.chat.completions.create(
        model='gpt-4o',
        messages=[
            {'role':'system','content':
             'You are a military operations staff officer. '
             'Generate a concise SITREP from TAK event data. '
             'Include friendly forces disposition and notable activities.'},
            {'role':'user','content':summary}])
    return resp.choices[0].message.content

def sitrep_loop():
    while True:
        time.sleep(3600)
        dispatch_geochat('AI-SITREP', generate_sitrep())

threading.Thread(target=sitrep_loop, daemon=True).start()

4.2.3 LLM as Intelligent Alert Router

Python β€” LLM-driven alert routing
def route_alert_with_llm(alert):
    prompt = f'''
    TAK alert: {alert['type']} at {alert['lat']},{alert['lon']}
    Description: {alert['remarks']}
    Nearby units: Alpha-1 (0.3km), Bravo-2 (1.2km)
    External: camera-server, dispatch-cad, supervisors-sms
    Respond as JSON:
    {{"notify_units":[],"external_webhooks":[],"escalate":false}}
    '''
    resp = client.chat.completions.create(
        model='gpt-4o',
        messages=[{'role':'user','content':prompt}],
        response_format={'type':'json_object'})
    execute_routing(json.loads(resp.choices[0].message.content))

4.3 Local / On-Premise LLM (Air-Gapped)

Python β€” Ollama local LLM (drop-in for cloud LLM calls)
# Install: curl -fsSL https://ollama.ai/install.sh | sh
# Pull model: ollama pull llama3.2

import requests

def local_llm(prompt, model='llama3.2'):
    resp = requests.post('http://localhost:11434/api/generate',
        json={'model':model,'prompt':prompt,'stream':False})
    return resp.json()['response']

sitrep = local_llm(f'Generate SITREP from: {event_summary}')
DEPLOYMENT TIP
Docker Compose is ideal for the full field server stack. A single docker-compose.yml brings up FreeTAKServer, Mosquitto MQTT, Ollama, your CV model server, and your middleware. A single docker-compose up starts the entire integrated TAK stack with no internet required. See Section 13.4 for the full Docker Compose example.

4.4 Geofence Alerting with AI Classification

Python β€” Shapely geofence + alert dispatch
from shapely.geometry import Point, Polygon
import requests

ZONES = {'restricted-zone-a': Polygon([
    (-77.05,38.90),(-77.03,38.90),(-77.03,38.89),(-77.05,38.89)])}
known = {}

def check_geofences(uid, lat, lon, cot_type):
    pt = Point(lon, lat)
    for zone, poly in ZONES.items():
        inside = poly.contains(pt)
        if inside and not known.get((uid,zone), False):
            if 'hostile' in cot_type or 'unknown' in cot_type:
                tak.sendall(build_alert_cot(uid,lat,lon,zone).encode())
                requests.post('https://cameras/ptz',json={'zone':zone})
        known[(uid, zone)] = inside
5

KML / KMZ Integration β€” Complete Reference

KML is the fastest path to geographic overlays in TAK without ATAK-specific development. Supported natively by ATAK, WinTAK, WebTAK, and Google Earth.

5.1 Static KML/KMZ Import

  1. Create KML in QGIS, ArcGIS, or any GIS tool that exports KML
  2. For KMZ: ZIP the KML as doc.kml plus referenced images in files/, rename to .kmz
  3. Transfer to device via USB, email, TAK Data Package, or SD card
  4. In ATAK: hamburger menu > Import Manager > Local SD or Downloads
  5. Layer appears instantly; manage visibility in Overlay Manager

5.2 ATAK KML Feature Support

KML FeatureATAK SupportNotes
Placemark (Point)FullCustom icons, labels, descriptions
Placemark (LineString)FullColor, width, opacity
Placemark (Polygon)FullFill color, border, opacity
Folder / DocumentFullNested layer organization
Network LinkFullonInterval and onChange refresh
GroundOverlayPartialRaster only (JPEG/PNG)
ScreenOverlay / 3D ModelNoNot supported
ExtendedDataPartialShown as key-value in info panel

5.3 Live KML Network Link β€” Production Web Service

Python / FastAPI β€” authenticated live KML server endpoint
from fastapi import FastAPI, Depends, HTTPException, Header
from fastapi.responses import Response
import httpx, datetime

app = FastAPI()
VALID_KEYS = {'tak-client-key-abc123'}

async def verify_key(x_api_key: str = Header(...)):
    if x_api_key not in VALID_KEYS:
        raise HTTPException(403, 'Invalid API key')

def make_placemark(site):
    lat, lon  = site['coordinates']['lat'], site['coordinates']['lon']
    color     = 'ff00ff00' if site['status'] == 'online' else 'ff0000ff'
    return f'''
    <Placemark>
      <name>{site['name']}</name>
      <Style><IconStyle><color>{color}</color></IconStyle></Style>
      <Point><coordinates>{lon},{lat},0</coordinates></Point>
    </Placemark>'''

@app.get('/live.kml', dependencies=[Depends(verify_key)])
async def live_kml():
    async with httpx.AsyncClient() as client:
        resp  = await client.get('https://data.example.com/api/sites',
                                headers={'X-Api-Key':'EXT_KEY'})
    sites = resp.json()['sites']
    kml = f'''<?xml version="1.0" encoding="UTF-8"?>
    <kml xmlns="http://www.opengis.net/kml/2.2">
      <Document>
        <name>Live Sites - {datetime.datetime.utcnow():%H:%MZ}</name>
        {''.join(make_placemark(s) for s in sites)}
      </Document>
    </kml>'''
    return Response(content=kml,
        media_type='application/vnd.google-earth.kml+xml')

# Distribute this Network Link KML to ATAK users:
# <NetworkLink><Link>
#   <href>https://yourserver/live.kml</href>
#   <refreshMode>onInterval</refreshMode>
#   <refreshInterval>30</refreshInterval>
# </Link></NetworkLink>
6

ATAK Plugin Development β€” Complete Guide

ATAK plugins are the most powerful integration path. A plugin runs inside the ATAK process with full access to the map, CoT events, contacts, missions, video streams, sensors, and camera.

6.1 Development Environment Setup

  • Install Android Studio (latest stable β€” Hedgehog or newer)
  • Install Android SDK API Level 21 (minimum) through 34 (latest)
  • Install Java 11 or 17 (required; Java 8 is insufficient)
  • Register at tak.gov (free) and download the ATAK SDK
  • Clone the ATAK Plugin Template: github.com/deptofdefense/AndroidTacticalAssaultKit-CIV
  • Copy atak-sdk-X.X.X.jar into your project's app/libs/ directory
Groovy β€” app/build.gradle dependencies
android {
    compileSdk 34
    defaultConfig { minSdk 21; targetSdk 34 }
}
dependencies {
    compileOnly files('libs/atak-sdk-4.8.1.jar')
    implementation 'com.squareup.okhttp3:okhttp:4.12.0'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.7.3'
}

6.2 Required Plugin Classes

ClassRole
PluginLifecycle.javaEntry point β€” ATAK calls onStart/onStop; register/unregister components here
PluginMapComponent.javaRegisters plugin with ATAK's component system; creates toolbar button
PluginDropDownReceiver.javaManages the plugin's slide-in UI panel (drop-down)
plugin.xmlPlugin manifest: name, version, minimum ATAK version, widget definition
plugin_layout.xmlAndroid XML layout for the drop-down UI panel

6.3 Map API: Pan, Zoom, Add Markers, Listen for Taps

Java β€” ATAK map API reference
MapView mapView = MapView.getMapView();

// Pan and zoom
mapView.getMapController().panTo(new GeoPoint(38.8977, -77.0365), true);
mapView.getMapController().zoomTo(5000.0, true);

// Add a marker
PlacemarkMapItem marker = new PlacemarkMapItem(
    mapView.getContext(),
    new GeoPoint(38.8977, -77.0365),
    UUID.randomUUID().toString());
marker.setTitle("My API Marker");
marker.setType("a-f-G-U-C");
mapView.getRootGroup().addItem(marker);

// Listen for map taps
mapView.getMapEventDispatcher()
       .addMapEventListener(MapEvent.MAP_CLICK, event -> {
    GeoPoint tap = event.getPoint();
    // tap.getLatitude(), tap.getLongitude()
});

// Iterate all map items (full state access)
mapView.getRootGroup().deepForEachItem(item -> {
    String uid      = item.getUID();
    String type     = item.getType();
    GeoPoint pt     = item.getGeoPointMetaData().get();
    String callsign = item.getMetaString("callsign", "Unknown");
    externalServer.reportPosition(uid, callsign, type,
                                  pt.getLatitude(), pt.getLongitude());
    return true;
});

6.5 Geofence Detection in a Plugin

Java β€” ATAK built-in geofence API
GeoFenceComponent gfc = GeoFenceComponent.getInstance(mapView);

GeoFence fence = new GeoFence(
    "zone-alpha",
    new GeoPoint(38.8977, -77.0365),
    500.0,                       // radius in meters
    GeoFence.Trigger.Entry,      // Entry, Exit, or Both
    GeoFence.MonitoredTypes.All);
gfc.addGeoFence(fence);

gfc.addGeoFenceListener((fenceName, itemUID, trigger) -> {
    if (trigger == GeoFence.Trigger.Entry) {
        notifyExternalServer(itemUID, fenceName, "ENTRY");
        dispatchAlertCot(itemUID, fenceName);
    }
});

6.7 Forwarding Plugin Data to External Server

Java β€” subscribe to CoT events and POST to C2 server
CotEventCache.getInstance().addCotEventListener(cot -> {
    if (cot.getType().startsWith("a-h")) {  // hostile contacts
        new Thread(() -> {
            try {
                JSONObject payload = new JSONObject();
                payload.put("uid",  cot.getUID());
                payload.put("type", cot.getType());
                payload.put("lat",  cot.getPoint().getLat());
                payload.put("lon",  cot.getPoint().getLon());
                new OkHttpClient().newCall(
                    new Request.Builder()
                        .url("https://c2.example.com/contacts")
                        .post(RequestBody.create(payload.toString(),
                              MediaType.get("application/json")))
                        .build()).execute();
            } catch (Exception e) { Log.e("Plugin", "Callback failed", e); }
        }).start();
    }
});

6.8 tak.gov Plugin Submission

Pre-Submission Checklist

  • Plugin installs and runs on ATAK-CIV (civilian version β€” this is the version reviewed)
  • Tested on Android API 21 through 34
  • No hard-coded credentials or private keys
  • All network connections use HTTPS (HTTP only allowed to localhost)
  • APK is signed with a release keystore (debug signing is rejected)
  • README documenting what the plugin does and how to configure it
  • If calling external APIs: document what data is transmitted and to where

Submission Process

  1. Go to tak.gov and log in with a free TAK account
  2. Navigate to: Ecosystem > App Store > Submit App
  3. Fill in plugin name, description, target ATAK version, category, screenshots (minimum 2), Privacy Policy URL
  4. Upload your signed release APK
  5. TAK Product Center review typically takes 2–8 weeks
  6. Upon approval, plugin appears in the tak.gov App Store
7

Integrating Data From External Servers

7.1 Push-Only (Inbound to TAK)

OptionDetails
TAK Server Data FeedConfigure TAK Server to poll your HTTPS endpoint β€” no persistent connection needed
CoT TCP ProducerYour server opens a socket to port 8087/8089 and streams CoT XML
UDP MulticastLAN-only deployments; broadcasts to multicast group, no TAK Server needed
KML Network LinkYour server serves a KML endpoint; ATAK polls on a schedule
FreeTAKServer REST POSTPOST CoT XML to FTS REST API β€” simple HTTP, no socket management
MARTI API File UploadPush pre-built overlay files (KML, GeoPackage) to TAK Server for distribution

7.2 Receive-Only (Outbound from TAK) β€” PostgreSQL Logger Example

Python β€” TAK event subscriber β†’ PostgreSQL audit log
import socket, psycopg2, xml.etree.ElementTree as ET

conn = psycopg2.connect(host='db',dbname='taklog',user='tak',password='secret')
cur  = conn.cursor()
cur.execute('''CREATE TABLE IF NOT EXISTS cot_events (
    id SERIAL PRIMARY KEY, uid TEXT, cot_type TEXT,
    lat DOUBLE PRECISION, lon DOUBLE PRECISION,
    callsign TEXT, received TIMESTAMPTZ DEFAULT NOW())''')
conn.commit()

def parse_and_log(xml_str):
    root = ET.fromstring(xml_str)
    pt   = root.find('point')
    if pt is None: return
    detail   = root.find('detail') or ET.Element('d')
    contact  = detail.find('contact')
    callsign = contact.get('callsign','') if contact else ''
    cur.execute('INSERT INTO cot_events '
                '(uid,cot_type,lat,lon,callsign) VALUES (%s,%s,%s,%s,%s)',
        (root.get('uid'), root.get('type'),
         float(pt.get('lat',0)), float(pt.get('lon',0)), callsign))
    conn.commit()

7.3 Full Bidirectional β€” Enterprise Middleware Hub

REAL-WORLD SCENARIO
A camera management system (CMS) on Server A detects a vehicle entering a restricted zone. Server B (AI/CV) classifies the detection. TAK Server creates a hostile marker. The TAK operator dispatches a unit β€” that dispatch flows back from TAK to the CMS, which swings the nearest PTZ camera to track the unit en route. All three servers communicate simultaneously and bidirectionally, with TAK at the hub.
Python β€” full bidirectional middleware hub (asyncio + WebSocket)
import asyncio, websockets, socket, json, xml.etree.ElementTree as ET

TAK_HOST = 'takserver.local'
TAK_PORT = 8087
WS_PORT  = 8765

tak_out  = socket.create_connection((TAK_HOST, TAK_PORT))
CLIENTS  = {}

async def handle_client(websocket, path):
    hello     = json.loads(await websocket.recv())
    server_id = hello['server_id']
    CLIENTS[server_id] = {'ws': websocket,
                          'caps': hello.get('capabilities', ['send'])}
    async for message in websocket:
        data = json.loads(message)
        if data.get('type') == 'cot':
            tak_out.sendall(data['xml'].encode())

async def tak_listener():
    buf = b''
    with socket.create_connection((TAK_HOST, TAK_PORT)) as listener:
        while True:
            data = listener.recv(4096)
            if not data: break
            buf += data
            while b'</event>' in buf:
                end = buf.index(b'</event>') + 8
                xml_str = buf[:end].decode('utf-8','replace')
                buf     = buf[end:]
                payload = json.dumps({'type':'cot_event','xml':xml_str})
                for srv in list(CLIENTS.values()):
                    if 'receive' in srv['caps']:
                        try: await srv['ws'].send(payload)
                        except: pass

async def main():
    server = await websockets.serve(handle_client, '0.0.0.0', WS_PORT)
    await asyncio.gather(server.wait_closed(), tak_listener())

asyncio.run(main())
8

Video, Sensor, and Hardware Integration

8.1 RTSP Video Feeds

XML β€” video connection file for ATAK Data Package
<?xml version="1.0" encoding="UTF-8"?>
<videoConnections>
  <VideoConnection>
    <uid>cam-gate-north</uid>
    <alias>Gate North β€” PTZ</alias>
    <address>rtsp://192.168.10.50:554/axis-media/media.amp</address>
    <port>554</port>
    <protocol>rtsp</protocol>
    <username>admin</username>
    <password>your_password</password>
    <latitude>38.8977</latitude>
    <longitude>-77.0365</longitude>
  </VideoConnection>
</videoConnections>

8.2 TAK Video Server (TVS) Setup

TVS is an open-source RTSP-to-RTMP bridge. Cameras stream once to TVS, which re-streams to any number of ATAK clients β€” preventing camera overload. Install from github.com/FreeTAKTeam/FreeTAKServer-Video-server. Configure sources in config.py mapping each camera RTSP URL to a TVS stream name. TVS exposes each camera as RTSP (for ATAK) and RTMP (for web players). Configure lat/lon per stream so ATAK shows the camera icon on the map.

8.3 Drone Video Integration

  • Drone outputs RTMP to an onboard or ground station encoder
  • FFmpeg converts to RTSP: ffmpeg -i rtmp://localhost:1935/drone1 -c copy -f rtsp rtsp://tvs-server:8554/drone1
  • TVS re-streams to all ATAK clients
  • A CoT producer tracks drone telemetry (from MAVLink) and places a moving icon on the map
  • ATAK operators see the drone moving on the map and tap it to open the live video

8.4 Hardware Device Integration

8.4.1 External GPS Receivers (gpsd β†’ CoT)

Python β€” gpsd NMEA reader β†’ CoT bridge
import gps as gpslib, socket, datetime

tak     = socket.create_connection(('takserver', 8087))
session = gpslib.gps(mode=gpslib.WATCH_ENABLE | gpslib.WATCH_NEWSTYLE)

for report in session:
    if report.get('class') == 'TPV':
        lat = getattr(report, 'lat', None)
        lon = getattr(report, 'lon', None)
        if lat and lon:
            now   = datetime.datetime.utcnow()
            stale = now + datetime.timedelta(minutes=2)
            fmt   = '%Y-%m-%dT%H:%M:%S.000Z'
            cot = f'''<?xml version="1.0"?>
            <event version="2.0" uid="gps-001"
                   type="a-f-G-U-C" how="m-g"
                   time="{now.strftime(fmt)}"
                   start="{now.strftime(fmt)}"
                   stale="{stale.strftime(fmt)}">
              <point lat="{lat}" lon="{lon}"
                     hae="{getattr(report,'alt',0)}"
                     ce="3" le="5"/>
              <detail>
                <contact callsign="GPS-UNIT-1"/>
                <track speed="{getattr(report,'speed',0)}"
                       course="{getattr(report,'track',0)}"/>
              </detail>
            </event>'''
            tak.sendall(cot.encode())

8.4.2 AIS Marine Vessel Tracking (pyais β†’ CoT)

Python β€” pyais serial reader β†’ CoT bridge
from pyais.stream import SerialReaderNAIS
import socket, datetime

tak = socket.create_connection(('takserver', 8087))
AIS_TYPE_MAP = {'Tanker':'a-n-S-S-G','Cargo':'a-n-S-S-C','HSC':'a-f-S-S'}

for msg in SerialReaderNAIS('/dev/ttyUSB1'):
    try:
        d = msg.decode()
        if not hasattr(d,'lat') or not d.lat: continue
        cot_type = AIS_TYPE_MAP.get(str(d.shiptype),'a-u-S-S')
        name     = getattr(d,'shipname',f'MMSI-{d.mmsi}').strip()
        now   = datetime.datetime.utcnow()
        stale = now + datetime.timedelta(minutes=5)
        fmt   = '%Y-%m-%dT%H:%M:%S.000Z'
        cot   = f'''<?xml version="1.0"?>
        <event version="2.0" uid="ais-{d.mmsi}" type="{cot_type}"
               how="m-g" time="{now.strftime(fmt)}"
               start="{now.strftime(fmt)}" stale="{stale.strftime(fmt)}">
          <point lat="{d.lat}" lon="{d.lon}" hae="0" ce="50" le="50"/>
          <detail>
            <contact callsign="{name}"/>
            <remarks>MMSI:{d.mmsi}</remarks>
          </detail>
        </event>'''
        tak.sendall(cot.encode())
    except Exception as e: print(f'AIS error: {e}')

8.4.3 APRS Amateur Radio β†’ CoT

Python β€” APRS-IS stream β†’ CoT bridge
import socket, aprslib, datetime

tak = socket.create_connection(('takserver', 8087))

def aprs_to_cot(packet):
    if 'latitude' not in packet or 'longitude' not in packet: return
    now   = datetime.datetime.utcnow()
    stale = now + datetime.timedelta(minutes=10)
    fmt   = '%Y-%m-%dT%H:%M:%S.000Z'
    cot = f'''<?xml version="1.0"?>
    <event version="2.0" uid="aprs-{packet['from']}" type="a-f-G"
           how="m-g" time="{now.strftime(fmt)}"
           start="{now.strftime(fmt)}" stale="{stale.strftime(fmt)}">
      <point lat="{packet['latitude']}" lon="{packet['longitude']}"
             hae="0" ce="100" le="100"/>
      <detail>
        <contact callsign="{packet['from']}"/>
        <remarks>{packet.get('comment','APRS')}</remarks>
      </detail>
    </event>'''
    tak.sendall(cot.encode())

AIS = aprslib.IS('N0CALL', passwd='-1',
                  host='rotate.aprs2.net', port=14580)
AIS.set_filter('r/38.8977/-77.0365/50')
AIS.connect()
AIS.consumer(aprs_to_cot, raw=False, immortal=True)

8.4.4 FLIR / Thermal Camera Integration

FLIR cameras output RTSP or ONVIF-compatible streams. Connect identically to standard IP cameras (see Section 8.1). For analytics, subscribe to the FLIR alert API or ONVIF event service and dispatch CoT type b-m-p-s-m (sensor marker) at the camera's fixed location when a thermal anomaly is detected. Include temperature data and confidence in the CoT remarks field.

9

TAK Server MARTI API and Data Packages

9.1 Complete MARTI API Reference

EndpointMethod & Purpose
/Marti/api/clientsGET β€” list all connected TAK clients with last-known position
/Marti/api/cot/xmlGET β€” retrieve latest CoT for all known entities as XML
/Marti/api/cot/xml/{uid}GET β€” latest CoT for a specific UID
/Marti/api/cot/xmlPOST β€” inject a CoT event without a persistent socket
/Marti/sync/uploadPOST β€” upload a file (KML, GeoPackage, ZIP, image)
/Marti/sync/searchGET?keywords=X β€” search uploaded files by keyword
/Marti/sync/content?hash=XGET β€” download a specific file by SHA-256 hash
/Marti/missions/{name}POST/GET β€” create or retrieve a named mission
/Marti/missions/{name}/contentsPUT β€” add a file or CoT to a mission
/Marti/missions/{name}/subscriptionPUT β€” subscribe a client callsign to a mission
/Marti/groupsGET β€” list all configured TAK groups
/Marti/api/contacts/allGET β€” all known contacts and their attributes
/Marti/RepeaterBroadcastPOST β€” send a file to all connected clients immediately

9.2 Programmatic Data Package Creation

Python β€” build and upload a Data Package to TAK Server
import zipfile, io, hashlib, requests

SERVER = 'https://takserver:8443'
CERT   = ('client.pem', 'client.key')
CA     = 'ta-ca.pem'

def build_data_package(name, files):
    """files: {zip_entry_path: bytes_or_string}"""
    buf      = io.BytesIO()
    contents = []
    with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
        for zip_path, content in files.items():
            if isinstance(content, str): content = content.encode('utf-8')
            zf.writestr(zip_path, content)
            contents.append(
                f'<Content ignore="false" zipEntry="{zip_path}"/>')
        uid      = f'pkg-{hashlib.md5(name.encode()).hexdigest()[:8]}'
        manifest = f'''<?xml version="1.0" encoding="UTF-8"?>
        <MissionPackageManifest version="2">
          <Configuration>
            <Parameter name="uid" value="{uid}"/>
            <Parameter name="name" value="{name}"/>
          </Configuration>
          <Contents>{"".join(contents)}</Contents>
        </MissionPackageManifest>'''
        zf.writestr('MANIFEST/manifest.xml', manifest)
    return buf.getvalue()

def push_data_package(name, pkg_bytes):
    resp = requests.post(f'{SERVER}/Marti/sync/upload',
        files={'assetfile': (f'{name}.zip', pkg_bytes,
                             'application/x-zip-compressed')},
        params={'name': name, 'keywords': name},
        cert=CERT, verify=CA)
    return resp.json().get('Hash')
10

Map Tiles, Offline Imagery, and GIS Integration

10.1 GeoPackage β€” ATAK Preferred Offline Format

Shell β€” GDAL: GeoTIFF to GeoPackage tile pyramid
# Convert GeoTIFF to GeoPackage
gdal_translate -of GPKG \
    -co TILING_SCHEME=GoogleMapsCompatible \
    -co ZOOM_LEVEL_STRATEGY=UPPER \
    input_imagery.tif output_map.gpkg

# Add vector features to the same GeoPackage
ogr2ogr -f GPKG -append output_map.gpkg input_features.shp

# Push to TAK Server for device download
import requests
requests.post('https://takserver:8443/Marti/sync/upload',
    files={'assetfile':open('output_map.gpkg','rb')},
    cert=('client.pem','client.key'), verify='ta-ca.pem')

10.2 MBTiles

Shell β€” gdal2mbtiles conversion
pip install gdal2mbtiles
gdal2mbtiles input.tif output.mbtiles \
    --min-resolution=8 --max-resolution=17 \
    --resampling=lanczos

10.3 WMS / WMTS Live Tile Server

ATAK connects to OGC WMS/WMTS endpoints for live basemaps. Go to Settings > Maps > Manage Map Sources > + Add and enter the GetCapabilities URL.

TIP
Martin (github.com/maplibre/martin) serves MBTiles and PostGIS vector tiles at very high performance with minimal setup and is the recommended self-hosted option.

10.4 ArcGIS Feature Service β†’ CoT Bridge

Python β€” ArcGIS Feature Service β†’ CoT polling bridge
import requests, socket, datetime, time

AGOL_URL = ('https://services.arcgis.com/YOUR_ORG/arcgis/rest'
            '/services/ActiveIncidents/FeatureServer/0/query')
TOKEN = 'YOUR_ARCGIS_TOKEN'
tak   = socket.create_connection(('takserver', 8087))

def fetch_features():
    return requests.get(AGOL_URL, params={
        'f': 'geojson', 'where': "Status='Active'",
        'outFields': 'IncidentName,Type,Severity,OBJECTID',
        'returnGeometry': 'true', 'token': TOKEN
    }).json()['features']

def feature_to_cot(feat):
    props = feat['properties']
    geom  = feat['geometry']
    lon, lat = geom['coordinates']
    cot_type = 'a-h-G' if props.get('Severity') == 'High' else 'a-u-G'
    now   = datetime.datetime.utcnow()
    stale = now + datetime.timedelta(hours=1)
    fmt   = '%Y-%m-%dT%H:%M:%S.000Z'
    return f'''<?xml version="1.0"?>
    <event version="2.0" uid="esri-{props['OBJECTID']}"
           type="{cot_type}" how="m-g"
           time="{now.strftime(fmt)}" start="{now.strftime(fmt)}"
           stale="{stale.strftime(fmt)}">
      <point lat="{lat:.6f}" lon="{lon:.6f}" hae="0" ce="100" le="100"/>
      <detail><contact callsign="{props['IncidentName']}"/></detail>
    </event>'''

while True:
    for feat in fetch_features():
        tak.sendall(feature_to_cot(feat).encode())
    time.sleep(60)

10.5 PostGIS LISTEN/NOTIFY β†’ CoT (Sub-Second Updates)

Python β€” PostgreSQL LISTEN/NOTIFY β†’ CoT bridge
import psycopg2, select, json, socket, datetime

db = psycopg2.connect('postgresql://gisuser:pass@pgserver/gisdb')
db.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = db.cursor()
cur.execute('LISTEN asset_changed;')
tak = socket.create_connection(('takserver', 8087))

while True:
    if select.select([db], [], [], 5) == ([], [], []): continue
    db.poll()
    while db.notifies:
        row   = json.loads(db.notifies.pop(0).payload)
        now   = datetime.datetime.utcnow()
        stale = now + datetime.timedelta(minutes=5)
        fmt   = '%Y-%m-%dT%H:%M:%S.000Z'
        cot = f'''<?xml version="1.0"?>
        <event version="2.0" uid="asset-{row['id']}"
               type="a-f-G-E-V" how="m-g"
               time="{now.strftime(fmt)}" start="{now.strftime(fmt)}"
               stale="{stale.strftime(fmt)}">
          <point lat="{row['lat']:.6f}" lon="{row['lon']:.6f}"
                 hae="0" ce="5" le="5"/>
          <detail><contact callsign="{row['name']}"/></detail>
        </event>'''
        tak.sendall(cot.encode())

# Requires PostgreSQL trigger:
# AFTER INSERT OR UPDATE ON assets
# EXECUTE FUNCTION notify_asset_change();
11

Additional Integration Methods

11.1 MQTT β€” IoT Sensor Networks

Python β€” MQTT bidirectional bridge (IoT ↔ TAK)
import paho.mqtt.client as mqtt
import socket, json, datetime

tak = socket.create_connection(('takserver', 8087))

def on_connect(client, ud, flags, rc):
    client.subscribe('sensors/+/position')
    client.subscribe('alerts/+/trigger')

def on_message(client, ud, msg):
    payload = json.loads(msg.payload.decode())
    if '/position' in msg.topic:
        sensor_id = msg.topic.split('/')[1]
        cot = sensor_to_cot(sensor_id, payload)
        if cot: tak.sendall(cot.encode())
    elif '/trigger' in msg.topic:
        tak.sendall(build_alert_cot(payload).encode())

c = mqtt.Client()
c.on_connect = on_connect
c.on_message = on_message
c.connect('mqtt-broker.example.com', 1883)
c.loop_forever()

11.2 Node-RED Visual Integration

Node-RED provides a no-code path to TAK integration. With community TAK nodes, drag-and-drop flows connect any API, IoT device, or data source to TAK without writing socket code. Install: sudo npm install -g --unsafe-perm node-red then npm install node-red-contrib-tak.

11.3 WinTAK Plugin Development (C#/.NET)

C# β€” WinTAK plugin entry point
[PluginAttribute(Name="My Integration Plugin", Version="1.0.0")]
public class MyPlugin : AbstractPlugin {
    private Timer _pollTimer;

    public override void Initialize() {
        EventBus.Subscribe<CoTEvent>(OnCoTReceived);
        _pollTimer = new Timer(FetchAndDispatch, null,
                               TimeSpan.Zero,
                               TimeSpan.FromSeconds(30));
    }

    private async void FetchAndDispatch(object state) {
        var resp   = await new HttpClient()
                         .GetStringAsync("https://api.example.com/assets");
        var assets = JsonConvert.DeserializeObject<List<Asset>>(resp);
        foreach (var a in assets)
            EventBus.Publish(new SendCoTEvent { CoT = BuildCoT(a) });
    }

    private void OnCoTReceived(CoTEvent evt) {
        ExternalServer.NotifyAsync(evt.UID, evt.Lat, evt.Lon);
    }
}

11.4 CAD / Dispatch System Integration

Inbound: CAD β†’ TAK

Python β€” CAD webhook receiver β†’ CoT dispatch
from flask import Flask, request
import socket, datetime

app = Flask(__name__)
tak = socket.create_connection(('takserver', 8087))

@app.route('/cad/webhook', methods=['POST'])
def cad_incident():
    inc  = request.json
    pri  = inc.get('Priority', 3)
    now  = datetime.datetime.utcnow()
    stale = now + datetime.timedelta(hours=4)
    fmt  = '%Y-%m-%dT%H:%M:%S.000Z'
    cot  = f'''<?xml version="1.0"?>
    <event version="2.0" uid="cad-{inc['IncidentNumber']}"
           type="{'a-h-G' if pri == 1 else 'a-u-G'}" how="h-e"
           time="{now.strftime(fmt)}" start="{now.strftime(fmt)}"
           stale="{stale.strftime(fmt)}">
      <point lat="{inc['Latitude']}" lon="{inc['Longitude']}"
             hae="0" ce="30" le="30"/>
      <detail>
        <contact callsign="CAD-{inc['IncidentNumber']}"/>
        <remarks>P{pri} {inc['IncidentType']}</remarks>
      </detail>
    </event>'''
    tak.sendall(cot.encode())
    return {'status': 'ok'}, 200

Outbound: TAK β†’ CAD

Python β€” TAK unit status β†’ CAD REST API update
import requests, xml.etree.ElementTree as ET

CAD_API = 'https://cad.example.com/api/v2'
CAD_KEY = 'YOUR_CAD_KEY'

def handle_unit_cot(xml_str):
    root   = ET.fromstring(xml_str)
    uid    = root.get('uid')
    detail = root.find('detail') or ET.Element('detail')
    status = detail.find('status')
    if status is not None:
        val = status.get('value', '')
        if val in ('arrived', 'available', 'en_route', 'busy'):
            requests.post(f'{CAD_API}/units/{uid}/status',
                json={'status': val, 'timestamp': root.get('time')},
                headers={'X-Api-Key': CAD_KEY})

11.5 TAK GeoChat (Programmatic Messages)

Python β€” send automated GeoChat message (b-t-f CoT type)
import socket, datetime

tak = socket.create_connection(('takserver', 8087))

def send_geochat(sender_uid, callsign, message, group='__ANON__'):
    now    = datetime.datetime.utcnow()
    stale  = now + datetime.timedelta(hours=1)
    fmt    = '%Y-%m-%dT%H:%M:%S.000Z'
    msg_id = f'GeoChat.{sender_uid}.{int(now.timestamp())}'
    cot = f'''<?xml version="1.0"?>
    <event version="2.0" uid="{msg_id}" type="b-t-f" how="h-g-i"
           time="{now.strftime(fmt)}" start="{now.strftime(fmt)}"
           stale="{stale.strftime(fmt)}">
      <point lat="0" lon="0" hae="0" ce="9999999" le="9999999"/>
      <detail>
        <__chat parent="RootContactGroup"
                chatroom="{group}" id="{group}">
          <chatgrp uid0="{sender_uid}" id="{group}"/>
        </__chat>
        <remarks>{message}</remarks>
      </detail>
    </event>'''
    tak.sendall(cot.encode())

send_geochat('system-bot', 'AI-ALERT',
             'Unknown vehicle detected at Gate North.')

11.7 Webhook Relay β€” TAK as Event Source

Python β€” TAK events β†’ Slack / PagerDuty webhooks
import requests

WEBHOOKS = {
    'hostile_detected': [
        'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK',
        'https://events.pagerduty.com/v2/enqueue',
    ],
}

def fire_webhooks(event_type, payload):
    for url in WEBHOOKS.get(event_type, []):
        try:
            if 'slack.com' in url:
                requests.post(url, json={
                    'text': f'*TAK ALERT*: {event_type}\n{payload}'})
            elif 'pagerduty' in url:
                requests.post(url, json={
                    'routing_key': 'YOUR_PD_KEY',
                    'event_action': 'trigger',
                    'payload': {'summary': payload,
                                'severity': 'critical'}})
        except Exception as e: print(f'Webhook failed: {e}')

11.8 pyTAK β€” Recommended High-Level Python Library

Python β€” pyTAK async CoT producer (recommended starting point)
pip install pytak

import asyncio, pytak
from configparser import ConfigParser

class MyWorker(pytak.QueueWorker):
    async def handle_data(self, data):
        event = pytak.cot_event(
            uid=data['uid'], lat=data['lat'],
            lon=data['lon'], cot_type='a-f-G-E-V',
            callsign=data['name'])
        await self.put_queue(event)

    async def run(self):
        while True:
            for asset in await fetch_assets():
                await self.handle_data(asset)
            await asyncio.sleep(15)

async def main():
    config = ConfigParser()
    config['pytak'] = {'COT_URL': 'tcp://takserver:8087'}
    clitool = pytak.CLITool(config)
    await clitool.setup()
    clitool.add_tasks({MyWorker(clitool.tx_queue, config)})
    await clitool.run()

asyncio.run(main())
12

Security Architecture and Best Practices

ZERO TRUST
Treat every integration point as potentially compromised. Each service gets its own certificate. No shared credentials between services. A compromised middleware service should not allow lateral movement to other TAK clients.

12.1 Certificate Management

  • TAK Server is its own CA. Every connecting client must use a certificate issued by this CA.
  • In TAK Server admin: Certificate Enrollment > Create a client certificate for each integration
  • For Python: openssl pkcs12 -in client.p12 -out client.pem -nodes
  • For Java/Android: load the .p12 directly into a KeyStore
  • Revoke certificates immediately when a service is decommissioned
  • Never share certificates between services β€” one per integration

12.2 API Key Management

EnvironmentRecommended Approach
DevelopmentEnvironment variables (.env file, never committed to git)
Docker/ContainerDocker secrets or environment injection from CI/CD
Android ATAK PluginAndroid Keystore β€” keys never leave the secure hardware enclave
Production ServerHashiCorp Vault, AWS Secrets Manager, or Azure Key Vault
Field DeploymentEncrypted config file with key derived from deployment passphrase

12.4 CoT Input Validation

Python β€” sanitize all external input before CoT construction
import re, html

def validate_cot_fields(uid, lat, lon, callsign, remarks):
    if not re.match(r'^[a-zA-Z0-9._-]{1,255}$', uid):
        raise ValueError(f'Invalid UID: {uid}')
    if not (-90 <= float(lat) <= 90):
        raise ValueError(f'Latitude out of range: {lat}')
    if not (-180 <= float(lon) <= 180):
        raise ValueError(f'Longitude out of range: {lon}')
    callsign = html.escape(str(callsign))[:64]
    remarks  = html.escape(str(remarks))[:500]
    return uid, float(lat), float(lon), callsign, remarks

12.5 Port Reference

PortPurposeSecurity Requirement
8087 TCPCoT cleartext inboundDevelopment/LAN only β€” disable in production
8089 TCP (TLS)CoT TLS inboundREQUIRED for production; use mTLS with client certs
8443 TCP (HTTPS)MARTI REST API and admin consoleRestrict access to admin IPs only
9000 TCP (TLS)TAK Server federationBidirectional server-to-server; requires cert exchange
6969 UDPCoT multicastLAN/field only; not routable; no authentication
554 TCPRTSP videoUse VPN or point-to-point; plaintext by default
1935 TCPRTMP videoUse RTMPS (443) or VPN in production
8554 TCPTAK Video Server RTSP relayInternal only; never expose externally
1883 TCPMQTT plaintextDevelopment only; use TLS on port 8883
8883 TCP (TLS)MQTT over TLSRequired for production IoT integrations
11434 TCPOllama local LLM APIInternal only; never expose externally
13

Enterprise Patterns and Reference Architectures

13.1 TAK Integration Hub Pattern

Python β€” integration hub with pluggable adapters
import socket, threading, time, logging

class TakIntegrationHub:
    def __init__(self, tak_host, tak_port):
        self.tak_addr     = (tak_host, tak_port)
        self.tak_conn     = socket.create_connection(self.tak_addr)
        self.integrations = []
        self._start_listener()

    def register(self, integration):
        """Integration must implement get_cot_events() and on_tak_event()"""
        self.integrations.append(integration)
        threading.Thread(target=self._run_integration,
                         args=(integration,), daemon=True).start()

    def _run_integration(self, integration):
        while True:
            try:
                for cot_xml in integration.get_cot_events():
                    self.tak_conn.sendall(cot_xml.encode())
            except Exception as e:
                logging.error(f'{integration}: {e}')
                time.sleep(5)

    def _start_listener(self):
        def listen():
            with socket.create_connection(self.tak_addr) as s:
                buf = b''
                while True:
                    data = s.recv(4096)
                    if not data: break
                    buf += data
                    while b'</event>' in buf:
                        end = buf.index(b'</event>') + 8
                        xml = buf[:end].decode('utf-8','replace')
                        buf = buf[end:]
                        for integ in self.integrations:
                            try: integ.on_tak_event(xml)
                            except: pass
        threading.Thread(target=listen, daemon=True).start()

hub = TakIntegrationHub('takserver', 8087)
hub.register(FleetApiIntegration())
hub.register(CameraSystemIntegration())
hub.register(WeatherFeedIntegration())

13.2 High Availability Design

  • Run two or more middleware instances with a load balancer in front
  • Use a message queue (RabbitMQ, Redis Streams) between data fetchers and CoT dispatchers β€” prevents data loss during TAK Server restarts
  • Implement dead-letter queues for CoT events that fail after N retries
  • Health check endpoints (/health returning 200 OK) for monitoring via Prometheus, Datadog, or Nagios
  • Auto-restart via systemd service unit or Docker restart: always

13.3 Multi-Agency Federation Options

ApproachBest For
TAK Server FederationBidirectional sync; each server controls which data it shares via group policies
Federated Data FeedOne agency serves a CoT endpoint that partner servers poll β€” unidirectional, simpler security
Shared MARTI MissionAll agencies subscribe to a named mission on a neutral server β€” good for exercises
KMZ Network Link ExchangeEach agency publishes a KML endpoint; partners subscribe β€” lowest bar for interoperability
Message Bus FederationEach agency publishes to a shared Kafka/MQTT topic; others subscribe to relevant topics

13.4 Containerized TAK Stack (Docker Compose)

YAML β€” docker-compose.yml: complete TAK integration stack
version: '3.8'
services:
  freetek:
    image: freetakteam/freetakserver:latest
    ports:
      - "8087:8087"     # CoT TCP
      - "19023:19023"   # REST API
    volumes: [freetek-data:/opt/FreeTAKServer]

  mqtt:
    image: eclipse-mosquitto:latest
    ports: ["1883:1883"]
    volumes: ["./mosquitto.conf:/mosquitto/config/mosquitto.conf"]

  ollama:
    image: ollama/ollama:latest
    ports: ["11434:11434"]
    volumes: [ollama-models:/root/.ollama]

  tile-server:
    image: ghcr.io/maplibre/martin:latest
    ports: ["3000:3000"]
    environment:
      - DATABASE_URL=postgresql://gis:pass@postgis/gisdb

  postgis:
    image: postgis/postgis:15-3.4
    environment:
      POSTGRES_DB: gisdb
      POSTGRES_USER: gis
      POSTGRES_PASSWORD: pass
    volumes: [postgis-data:/var/lib/postgresql/data]

  middleware:
    build: ./middleware
    environment:
      - TAK_HOST=freetek
      - TAK_PORT=8087
      - MQTT_HOST=mqtt
      - OLLAMA_URL=http://ollama:11434
    depends_on: [freetek, mqtt, ollama]

volumes:
  freetek-data:
  ollama-models:
  postgis-data:
14

Quick Reference and Resource Library

14.2 Essential Libraries and Tools

Tool / LibraryLanguagePurposeSource
pyTAKPythonHigh-level async CoT producer/consumer β€” recommended starting pointgithub.com/snstac/pytak
ATAK SDKJava/AndroidOfficial API for ATAK plugin developmenttak.gov/products
WinTAK SDKC#/.NETOfficial API for WinTAK plugin developmenttak.gov/products
FreeTAKServerPythonOpen-source TAK Server with REST API; ideal for dev/testgithub.com/FreeTAKTeam
TAK Video ServerPythonRTSP/RTMP relay for TAK videogithub.com/FreeTAKTeam/TVS
pyaisPythonAIS maritime vessel data parserpip install pyais
paho-mqttPythonMQTT client for IoT-to-CoT bridgespip install paho-mqtt
aprslibPythonAPRS amateur radio position data parserpip install aprslib
shapelyPythonGeometric operations for geofence detectionpip install shapely
GDAL/OGRCLI/PythonSwiss-army knife for geospatial format conversiongdal.org
QGISDesktop GISCreate KML, GeoPackage, MBTiles overlaysqgis.org
MartinRustHigh-performance MBTiles/PostGIS tile servergithub.com/maplibre/martin
OllamaLocal AIRun open-source LLMs locally for air-gapped environmentsollama.ai
ultralytics (YOLO)PythonComputer vision model framework for camera analyticspip install ultralytics
FFmpegCLIVideo stream transcoding and RTSP bridgingffmpeg.org
node-red-contrib-takNode-REDNo-code TAK integration flowsnpm install

14.3 CoT Type Quick Reference

CoT TypeMeaningUse Case
a-f-G-U-CFriendly Ground Unit CombatPersonnel, soldiers, law enforcement
a-f-G-E-VFriendly Ground Equipment VehicleVehicles, trucks, equipment
a-f-A-C-FFriendly Fixed-Wing AircraftPlanes, UAVs
a-f-A-M-HFriendly Rotary-WingHelicopters, quadrotors
a-f-S-SFriendly Sea SurfaceShips, patrol boats
a-h-GHostile GroundDetected threats, AI CV detections
a-u-GUnknown GroundUnidentified entities, camera detections
a-n-GNeutral GroundCivilian, non-threatening
b-m-p-s-mSensor MarkerIoT sensor placement, weather station
b-t-fGeoChat MessageTeam text communication
u-d-rDrawing RectangleZone boundaries, geofences
u-d-c-cDrawing CircleRadius rings, circular geofences
a-u-G-E-WWeather EventStorm cells, fire positions
t-b-aTasking Broadcast AlertEmergency alerts to all units

14.4 Key Resources