#
# Imports
#
import requests
import json
import os
#
# The Main Function
#
def main():
    """
    Main function for this library.

    The module is meant to be imported, so this only prints a notice
    saying that it cannot be run as a script.

    :returns: Nothing
    """
    print("This is a library and not a script. It cannot be run as a script.")
#
# Static Functions
#
def buildapiurl(proto="http", host="127.0.0.1", port=8000,
                action=None, APIPY=False):
    """
    Create a URL for the Cuckoo API

    :param proto: http or https
    :param host: Hostname or IP address
    :param port: The port of the Cuckoo API server
    :param action: The action to perform with the API, written as a path
        with a leading slash (it is appended to the URL unchanged)
    :param APIPY: Pass True to build the URL in the api.py layout (the
        action directly after the port). Any other value builds the
        Django layout, where the action is prefixed with ``/api`` and
        followed by a trailing slash.
    :returns: The URL, or None when no action is given
    """
    if action is None:
        return None
    # Identity test on purpose: only the literal True selects the api.py
    # layout, every other value falls through to the Django layout.
    if APIPY is True:
        return "{0}://{1}:{2}{3}".format(proto, host, port, action)
    return "{0}://{1}:{2}/api{3}/".format(proto, host, port, action)
#
# Classes
#
class CuckooAPI(object):
    """
    Class to hold Cuckoo API data and to call the Cuckoo HTTP API.

    Two kinds of server are handled: the Django web API (the default) and
    the api.py script (``APIPY=True``). Some calls are offered for only
    one of them and raise CuckooAPINotAvailable for the other.
    """

    def __init__(self, host="127.0.0.1", port=8000, proto="http",
                 APIPY=False):
        """
        :param host: Hostname or IP address of Cuckoo server
        :param port: The port of the Cuckoo server
        :param proto: http or https
        :param APIPY: Set to True to submit to api.py on the server
        """
        self.proto = proto
        self.host = host
        self.port = port
        self.APIPY = APIPY

    #
    # Private helpers
    #
    def _api_url(self, action):
        """Return the full URL for ``action`` on the configured server."""
        return buildapiurl(self.proto, self.host, self.port, action,
                           self.APIPY)

    def _require_django(self, apiurl):
        """Raise CuckooAPINotAvailable when the target is api.py."""
        # Identity test on purpose: buildapiurl() selects the api.py URL
        # layout with the same ``is True`` check, so the two must agree.
        if self.APIPY is True:
            raise CuckooAPINotAvailable(apiurl)

    @staticmethod
    def _check_task_id(taskid):
        """
        Raise CuckooAPINoTaskID when ``taskid`` is None, is below 1, or
        cannot be compared with a number.
        """
        try:
            invalid = taskid is None or taskid < 1
        except TypeError:
            # Python 3 refuses to order e.g. a str against an int; report
            # that as an invalid task ID instead of leaking the TypeError.
            invalid = True
        if invalid:
            raise CuckooAPINoTaskID(taskid)

    @staticmethod
    def _check_new_file(filepath):
        """Raise CuckooAPIFileExists when ``filepath`` is None or exists."""
        if filepath is None or os.path.exists(filepath):
            raise CuckooAPIFileExists(filepath)

    @staticmethod
    def _decode(response, apiurl):
        """
        Return the decoded JSON body of a 200 reply; any other status
        raises CuckooAPIBadRequest.
        """
        if response.status_code != 200:
            raise CuckooAPIBadRequest(apiurl)
        return json.loads(response.text)

    @staticmethod
    def _get_json(apiurl):
        """GET ``apiurl`` and return its decoded JSON reply."""
        return CuckooAPI._decode(requests.get(apiurl), apiurl)

    @staticmethod
    def _download(apiurl, filepath):
        """
        GET ``apiurl`` and write the body of a 200 reply to ``filepath``;
        any other status raises CuckooAPIBadRequest and writes nothing.
        """
        # Turn on stream to download files
        response = requests.get(apiurl, stream=True)
        try:
            if response.status_code != 200:
                raise CuckooAPIBadRequest(apiurl)
            with open(filepath, 'wb') as f:
                # Read and write in chunks
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        finally:
            # A streamed response holds on to its connection until the
            # body is consumed or the response is closed.
            response.close()

    #
    # Server and machine information
    #
    def getcuckoostatus(self):
        """
        Function to get the status of the Cuckoo instance.

        :returns: Returns the status as a dictionary.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        return self._get_json(self._api_url("/cuckoo/status"))

    def listmachines(self):
        """
        Lists the machines available for analysis.

        :returns: Returns the list of machines as a list.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        return self._get_json(self._api_url("/machines/list"))

    def viewmachine(self, vmname=None):
        """
        Lists the details about an analysis machine.

        :param vmname: The vm name for the machine to be listed
        :returns: Returns the dictionary of the machine specifics
        :raises CuckooAPINoVM: No vm name was given.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        if vmname is None:
            raise CuckooAPINoVM(vmname)
        return self._get_json(self._api_url("/machines/view/"+vmname))

    #
    # Tasks
    #
    def taskslist(self, limit=None, offset=None):
        """
        Lists the tasks in the Cuckoo DB.

        :param limit: Limit to this many results (Optional)
        :param offset: Offset the output to offset in the total task list
            and requires limit above; it is ignored when limit is not
            given. (Optional)
        :returns: Returns a list of task details.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        baseurl = "/tasks/list"
        if limit is not None:
            baseurl = baseurl+"/"+str(limit)
            # The offset is a second path element after the limit, so it
            # can only be sent together with one.
            if offset is not None:
                baseurl = baseurl+"/"+str(offset)
        return self._get_json(self._api_url(baseurl))

    def taskview(self, taskid=None):
        """
        View the task for the task ID.

        :param taskid: The ID of the task to view.
        :returns: Returns a dict of task details.
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        return self._get_json(self._api_url("/tasks/view/"+str(taskid)))

    def taskstatus(self, taskid=None):
        """
        View the task status for the task ID.

        :param taskid: The ID of the task to view.
        :returns: Returns a dict of task details.
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPINotAvailable: The target server is api.py.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        apiurl = self._api_url("/tasks/status/"+str(taskid))
        # Not available with APIPY
        self._require_django(apiurl)
        return self._get_json(apiurl)

    def taskiocs(self, taskid=None, detailed=False):
        """
        View the task IOCs for the task ID.

        :param taskid: The ID of the task to view.
        :param detailed: Set to True for detailed IOCs.
        :returns: Returns a dict of task details.
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPINotAvailable: The target server is api.py.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        baseurl = "/tasks/get/iocs/"+str(taskid)
        if detailed is True:
            baseurl = baseurl+"/detailed"
        apiurl = self._api_url(baseurl)
        # Not available with APIPY
        self._require_django(apiurl)
        return self._get_json(apiurl)

    def taskreport(self, taskid=None, reportformat="json"):
        """
        View the report for the task ID.

        :param taskid: The ID of the task to report.
        :param reportformat: Right now only json is supported.
        :returns: Returns a dict report for the task.
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPINotImplemented: reportformat is not "json".
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        if self.APIPY is True:
            prefix = "/tasks/report/"
        else:
            prefix = "/tasks/get/report/"
        apiurl = self._api_url(prefix+str(taskid)+"/"+reportformat)
        # Error on any other format for now...
        if reportformat != "json":
            raise CuckooAPINotImplemented(apiurl)
        return self._get_json(apiurl)

    def taskdelete(self, taskid=None):
        """
        Delete a task.

        :param taskid: The task ID to delete.
        :returns: The decoded JSON reply of the server
        :raises CuckooAPINoTaskID: The task ID is missing or invalid, or
            the server answered with 404.
        :raises CuckooAPITaskNoDelete: The server answered with 500.
        :raises CuckooAPIBadRequest: Any other status than 200.
        """
        self._check_task_id(taskid)
        apiurl = self._api_url("/tasks/delete/"+str(taskid))
        response = requests.get(apiurl)
        if response.status_code == 404:
            raise CuckooAPINoTaskID(taskid)
        if response.status_code == 500:
            raise CuckooAPITaskNoDelete(taskid)
        return self._decode(response, apiurl)

    def taskscreenshots(self, taskid=None, filepath=None, screenshot=None):
        """
        Download screenshot(s).

        :param taskid: The task ID for the screenshot(s).
        :param filepath: Where to save the screenshot(s).
            If you are using the Django web api the screenshots
            are saved as .tar.bz!
            If you are using the api.py script the screenshots are in .zip
            format.
            This function adds the appropriate file extensions to the
            filepath variable.
        :param screenshot: The screenshot number to download.
        :returns: Nothing
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPIFileExists: filepath is missing, or filepath or
            filepath plus the added extension already exists.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        self._check_new_file(filepath)
        if self.APIPY is True:
            target = filepath+".zip"
            baseurl = "/tasks/screenshots/"+str(taskid)
        else:
            target = filepath+".tar.bz"
            baseurl = "/tasks/get/screenshot/"+str(taskid)
        # The archive is written to the extended name, so that is the
        # path that has to be protected from being overwritten.
        self._check_new_file(target)
        if screenshot is not None:
            baseurl = baseurl+"/"+str(screenshot)
        self._download(self._api_url(baseurl), target)

    #
    # Submissions
    #
    def submitfile(self, filepath, data=None):
        """
        Function to submit a local file to Cuckoo for analysis.

        :param filepath: Path to a file to submit.
        :param data: This is data containing any other options for the
            submission form. This is a dict of values accepted by the
            create file options in the cuckoo-modified API. More form
            information can be found in the following link:
            https://github.com/spender-sandbox/cuckoo-modified/blob/master/docs/book/src/usage/api.rst
        :returns: Returns the json results of the submission
        :raises CuckooAPIInvalidFileException: filepath is missing or is
            not an existing regular file.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        # Error if the file does not exist (isfile is False for those too)
        if filepath is None or not os.path.isfile(filepath):
            raise CuckooAPIInvalidFileException(filepath)
        apiurl = self._api_url("/tasks/create/file")
        with open(filepath, "rb") as sample:
            multipart_file = {"file": (os.path.basename(filepath), sample)}
            response = requests.post(apiurl, files=multipart_file, data=data)
        return self._decode(response, apiurl)

    def submiturl(self, url, data=None):
        """
        Function to submit a URL to Cuckoo for analysis.

        :param url: URL to submit.
        :param data: This is data containing any other options for the
            submission form. This is a dict of values accepted by the
            create file options in the cuckoo-modified API. More form
            information can be found in the following link:
            https://github.com/spender-sandbox/cuckoo-modified/blob/master/docs/book/src/usage/api.rst
        :returns: Returns the json results of the submission
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        apiurl = self._api_url("/tasks/create/url")
        # The URL is sent as a multipart form field with an empty filename
        multipart_url = {"url": ("", url)}
        response = requests.post(apiurl, files=multipart_url, data=data)
        return self._decode(response, apiurl)

    #
    # Hash lookups
    #
    def tasksearch(self, hashid=None, hashtype=None):
        """
        View information about a specific task by hash.

        :param hashid: MD5, SHA1, or SHA256 hash to search.
        :param hashtype: 'md5', 'sha1', or 'sha256'
        :returns: Returns a dict with results.
        :raises CuckooAPINoHash: hashid or hashtype is missing.
        :raises CuckooAPINotAvailable: The target server is api.py.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        if hashid is None or hashtype is None:
            raise CuckooAPINoHash(hashid, hashtype)
        # NOTE(review): this is the same "/files/view/" path that
        # fileview() requests -- confirm that a dedicated task search
        # endpoint was not intended here.
        apiurl = self._api_url("/files/view/"+hashtype+"/"+str(hashid))
        # This appears to be unavailable in the documentation...
        self._require_django(apiurl)
        return self._get_json(apiurl)

    def fileview(self, hashid=None, hashtype=None):
        """
        View the details for the file given the hash.

        :param hashid: The hash to search.
        :param hashtype: The following types of hash:
            'md5', 'sha1', 'sha256'. 'sha1' is refused when the target
            server is api.py. Any other values will cause an error!
        :returns: Returns the results of the file in a dict.
        :raises CuckooAPINoHash: hashid or hashtype is missing.
        :raises CuckooAPINotAvailable: The hash type is not accepted.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        if hashid is None or hashtype is None:
            raise CuckooAPINoHash(hashid, hashtype)
        # str() gets rid of ints
        apiurl = self._api_url("/files/view/"+hashtype+"/"+str(hashid))
        if hashtype not in ("md5", "sha1", "sha256"):
            raise CuckooAPINotAvailable(apiurl)
        # This appears to be unavailable in the documentation...
        if self.APIPY is True and hashtype == "sha1":
            raise CuckooAPINotAvailable(apiurl)
        return self._get_json(apiurl)

    #
    # Downloads
    #
    def sampledownload(self, hashid=None, hashtype=None,
                       filepath=None):
        """
        Download a file by hash.

        :param hashid: The hash used to download the sample.
        :param hashtype: The hash type, can be "task", "md5", "sha1",
            or "sha256". "task" means the task ID. Only "sha256" is
            accepted when the target server is api.py.
        :param filepath: The file path of the file to create/download.
        :returns: Nothing
        :raises CuckooAPINoHash: hashid or hashtype is missing.
        :raises CuckooAPIFileExists: filepath is missing or exists.
        :raises CuckooAPINotAvailable: The hash type is not accepted by
            api.py.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        # Validate before converting: str(None) would hide a missing hash
        if hashid is None or hashtype is None:
            raise CuckooAPINoHash(hashid, hashtype)
        # Get rid of ints
        hashid = str(hashid)
        self._check_new_file(filepath)
        if self.APIPY is True:
            baseurl = "/files/get/"+hashid
        else:
            baseurl = "/files/get/"+hashtype+"/"+hashid
        apiurl = self._api_url(baseurl)
        if hashtype != "sha256":
            self._require_django(apiurl)
        self._download(apiurl, filepath)

    def pcapdownload(self, taskid=None, filepath=None):
        """
        Download a pcap by task ID.

        :param taskid: The task ID to download the pcap.
        :param filepath: The file path of the file to create/download.
        :returns: Nothing
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPIFileExists: filepath is missing or exists.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        self._check_new_file(filepath)
        if self.APIPY is True:
            baseurl = "/pcap/get/"+str(taskid)
        else:
            baseurl = "/tasks/get/pcap/"+str(taskid)
        self._download(self._api_url(baseurl), filepath)

    def droppeddownload(self, taskid=None, filepath=None):
        """
        Download files dropped by sample identified by task ID.

        :param taskid: The task ID of the sample.
        :param filepath: The file path of the file to create/download.
        :returns: Nothing
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPIFileExists: filepath is missing or exists.
        :raises CuckooAPINotAvailable: The target server is api.py.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        self._check_new_file(filepath)
        apiurl = self._api_url("/tasks/get/dropped/"+str(taskid))
        self._require_django(apiurl)
        self._download(apiurl, filepath)

    def surifilesdownload(self, taskid=None, filepath=None):
        """
        Download SuriFiles for the sample identified by task ID.

        :param taskid: The task ID of the sample.
        :param filepath: The file path of the file to create/download.
        :returns: Nothing
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPIFileExists: filepath is missing or exists.
        :raises CuckooAPINotAvailable: The target server is api.py.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        self._check_new_file(filepath)
        apiurl = self._api_url("/tasks/get/surifile/"+str(taskid))
        self._require_django(apiurl)
        self._download(apiurl, filepath)

    def fullmemdownload(self, taskid=None, filepath=None):
        """
        Download the full memory file for the sample identified by
        task ID.

        :param taskid: The task ID of the sample.
        :param filepath: The file path of the file to create/download.
        :returns: Nothing
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPIFileExists: filepath is missing or exists.
        :raises CuckooAPINotAvailable: The target server is api.py.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        self._check_new_file(filepath)
        apiurl = self._api_url("/tasks/get/fullmemory/"+str(taskid))
        self._require_django(apiurl)
        self._download(apiurl, filepath)

    def procmemdownload(self, taskid=None, filepath=None, pid=None):
        """
        Download the process memory file for the sample identified by
        task ID.

        :param taskid: The task ID of the sample.
        :param filepath: The file path of the file to create/download.
        :param pid: Process ID to download (Optional)
        :returns: Nothing
        :raises CuckooAPINoTaskID: The task ID is missing or invalid.
        :raises CuckooAPIFileExists: filepath is missing or exists.
        :raises CuckooAPINotAvailable: The target server is api.py.
        :raises CuckooAPIBadRequest: The server did not answer with 200.
        """
        self._check_task_id(taskid)
        self._check_new_file(filepath)
        baseurl = "/tasks/get/procmemory/"+str(taskid)
        if pid is not None:
            baseurl = baseurl+"/"+str(pid)
        apiurl = self._api_url(baseurl)
        self._require_django(apiurl)
        self._download(apiurl, filepath)
class CuckooAPIInvalidFileException(Exception):
    """
    Exception for when a file is not found.
    """

    def __init__(self, filepath):
        """
        :param filepath: The offending path; it is quoted in the message.
        """
        message = "CuckooAPI: Invalid File {0}".format(filepath)
        super(CuckooAPIInvalidFileException, self).__init__(message)
class CuckooAPINotImplemented(Exception):
    """
    Exception for when a call is not implemented, but is available.
    """

    def __init__(self, apicall):
        """
        :param apicall: The API call (URL) that is not implemented; it is
            quoted in the message.
        """
        message = "CuckooAPI: Not Implemented {}".format(apicall)
        super(CuckooAPINotImplemented, self).__init__(message)
class CuckooAPINotAvailable(Exception):
    """
    Exception for when a call is not available on the remote server.
    This signifies you may have used an API call meant for the Django
    interface and sent it to the api.py interface, or vice versa.
    """

    def __init__(self, apicall):
        """
        :param apicall: The API call (URL) that is not available. It is
            not part of the message, so it is kept on the ``apicall``
            attribute instead of being thrown away.
        """
        super(CuckooAPINotAvailable, self).__init__(
            "CuckooAPI: This API is not available for "
            "your target Cuckoo server. Are you mixing "
            "calls from Django web interface with the "
            "api.py interface? Or the other way around?")
        self.apicall = apicall
class CuckooAPIBadRequest(Exception):
    """
    Exception for when a request to the Cuckoo API URL does not succeed.
    """

    def __init__(self, apiurl):
        """
        :param apiurl: The URL that was requested; it is quoted in the
            message.
        """
        message = ("CuckooAPI: Unable to connect "
                   "with URL {0} Are you mixing "
                   "calls from Django web interface with the "
                   "api.py interface? Or the other way "
                   "around?").format(apiurl)
        super(CuckooAPIBadRequest, self).__init__(message)
class CuckooAPINoVM(Exception):
    """
    Exception for when a vm is not found.
    """

    def __init__(self, vmname):
        """
        :param vmname: The vm name that was rejected; it is quoted in the
            message.
        """
        message = "CuckooAPI: VM {0} not available or invalid!".format(
            vmname)
        super(CuckooAPINoVM, self).__init__(message)
class CuckooAPINoTaskID(Exception):
    """
    Exception for when an invalid task ID is used.
    """

    def __init__(self, taskid):
        """
        :param taskid: The task ID that was rejected; it is quoted in the
            message.
        """
        message = ("CuckooAPI: Task ID {0} not available or "
                   "invalid!").format(taskid)
        super(CuckooAPINoTaskID, self).__init__(message)
class CuckooAPITaskNoDelete(Exception):
    """
    Exception for when a task cannot be deleted.
    """

    def __init__(self, taskid):
        """
        :param taskid: The ID of the task that could not be deleted; it
            is quoted in the message.
        """
        message = ("CuckooAPI: Task ID {0} cannot be "
                   "deleted!").format(taskid)
        super(CuckooAPITaskNoDelete, self).__init__(message)
class CuckooAPINoHash(Exception):
    """
    Exception for when an invalid file hash is used.
    """

    def __init__(self, hashid, hashtype):
        """
        :param hashid: The hash that was rejected.
        :param hashtype: The type given for that hash.

        Both values are quoted in the message.
        """
        message = ("CuckooAPI: Hash {0} of type {1} not "
                   "available or invalid!").format(hashid, hashtype)
        super(CuckooAPINoHash, self).__init__(message)
class CuckooAPIFileExists(Exception):
    """
    Exception for when a file is about to be saved over an existing file
    or the file name is invalid.
    """

    def __init__(self, filepath):
        """
        :param filepath: The path that was rejected; it is quoted in the
            message.
        """
        message = ("CuckooAPI: {0} already exists or "
                   "is invalid!").format(filepath)
        super(CuckooAPIFileExists, self).__init__(message)
#
# Call main if run as a script
#
if __name__ == '__main__':
    main()