长兴县聚强科技

教你使用zabbix api批量添加数百台监控主机的方法

2026-03-26 21:21:01 浏览次数:0
详细信息

一、准备工作

1. 获取API认证token

import json
import requests

# Zabbix server connection settings.
# The literals are tutorial placeholders. Each one can be overridden through an
# environment variable so that real credentials do not have to be written into
# the script; when the variable is unset the original placeholder is used.
import os

zabbix_url = os.environ.get("ZABBIX_URL", "http://your-zabbix-server/zabbix/api_jsonrpc.php")
username = os.environ.get("ZABBIX_USER", "Admin")
password = os.environ.get("ZABBIX_PASSWORD", "zabbix")

# 获取认证token
def get_auth_token():
    """Log in to the Zabbix API and return the authentication token.

    Sends a ``user.login`` JSON-RPC request to the module-level ``zabbix_url``
    using the module-level ``username`` and ``password``.

    Returns:
        The ``result`` field of the login response.

    Raises:
        RuntimeError: If the response contains no ``result`` field, e.g.
            because the API answered with an ``error`` object.
        requests.exceptions.RequestException: If the HTTP request fails or
            times out.
    """
    headers = {"Content-Type": "application/json-rpc"}

    data = {
        "jsonrpc": "2.0",
        "method": "user.login",
        "params": {
            "username": username,
            "password": password
        },
        "id": 1,
        "auth": None
    }

    # Without a timeout an unreachable server would block the script forever.
    response = requests.post(zabbix_url, headers=headers, data=json.dumps(data), timeout=30)
    result = response.json()

    if "result" not in result:
        # Report the API's own error instead of failing with a bare KeyError.
        raise RuntimeError(f"登录失败: {result.get('error', {})}")
    return result["result"]

二、批量添加主机的方法

1. 基础添加方法

def create_hosts_batch(hosts_list, auth_token, group_ids, template_ids):
    """Create hosts through ``host.create`` in batches of 50.

    Every host gets one agent interface on port 10050 and is attached to all
    of ``group_ids`` and ``template_ids``. A batch that fails is reported on
    stdout and skipped; the remaining batches are still attempted.

    Args:
        hosts_list: Dicts with the required keys ``hostname`` and ``ip`` and
            the optional keys ``inventory``, ``tags`` and ``macros``.
        auth_token: Token sent in the ``auth`` field of each request.
        group_ids: Host group ids applied to every host.
        template_ids: Template ids applied to every host.

    Returns:
        List of the host ids reported by the API for the created hosts.
    """
    headers = {"Content-Type": "application/json-rpc"}

    # Send at most 50 hosts per request so a single payload stays small.
    batch_size = 50
    created_hosts = []

    # These are identical for every host, so build them once.
    groups = [{"groupid": gid} for gid in group_ids]
    templates = [{"templateid": tid} for tid in template_ids]

    for i in range(0, len(hosts_list), batch_size):
        batch = hosts_list[i:i+batch_size]

        host_data = [
            {
                "host": host["hostname"],
                "interfaces": [
                    {
                        "type": 1,  # agent interface
                        "main": 1,
                        "useip": 1,
                        "ip": host["ip"],
                        "dns": "",
                        "port": "10050"
                    }
                ],
                "groups": groups,
                "templates": templates,
                "inventory_mode": 0,
                "inventory": host.get("inventory", {}),
                "tags": host.get("tags", []),
                "macros": host.get("macros", [])
            }
            for host in batch
        ]

        data = {
            "jsonrpc": "2.0",
            "method": "host.create",
            "params": host_data,
            "auth": auth_token,
            "id": i+1
        }

        try:
            # Compact JSON (no indent) keeps the request small, and the
            # timeout prevents one stuck request from stalling the whole run.
            response = requests.post(
                zabbix_url, headers=headers, data=json.dumps(data), timeout=60
            )
            result = response.json()

            if "result" in result:
                host_ids = result["result"]["hostids"]
                created_hosts.extend(host_ids)
                print(f"成功创建 {len(host_ids)} 台主机")
            else:
                print(f"批量创建出错: {result.get('error', {})}")
        except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as e:
            # Transport failure, non-JSON body, or an unexpected response
            # shape: report it and continue with the next batch.
            print(f"请求失败: {e}")

    return created_hosts

2. 从CSV文件批量导入

import csv

def import_hosts_from_csv(csv_file, auth_token):
    """Read host definitions from a CSV file.

    The file needs a header row with at least the ``hostname`` and ``ip``
    columns. ``groupid``, ``templateid``, ``location``, ``site`` and ``tag``
    are optional; a missing column or an empty cell yields an empty list
    (groups/templates) or an empty string (inventory fields).

    Args:
        csv_file: Path of the CSV file, UTF-8 encoded (a leading BOM is
            accepted).
        auth_token: Unused; kept so existing callers keep working.

    Returns:
        A list of dicts with the keys ``hostname``, ``ip``, ``groups``,
        ``templates`` and ``inventory``.

    Raises:
        KeyError: If the ``hostname`` or ``ip`` column is missing.
    """
    hosts = []

    # utf-8-sig strips the BOM that spreadsheet exports often prepend (it
    # would otherwise become part of the first header name); newline="" is
    # what the csv module requires for correct handling of quoted newlines.
    with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            # DictReader always provides every header key, so test the cell
            # value rather than key membership; short rows yield None.
            group_id = row.get("groupid")
            template_id = row.get("templateid")

            hosts.append({
                "hostname": row["hostname"],
                "ip": row["ip"],
                "groups": [{"groupid": group_id}] if group_id else [],
                "templates": [{"templateid": template_id}] if template_id else [],
                "inventory": {
                    "location": row.get("location") or "",
                    "site": row.get("site") or "",
                    "tag": row.get("tag") or ""
                }
            })

    return hosts

3. 完整的批量添加脚本

#!/usr/bin/env python3
"""
Zabbix批量添加主机脚本
"""

import json
import requests
import csv
import time
from typing import List, Dict

class ZabbixBatchManager:
    """Minimal Zabbix JSON-RPC client for creating hosts in bulk.

    Logs in when constructed and stores the token returned by ``user.login``
    in ``auth_token``; the other API methods send that token in the ``auth``
    field of each request.
    """

    def __init__(self, url, username, password):
        """Remember the API endpoint ``url`` and log in immediately.

        Raises:
            Exception: If the login is rejected (see ``login``).
        """
        self.url = url
        self.auth_token = self.login(username, password)

    def _call(self, method, params, request_id, auth=None, timeout=30):
        """POST one JSON-RPC request and return the decoded response body.

        Every request carries a timeout so that an unresponsive server cannot
        block the script indefinitely.
        """
        data = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "auth": auth,
            "id": request_id
        }
        response = requests.post(self.url, json=data, timeout=timeout)
        return response.json()

    def login(self, username, password):
        """Log in to Zabbix and return the authentication token.

        Raises:
            Exception: If the response contains no ``result`` field.
        """
        result = self._call(
            "user.login",
            {"username": username, "password": password},
            1,
        )

        if "result" in result:
            print("登录成功")
            return result["result"]
        raise Exception(f"登录失败: {result.get('error', {})}")

    def get_group_id(self, group_name):
        """Return the id of the host group named ``group_name``, or None."""
        result = self._call(
            "hostgroup.get",
            {"output": ["groupid"], "filter": {"name": group_name}},
            2,
            auth=self.auth_token,
        )

        if result.get("result"):
            return result["result"][0]["groupid"]
        return None

    def get_template_id(self, template_name):
        """Return the id of the template whose ``host`` field equals
        ``template_name``, or None if no template matches."""
        result = self._call(
            "template.get",
            {"output": ["templateid"], "filter": {"host": template_name}},
            3,
            auth=self.auth_token,
        )

        if result.get("result"):
            return result["result"][0]["templateid"]
        return None

    @staticmethod
    def _build_host_config(host):
        """Translate one host description dict into a ``host.create`` object."""
        host_config = {
            "host": host["hostname"],
            "interfaces": [
                {
                    "type": 1,  # agent interface
                    "main": 1,
                    "useip": 1,
                    "ip": host["ip"],
                    "dns": "",
                    "port": host.get("port", "10050")
                }
            ],
            "groups": [{"groupid": gid} for gid in host.get("group_ids", [])],
            "templates": [{"templateid": tid} for tid in host.get("template_ids", [])],
            "macros": host.get("macros", []),
            "tags": host.get("tags", []),
            "description": host.get("description", "")
        }

        # Optional SNMP interface.
        if host.get("snmp_ip"):
            host_config["interfaces"].append({
                "type": 2,  # SNMP interface
                # This is the only SNMP interface on the host, and Zabbix
                # requires one default interface per interface type, so it
                # has to be flagged as the default.
                "main": 1,
                "useip": 1,
                "ip": host["snmp_ip"],
                "dns": "",
                "port": host.get("snmp_port", "161"),
                "details": {
                    "version": host.get("snmp_version", 2),
                    "community": host.get("snmp_community", "{$SNMP_COMMUNITY}")
                }
            })

        return host_config

    def add_hosts_batch(self, hosts_data: List[Dict], batch_size=100):
        """Create hosts through ``host.create``, ``batch_size`` at a time.

        A batch that is rejected by the API or fails in transit is reported
        on stdout and added to the failed list; later batches still run.

        Args:
            hosts_data: Host description dicts. ``hostname`` and ``ip`` are
                required; ``port``, ``group_ids``, ``template_ids``,
                ``macros``, ``tags``, ``description`` and the ``snmp_*`` keys
                are optional.
            batch_size: Number of hosts per request; must be at least 1.

        Returns:
            A tuple ``(added_hosts, failed_hosts)``: the host ids reported by
            the API, and the input dicts of every failed batch.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            # A negative step would make range() empty and silently skip
            # every host; zero would fail inside range() with an unclear error.
            raise ValueError(f"batch_size 必须大于等于 1: {batch_size!r}")

        added_hosts = []
        failed_hosts = []

        for i in range(0, len(hosts_data), batch_size):
            if i:
                # Throttle between batches (also after a failed one), but do
                # not wait before the first or after the last batch.
                time.sleep(1)

            batch = hosts_data[i:i+batch_size]
            print(f"正在处理第 {i//batch_size + 1} 批,共 {len(batch)} 台主机")

            try:
                params = [self._build_host_config(host) for host in batch]
                result = self._call(
                    "host.create", params, i+100,
                    auth=self.auth_token, timeout=60,
                )

                if "result" in result:
                    host_ids = result["result"]["hostids"]
                    added_hosts.extend(host_ids)
                    print(f"✓ 成功添加 {len(host_ids)} 台主机")
                else:
                    print(f"✗ 添加失败: {result.get('error', {})}")
                    failed_hosts.extend(batch)

            except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as e:
                # Transport failure, non-JSON body, a host dict missing a
                # required key, or an unexpected response shape.
                print(f"请求异常: {e}")
                failed_hosts.extend(batch)

        return added_hosts, failed_hosts

    def generate_hosts_from_network(self, network_prefix, start_ip, end_ip,
                                   hostname_prefix, group_ids, template_ids):
        """Generate host description dicts for a contiguous IPv4 range.

        Args:
            network_prefix: Leading octets, e.g. ``"192.168.1"``.
            start_ip: Remaining part of the first address, e.g. ``"1"``.
            end_ip: Remaining part of the last address (inclusive).
            hostname_prefix: Prefix placed before the dash-separated IP to
                form the host name.
            group_ids: Group ids copied into every generated host.
            template_ids: Template ids copied into every generated host.

        Returns:
            One dict per address with the keys ``hostname``, ``ip``,
            ``group_ids``, ``template_ids`` and ``description``. The list is
            empty when the start address is above the end address.

        Raises:
            ValueError: If either boundary is not a dotted IPv4 address with
                four octets in the range 0-255.
        """
        def ip_to_num(ip):
            parts = [int(part) for part in ip.split('.')]
            # Reject bad octets instead of letting e.g. "x.x.1.300" wrap
            # over into the next octet.
            if len(parts) != 4 or any(not 0 <= part <= 255 for part in parts):
                raise ValueError(f"无效的IPv4地址: {ip!r}")
            return parts[0] * 256**3 + parts[1] * 256**2 + parts[2] * 256 + parts[3]

        def num_to_ip(num):
            return f"{num >> 24 & 255}.{num >> 16 & 255}.{num >> 8 & 255}.{num & 255}"

        start_num = ip_to_num(f"{network_prefix}.{start_ip}")
        end_num = ip_to_num(f"{network_prefix}.{end_ip}")

        hosts = []
        for num in range(start_num, end_num + 1):
            ip = num_to_ip(num)
            hosts.append({
                "hostname": f"{hostname_prefix}{ip.replace('.', '-')}",
                "ip": ip,
                # Copies, so editing one host's list later cannot affect the
                # other hosts or the caller's list.
                "group_ids": list(group_ids),
                "template_ids": list(template_ids),
                "description": f"Auto-added host - {ip}"
            })

        return hosts

def main():
    """Example run: create hosts for 192.168.1.1 - 192.168.1.100.

    Looks up the "Linux servers" group and the "Template OS Linux" template,
    generates one host per address, creates the hosts in batches of 50 and
    prints a summary followed by the list of failed hosts.

    Raises:
        SystemExit: If the host group cannot be found.
    """
    import os

    # Connection settings. The literals are placeholders; environment
    # variables take precedence so real credentials stay out of the script.
    ZABBIX_URL = os.environ.get("ZABBIX_URL", "http://your-zabbix-server/zabbix/api_jsonrpc.php")
    USERNAME = os.environ.get("ZABBIX_USER", "Admin")
    PASSWORD = os.environ.get("ZABBIX_PASSWORD", "zabbix")

    # Log in.
    zbx = ZabbixBatchManager(ZABBIX_URL, USERNAME, PASSWORD)

    # Option 1: load the hosts from a CSV file
    # csv_file = "hosts.csv"
    # hosts_data = []
    # with open(csv_file, 'r') as f:
    #     reader = csv.DictReader(f)
    #     for row in reader:
    #         hosts_data.append(row)

    # Option 2: generate the hosts from an IP range
    group_id = zbx.get_group_id("Linux servers")
    template_id = zbx.get_template_id("Template OS Linux")

    if not group_id:
        # host.create requires at least one host group, so continuing would
        # only make every batch fail.
        raise SystemExit("未找到主机组 'Linux servers',无法创建主机")
    if not template_id:
        print("警告: 未找到模板 'Template OS Linux',主机将不关联任何模板")

    # Hosts for 192.168.1.1 - 192.168.1.100.
    hosts_data = zbx.generate_hosts_from_network(
        network_prefix="192.168.1",
        start_ip="1",
        end_ip="100",
        hostname_prefix="server-",
        group_ids=[group_id],
        template_ids=[template_id] if template_id else []
    )

    # Create the hosts.
    added, failed = zbx.add_hosts_batch(hosts_data, batch_size=50)

    print("\n批量添加完成!")
    print(f"成功添加: {len(added)} 台")
    print(f"失败: {len(failed)} 台")

    if failed:
        print("\n失败的主机列表:")
        for host in failed:
            print(f"  - {host.get('hostname')} ({host.get('ip')})")

if __name__ == "__main__":
    main()

三、CSV文件格式示例

hostname,ip,groupid,templateid,location,site
server-01,192.168.1.101,2,10001,Beijing,DC1
server-02,192.168.1.102,2,10001,Shanghai,DC2
server-03,192.168.1.103,3,10002,Guangzhou,DC3

四、使用注意事项

1. 性能优化建议

# Reuse one Session so connections are pooled across requests.
session = requests.Session()

# Always set a timeout; the tuple is (connect, read) in seconds.
# The request is sent through the session: a bare requests.post() call would
# not use the session created above.
session.post(url, timeout=(30, 60))

# Work in batches and limit concurrency.
batch_size = 50  # tune to the server's capacity

# Pause between requests so the server is not overloaded.
time.sleep(0.5)

2. 错误处理增强

def safe_add_host(host_config, max_retries=3):
    """POST ``host_config`` to the Zabbix API, retrying on failure.

    An attempt fails when the request raises a ``requests`` exception or the
    response status is not 200. Failed attempts are reported on stdout and
    followed by an exponentially growing pause (1s, 2s, 4s, ...).

    Args:
        host_config: JSON-serialisable request body.
        max_retries: Maximum number of attempts.

    Returns:
        The decoded JSON body of the first HTTP 200 response, or None when
        every attempt failed (or ``max_retries`` is 0).
    """
    for attempt in range(max_retries):
        try:
            response = requests.post(zabbix_url, json=host_config, timeout=30)
            if response.status_code == 200:
                return response.json()
            # A non-200 answer is a failed attempt too: report it and back
            # off instead of retrying immediately.
            print(f"尝试 {attempt+1} 失败: HTTP {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"尝试 {attempt+1} 失败: {e}")

        # Exponential backoff; pointless after the final attempt.
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)

    return None

3. 批量操作前检查

def check_host_existence(hostnames):
    """Return those of ``hostnames`` that already exist in Zabbix.

    Uses the module-level ``zabbix_url`` and ``auth_token``.

    Args:
        hostnames: Technical host names to look up.

    Returns:
        List of the ``host`` values returned by ``host.get``; empty when
        ``hostnames`` is empty (no request is sent in that case).

    Raises:
        RuntimeError: If the API answers with an ``error`` object.
        requests.exceptions.RequestException: If the HTTP request fails or
            times out.
    """
    if not hostnames:
        # Nothing to look up, so skip the round trip.
        return []

    data = {
        "jsonrpc": "2.0",
        "method": "host.get",
        "params": {
            "output": ["host"],
            "filter": {"host": hostnames}
        },
        "auth": auth_token,
        "id": 100
    }

    response = requests.post(zabbix_url, json=data, timeout=30)
    payload = response.json()

    if "error" in payload:
        # Treating an API error (e.g. an expired token) as "no hosts exist"
        # would let the caller try to create duplicates.
        raise RuntimeError(f"查询主机失败: {payload['error']}")

    return [h["host"] for h in payload.get("result", [])]

五、高级功能扩展

1. 添加主机的同时设置宏变量

# Example host definition that sets user macros at creation time.
# ``[...]`` is a placeholder for the interface list, which is not spelled out
# in this example.
host_config = dict(
    host="server-01",
    interfaces=[...],
    macros=[
        {"macro": macro_name, "value": macro_value}
        for macro_name, macro_value in (
            ("{$SNMP_COMMUNITY}", "public"),
            ("{$CPU_THRESHOLD}", "90"),
            ("{$MEMORY_THRESHOLD}", "85"),
        )
    ],
)

2. 批量更新主机

def update_hosts_bulk(updates):
    """Send one ``host.update`` request carrying ``updates`` as its params.

    Uses the module-level ``zabbix_url`` and ``auth_token``.

    Args:
        updates: Value placed in the ``params`` field of the request.

    Returns:
        The decoded JSON-RPC response. It is returned as-is, so the caller
        has to check it for an ``error`` field.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or
            times out.
    """
    data = {
        "jsonrpc": "2.0",
        "method": "host.update",
        "params": updates,
        "auth": auth_token,
        "id": 1
    }

    # Bound the wait; a bulk update with no timeout can hang indefinitely.
    response = requests.post(zabbix_url, json=data, timeout=60)
    return response.json()

六、执行步骤总结

1. 准备主机清单:整理所有要添加的主机信息
2. 获取认证token:使用API登录获取session
3. 准备主机配置:设置组、模板、接口等参数
4. 分批执行:每批50-100台,避免过大请求
5. 验证结果:检查添加成功的主机数量
6. 处理失败项:记录并重试失败的主机

这个方案可以高效处理数百台主机的批量添加,建议先在测试环境验证后再在生产环境使用。

相关推荐