jeddhor jeddhor - 3 months ago 9
Python Question

Create project using Fortify Software Security Center REST API

Can I create projects using the HP Fortify SSC's REST API? This works beautifully to grab a list of projects:

import requests
import getpass
import json

# Base URL of the SSC REST API; each endpoint name below is appended to it.
url = "https://www.example.com/ssc/api/v1/"
endpoint = "auth/obtain_token"
headers = {"Content-Type": "application/json",
           "Accept": "application/json"}
# Log in as the current OS user and prompt for the password interactively.
username = getpass.getuser()
password = getpass.getpass()
auth = (username, password)

# Exchange the basic-auth credentials for an API token.
r = requests.post("{url}{endpoint}".format(url=url, endpoint=endpoint), headers=headers, auth=auth)
# Stop here with the real HTTP error instead of failing later on a missing "data" key.
r.raise_for_status()

data = r.json().get("data")
token = data.get("token")
endpoint = "projects"
# The token has to travel in the standard "Authorization" header; a header
# named "Authentication" leaves the follow-up request unauthenticated.
headers["Authorization"] = "FortifyToken {token}".format(token=token)

r = requests.get("{url}{endpoint}".format(url=url, endpoint=endpoint), headers=headers)
r.raise_for_status()

# Pretty-print the project list (parenthesised single-argument print behaves
# the same on Python 2 and Python 3).
print(json.dumps(r.json(), sort_keys=True, indent=4, separators=(',', ': ')))


... But I don't see any examples in the API documentation for actually creating a project. I am specifically interested in doing so using the Python requests library. I do NOT want to do anything in Java (all of the samples that ship with the Fortify SSC WAR package are written in Java).

If anyone has any experience with the SSC REST API (or can point me to some better documentation), I'd appreciate any help you can give.

Answer

I finally got some good information out of HPE technical support, and was able to put together this script for creating projects using the SSC REST API in Python. I'm totally an amateur, so forgive me if this is terribly ugly (i.e., I know my stupid print_debug mess can be replaced by using the logging module), but since other people are looking at this question I figured it would be nice of me to post it. Feel free to use this in any way you wish.

# Project creation script for creating a project in the HP Software Security Center

import json
import optparse
import sys
import urllib2
from base64 import b64encode

# Module-wide settings.
# They are kept as plain globals (get_args rebinds several of them) so that
# the helper functions do not need to receive them as arguments.

# Base URL of the SSC; replaced by the --url option when it is given.
SSC_URL = "Insert your default SSC URL here; alternatively pass it on the command line"
# Credentials and token start out empty and are filled in from the command line.
SSC_USER = SSC_PASS = TOKEN = ""
# Users used when neither --users nor --users_file is given.
DEFAULT_USERS = ["List of default users who have access to the project; can be passed on the command line"]
# Output switches, both off by default.
DEBUG = QUIET = False

# Severity levels accepted by print_debug: 1 (debug) through 4 (error).
DBG_DEBUG, DBG_INFO, DBG_WARN, DBG_ERROR = range(1, 5)


def print_debug(msg, level=DBG_DEBUG):
    """A simple little function for printing more verbose output."""
    if not QUIET:
        # Only print DEBUG level messages if debug is on
        if level == DBG_DEBUG:
            if DEBUG:
                print "[debug]:", msg
        # We always print INFO, WARN, and ERROR messages
        elif level == DBG_INFO:
            print "[info]:", msg
        elif level == DBG_WARN:
            print "[warning]:", msg
        elif level == DBG_ERROR:
            print "[error]:", msg


def get_args(argv):
    """Parse and validate the command line arguments.

    argv is the argument list without the program name. Leftover positional
    arguments are rejected, as are a missing version name, a missing project
    name, or the lack of both a token and a complete --sscuser/--sscpass
    pair; each of those reports an error and exits with status 1.

    As a side effect the module globals SSC_USER, SSC_PASS, TOKEN, DEBUG and
    QUIET are rebound from the parsed options, and SSC_URL is rebound when
    --url is given.

    Returns the optparse options object.
    """
    # Globals are used so the settings do not have to be passed around.
    global SSC_URL, SSC_USER, SSC_PASS, TOKEN, DEBUG, QUIET
    # optparse (rather than argparse) keeps this usable on very old Python versions.
    parser = optparse.OptionParser(description="This program creates a new project, version, or both, in the HP Software Security Center (SSC).",
                                   usage="usage: %prog [--project PROJECT] [--version VERSION] [--users USERS]\n       [--token TOKEN] [--debug] [--quiet]")
    parser.add_option("-v", "--version", help="Version name to ensure exists inside the project, enclosed in double-quotes", dest="version")
    parser.add_option("-p", "--project", help="Project name to ensure exists, enclosed in double-quotes", dest="project")
    # The credential options are deliberately hidden from the --help output.
    parser.add_option("--sscuser", help=optparse.SUPPRESS_HELP, dest="sscuser")
    parser.add_option("--sscpass", help=optparse.SUPPRESS_HELP, dest="sscpass")
    parser.add_option("-t", "--token", help="Token for issuing commands to the SSC", dest="token")
    parser.add_option("-l", "--users", help="A comma-separated list of users or Active Directory groups who should have access to this project in the SSC, enclosed in double-quotes",
                      dest="users")
    parser.add_option("-f", "--users_file", help="A file containing a list of users and groups, one per line; can use this instead of --users parameter if there are many users", dest="users_file")
    parser.add_option("-u", "--url", help="URL of the SSC", dest="url")
    parser.add_option("-d", "--debug", help="Run in debug mode; produces more verbose output", action="store_true")
    parser.add_option("-q", "--quiet", help="Quiet mode; will not produce any text output, overrides --debug", action="store_true")
    options, args = parser.parse_args(argv)

    SSC_USER = options.sscuser
    SSC_PASS = options.sscpass
    TOKEN = options.token
    # store_true options are None when absent; normalise them to real booleans.
    DEBUG = bool(options.debug)
    QUIET = bool(options.quiet)
    # Emitted only now: before DEBUG is set from --debug a debug-level message
    # could never be shown.
    print_debug("Command line arguments parsed.")

    if args:
        print_debug("Unrecognized arguments: {args}".format(args=args), level=DBG_ERROR)
        sys.exit(1)
    # Either a token or a full user/password pair is needed to authenticate.
    if not options.token and not (options.sscuser and options.sscpass):
        print_debug("No token specified! -t/--token option is required.", level=DBG_ERROR)
        sys.exit(1)
    if not options.version:
        print_debug("No project version name specified! -v/--version option is required.", level=DBG_ERROR)
        sys.exit(1)
    if not options.project:
        print_debug("No project name specified! -p/--project option is required.", level=DBG_ERROR)
        sys.exit(1)
    if options.url:
        SSC_URL = options.url
    return options


def get_ssc_rest_api_token():
    """Return the token string to send after "FortifyToken " in API requests.

    When both SSC_USER and SSC_PASS are set, a token is requested from the
    SSC's /api/v1/auth/obtain_token endpoint using HTTP basic authentication
    and returned as received. Otherwise the base64 encoding of the TOKEN
    global is returned.

    Exits with status 1 when the token request fails with an HTTP error, when
    the response is not valid JSON, or when it carries no token.
    """
    if not (SSC_USER and SSC_PASS):
        # No credentials given: base64-encode the token from the command line.
        print_debug("Trying to obtain SSC auth token...")
        token = b64encode(TOKEN)
        print_debug("Obtained SSC auth token: {token}".format(token=token))
        return str(token)

    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    endpoint = "/api/v1/auth/obtain_token"
    print_debug("Trying to obtain SSC auth token...")
    user_pass = "{user}:{password}".format(user=SSC_USER, password=SSC_PASS)
    b64_user_pass = b64encode(user_pass)
    headers["Authorization"] = "Basic {user_pass}".format(user_pass=str(b64_user_pass))
    # Supplying a (here empty) JSON body makes urllib2 issue a POST.
    req = urllib2.Request("{url}{endpoint}".format(url=SSC_URL, endpoint=endpoint), headers=headers, data=json.dumps({}))
    try:
        r = urllib2.urlopen(req)
    except urllib2.HTTPError as e:
        print_debug("Urllib2 error!", level=DBG_ERROR)
        print_debug("{code} - {reason}".format(code=e.code, reason=e.reason))
        for l in e.read().splitlines():
            print_debug(l, level=DBG_ERROR)
        sys.exit(1)

    # The body can only be read once, so keep it for the error reports below.
    body = r.read()
    try:
        response = json.loads(body)
    except ValueError:
        print_debug("Unable to get JSON data from SSC token request!", level=DBG_ERROR)
        print_debug("Response text:")
        print_debug(body)
        sys.exit(1)

    data = response.get("data") if isinstance(response, dict) else None
    token = data.get("token") if isinstance(data, dict) else None
    if not token:
        # Covers both a missing "data" object and a "data" object without a token.
        print_debug("Unable to obtain SSC auth token!", level=DBG_ERROR)
        print_debug("Response text:")
        print_debug(body)
        sys.exit(1)
    print_debug("Obtained SSC auth token: {token}".format(token=token))
    return str(token)


def get_project_id(project_name):
    """Get the ID of the given project.

    Queries /api/v1/projects/ with the filter name:"<project_name>" and
    returns the "id" of the first entry in the response's "data" list, or
    None when that list is empty or missing.

    Exits with status 1 on an HTTP error or when the "responseCode" field of
    the response is not a 2xx code.
    """
    # Python 2 home of quote(); imported locally so this function brings its
    # own dependency.
    from urllib import quote

    token = get_ssc_rest_api_token()
    print_debug("Searching for project...")
    endpoint = "/api/v1/projects/"
    headers = {"Content-Type": "application/json", "Accept": "application/json", "Authorization": "FortifyToken {0}".format(token)}
    # quote() needs a byte string for non-ASCII input on Python 2.
    if isinstance(project_name, unicode):
        project_name = project_name.encode("utf-8")
    # Percent-encode the filter: project names may contain spaces or quotes,
    # which are not valid in a raw URL.
    query = "?q=" + quote('name:"{project_name}"'.format(project_name=project_name))
    req = urllib2.Request("{url}{endpoint}{query}".format(url=SSC_URL, endpoint=endpoint, query=query), headers=headers)
    try:
        r = urllib2.urlopen(req)
    except urllib2.HTTPError as e:
        print_debug("Urllib2 error!", level=DBG_ERROR)
        print_debug("{code} - {reason}".format(code=e.code, reason=e.reason))
        for l in e.read().splitlines():
            print_debug(l, level=DBG_ERROR)
        sys.exit(1)
    response = json.loads(r.read())
    # range(200, 300) covers every 2xx code, 299 included.
    if response.get("responseCode") not in range(200, 300):
        print_debug("Unexpected HTTP response code: {code}".format(code=json.dumps(response, indent=4, separators=(",", ": "))), level=DBG_ERROR)
        sys.exit(1)
    data = response.get("data")
    if not data:
        return None
    # NOTE(review): the first hit is taken as-is; whether the server-side
    # filter is an exact match is not checked here.
    project_id = data[0].get("id")
    print_debug("Returning ID for project {p}".format(p=data[0].get("name")))
    return project_id


def _fetch_auth_entities_page(page_url, headers):
    """GET page_url and return its decoded JSON body; exit with status 1 on an HTTP error."""
    req = urllib2.Request(page_url, headers=headers)
    try:
        r = urllib2.urlopen(req)
    except urllib2.HTTPError as e:
        print_debug("Urllib2 error!", level=DBG_ERROR)
        print_debug("{code} - {reason}".format(code=e.code, reason=e.reason))
        for l in e.read().splitlines():
            print_debug(l, level=DBG_ERROR)
        sys.exit(1)
    return json.loads(r.read())


def get_user_ids(users):
    """Gets the SSC user IDs for a list of users.

    Downloads every page of /api/v1/authEntities/ (following the "next" link
    of each response), then looks up each name in users by its "entityName".

    Returns a list with one dict per user found, holding that entity's "id",
    "isLdap" and "displayName". Names that are not found are reported with a
    warning and skipped. Exits with status 1 on an HTTP error.
    """
    token = get_ssc_rest_api_token()
    endpoint = "/api/v1/authEntities/"
    headers = {"Content-Type": "application/json", "Accept": "application/json", "Authorization": "FortifyToken {0}".format(token)}

    response = _fetch_auth_entities_page("{url}{endpoint}".format(url=SSC_URL, endpoint=endpoint), headers)
    # "data" and "links" may be absent or null on any page; treat that as empty.
    data = list(response.get("data") or [])
    nextlink = (response.get("links") or {}).get("next")
    while nextlink:
        print_debug("Getting next page of results...")
        response = _fetch_auth_entities_page(nextlink.get("href"), headers)
        data.extend(response.get("data") or [])
        nextlink = (response.get("links") or {}).get("next")

    user_ids = []

    for user in users:
        print_debug("Getting user ID for: {0}".format(user))
        # First entity whose (non-empty) entityName equals the requested name.
        user_data = next((d for d in data if d.get("entityName") and d.get("entityName") == user), None)
        if user_data is None:
            print_debug("User ID for user {user} not found!".format(user=user), level=DBG_WARN)
            continue
        user_obj = {"id": user_data["id"], "isLdap": user_data["isLdap"], "displayName": user_data["displayName"]}
        user_ids.append(user_obj)
        print_debug("Got user ID for user: {0}".format(user_obj["displayName"]), level=DBG_INFO)

    return user_ids


def _collect_users(args):
    """Return the names from --users and --users_file, or DEFAULT_USERS when neither is given."""
    if not args.users and not args.users_file:
        print_debug("No users specified on command line; using default list.", level=DBG_WARN)
        return DEFAULT_USERS
    users = []
    if args.users:
        users.extend(u.strip() for u in args.users.split(","))
    if args.users_file:
        # The with-block closes the file even if reading fails.
        with open(args.users_file, "r") as users_file:
            users.extend(line.strip() for line in users_file)
    # Blank lines and stray commas would otherwise become empty user names.
    return [u for u in users if u]


def _version_attribute(definition_id, guid=None):
    """Build one attribute entry for the bulk request; guid=None leaves "values" empty."""
    values = [{"guid": guid}] if guid is not None else []
    return {"attributeDefinitionId": definition_id, "values": values, "value": "null"}


def _build_bulk_payload(version_id, user_ids):
    """Build the /api/v1/bulk payload that configures and commits project version version_id."""
    base_uri = "{ssc_url}/api/v1/projectVersions/{project_id}".format(ssc_url=SSC_URL, project_id=version_id)
    # NOTE(review): the definition IDs and GUIDs below are presumably specific
    # to the SSC installation this was written for -- confirm against yours.
    attributes = [
        _version_attribute(1, "High"),
        _version_attribute(5, "Active"),
        _version_attribute(6, "Internal"),
        _version_attribute(7, "externalpublicnetwork"),
        _version_attribute(10),
        _version_attribute(11),
        _version_attribute(12),
        _version_attribute(2),
        _version_attribute(3),
        _version_attribute(4),
    ]
    return {"requests": [
        {
            "uri": base_uri + "/attributes",
            "httpVerb": "PUT",
            "postData": attributes
        },
        {
            # Assign the user_ids that we specified to this project
            "uri": base_uri + "/authEntities",
            "httpVerb": "PUT",
            "postData": user_ids
        },
        {
            # Set project to use the default analysis processing rules and bug tracker config, along with any custom tags.
            "uri": base_uri + "/action",
            "httpVerb": "POST",
            "postData": [
                {
                    "type": "COPY_FROM_PARTIAL",
                    "values": {
                        "projectVersionId": version_id,
                        "previousProjectVersionId": -1,
                        "copyAnalysisProcessingRules": "true",
                        "copyBugTrackerConfiguration": "true",
                        "copyCurrentStateFpr": "false",
                        "copyCustomTags": "true"
                    }
                }
            ]
        },
        {
            # Commits the project/version so the SSC knows it's ready
            "uri": base_uri + "?hideProgress=true",
            "httpVerb": "PUT",
            "postData": {
                "committed": "true"
            }
        }]
    }


def _exit_with_http_error(error, body):
    """Report a urllib2.HTTPError together with its already-read body, then exit with status 1."""
    print_debug("Urllib2 error!", level=DBG_ERROR)
    print_debug("{code} - {reason}".format(code=error.code, reason=error.reason))
    for l in body.splitlines():
        print_debug(l, level=DBG_ERROR)
    sys.exit(1)


def main(argv):
    """Creates a new project version (and, if needed, its project) in the SSC.

    argv is the command line without the program name. The steps are:
    resolve the requested users to SSC IDs, create the version (inside the
    existing project when one with that name is found, otherwise together
    with a new project), then send one bulk request that sets the version's
    attributes, grants the users access, copies the default rules and
    commits the version.

    Always ends through sys.exit: status 0 on success or when the project
    and version already exist, status 1 on any error.
    """
    # Parse our command line arguments
    args = get_args(argv)

    # Command line values are already byte strings on Python 2; calling
    # .encode() on them would implicitly decode as ASCII first and fail on
    # non-ASCII names, so they are formatted as they are.
    print_debug('Project: "{project}"'.format(project=args.project), level=DBG_INFO)
    print_debug('Version: "{version}"'.format(version=args.version), level=DBG_INFO)

    # If we specified a list of users on the command line, use that
    # Otherwise use our default users list at the top of this script
    users = _collect_users(args)

    project_version = args.version
    project_name = args.project

    user_ids = get_user_ids(users)
    if not user_ids:
        print_debug("Users list is empty; unable to create project or version without a list of users!", level=DBG_ERROR)
        sys.exit(1)

    payload = {"name": project_version.decode("utf-8"),
               "description": "",
               "active": "true",
               "committed": "false",
               "project": {"name": project_name.decode("utf-8"),
                           "description": "",
                           "issueTemplateId": "Prioritized-HighRisk-Project-Template"},
               "issueTemplateId": "Prioritized-HighRisk-Project-Template"}

    # Will return a b64encoded string, either generated automatically if a sscuser
    # and sscpass are specified, otherwise using the token passed on the command line
    token = get_ssc_rest_api_token()

    headers = {"Content-Type": "application/json", "Accept": "application/json", "Authorization": "FortifyToken {0}".format(token)}

    print_debug("Submitting request to create project...")

    # If the project already exists, we just need to create a new version in the same project.
    # Otherwise use the project creation endpoint.
    existing_id = get_project_id(project_name)
    if existing_id:
        endpoint = "/api/v1/projects/{project_id}/versions".format(project_id=existing_id)
        payload["project"] = {"id": existing_id}
    else:
        endpoint = "/api/v1/projectVersions"

    duplicate_marker = "You have entered a duplicate name"

    req = urllib2.Request("{url}{endpoint}".format(url=SSC_URL, endpoint=endpoint), headers=headers, data=str(json.dumps(payload)))
    try:
        r = urllib2.urlopen(req)
    except urllib2.HTTPError as e:
        # The error body can be read only once; keep it for both checks.
        error_body = e.read()
        if duplicate_marker in error_body:
            print_debug("Project and version already exist!", level=DBG_INFO)
            sys.exit(0)
        _exit_with_http_error(e, error_body)

    # Same here: read the body once and reuse it for parsing and reporting.
    body = r.read()
    if r.getcode() not in range(200, 300):
        if duplicate_marker in body:
            print_debug("Project and version already exist!", level=DBG_INFO)
            sys.exit(0)
        print_debug("Unable to create project!", level=DBG_ERROR)
        print_debug("Response text:")
        print_debug(body)
        sys.exit(1)

    print_debug("Request to create project successful!")
    try:
        response = json.loads(body)
    except ValueError:
        response = None
    data = response.get("data") if isinstance(response, dict) else None
    if not data:
        print_debug("Error: Unable to get project version ID data!", level=DBG_ERROR)
        sys.exit(1)
    # Despite the name this is the ID of the newly created project *version*.
    project_id = data.get("id")
    if not project_id:
        print_debug("Error: Unable to get project version ID!", level=DBG_ERROR)
        sys.exit(1)

    payload = _build_bulk_payload(project_id, user_ids)
    endpoint = "/api/v1/bulk"
    print_debug("Submitting project metadata to bulk endpoint...")

    req = urllib2.Request("{url}{endpoint}".format(url=SSC_URL, endpoint=endpoint), headers=headers, data=str(json.dumps(payload)))
    try:
        r = urllib2.urlopen(req)
    except urllib2.HTTPError as e:
        _exit_with_http_error(e, e.read())

    # NOTE(review): only the overall HTTP status is checked; the per-request
    # results inside the bulk response body are not inspected.
    if r.getcode() in range(200, 300):
        print_debug("Project version creation successful!", level=DBG_INFO)
        sys.exit(0)
    print_debug("Unable to create project version!", level=DBG_ERROR)
    print_debug("Response text:")
    print_debug(r.read())
    sys.exit(1)


# Run main() with the command line arguments (program name stripped) only
# when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main(sys.argv[1:])