ietf-clear
[Top] [All Lists]

[clear] pyCSV

2005-08-17 14:48:25
Following is the first draft of my script to check CSV (Certified Server
Validation) records, and it seems to work correctly on the examples in the
docstrings.  The script was written with Sendmail in mind, but it should
interface easily with just about any MTA, since the return values are pretty
much universal.

Suggestions are welcome, particularly anything I haven't thought of.

--
Dave

# pyCSV.py  8/16/05  David MacQuigg

import DNS  # from http://pydns.sourceforge.net
from DNS import DNSError  # catch-all for DNS errors

# Automatic nameserver discovery is left disabled.  When enabled, it fills
# DNS.defaults from /etc/resolv.conf or the Windows Registry:
##DNS.DiscoverNameServers()
# Put your local nameservers here, if DiscoverNameServers doesn't work.
DNS.defaults['server'] = ['216.183.68.110', '216.183.68.111']   ### site-specific

# Timeout passed to every DNS.Request made by csv(); this is the default
# if not overridden by the main module.  Presumably seconds -- TODO confirm.
DNS.timeout = 30

def csv(IP, helo):
    '''
    Run a CSV (Certified Server Validation) test on the HELO name.

    Looks up the SRV record at '_client._smtp.<helo>', checks that it
    authorizes the host to send mail, then checks that IP is one of the
    A-record addresses of the SRV target.

    Arguments:
      IP:    address of the connecting client, as a string.  It is
             compared verbatim against the A-record data.
      helo:  the name the client gave in its HELO/EHLO command.

    Returns ( action, SMTP_reply, headers )

      action: 'ACCEPT', 'REJECT', 'TEMPFAIL'

      SMTP_reply = ( SMTP_code, Xcode, explanation )
        SMTP_code: SMTP Reply Code per RFC-2821
        Xcode:  Enhanced Mail System Status Code per RFC-3463
                ('' when none is supplied)

      headers: a list.  On ACCEPT it holds one dict,
                 {'label': 'Authent:',
                  'text': '%s %s CSV PASS' % (IP, helo)}
               and it is empty for every REJECT and TEMPFAIL.

    DNS timeouts give TEMPFAIL (450); any other DNSError gives REJECT (550).

    Examples:

      csv('168.61.5.27', 'harry.mail-abuse.org')
      ('ACCEPT', (250, '', 'Sender CSV OK'),
       [{'text': '168.61.5.27 harry.mail-abuse.org CSV PASS',
         'label': 'Authent:'}])

      csv('192.168.0.64', 'harry.mail-abuse.org')
      ('REJECT', (550, '5.7.1',
       "'192.168.0.64' not authorized by 'harry.mail-abuse.org'"), [])

      csv(IP, 'yahoo.com')
      ('REJECT', (550, '5.7.1',
       "No SRV record for '_client._smtp.yahoo.com'."), [])
    '''
    ## Get an SRV record for the helo name:
    name = '_client._smtp.' + helo
    try:
        reqobj = DNS.Request(name, qtype='SRV', timeout=DNS.timeout)
        resp = reqobj.req()
    except DNSError as expln:
        exp = str(expln)
        if exp == 'Timeout':
            # Transient failure: ask the client to retry later.
            msg = ("Timeout getting SRV record for '%s'.\n" % name
                   + "Try again later.")
            return ('TEMPFAIL', (450, '', msg), [])
        # Any other resolver failure is a hard reject; the resolver's own
        # explanation goes first in the reply text.
        # NOTE(review): '?.?.?' is a placeholder, not a valid RFC-3463
        # code -- pick a real one before production use.
        msg = exp + "\nDNS error getting SRV record for '%s'" % name
        return ('REJECT', (550, '?.?.?', msg), [])

    ## Check for too few or too many SRV records:
    lr = len(resp.answers)
    if lr == 0:
        exp = "No SRV record for '%s'." % name   # or non-existent domain
        return ('REJECT', (550, '5.7.1', exp), [])
    if lr > 1:
        exp = "Found %s SRV records for '%s'. Should be 1." % (lr, name)
        return ('REJECT', (550, '5.7.1', exp), [])

    ## Extract the needed info from the response.  CSV reuses the SRV
    ## data fields as follows:
    ##   [0] priority: CSV version                              (not checked)
    ##   [1] weight:   authorization ( 1 = NO, 2 = YES )
    ##   [2] port:     subdomain authorization
    ##                 ( 0 = unknown, 1 = CSV required )        (not checked)
    ##   [3] target:   authorized hostname (ID)
    rad = resp.answers[0]['data']
    weight = rad[1]
    target = rad[3]

    if weight != 2:
        exp = "'%s' not authorized to send mail" % target
        return ('REJECT', (550, '5.7.1', exp), [])

    ## Check the A records for the authorized name:
    try:
        reqobj = DNS.Request(target, qtype='A', timeout=DNS.timeout)
        resp = reqobj.req()
    except DNSError as expln:
        exp = str(expln)
        if exp == 'Timeout':
            msg = ("Timeout getting A records for '%s'.\n" % target
                   + "Try again later.")
            return ('TEMPFAIL', (450, '', msg), [])
        # NOTE(review): '?.?.?' is a placeholder, not a valid RFC-3463
        # code -- pick a real one before production use.
        msg = exp + "\nDNSError getting A records for '%s'" % target
        return ('REJECT', (550, '?.?.?', msg), [])

    ## Make a list of the authorized IP addresses:
    aa = [ans['data'] for ans in resp.answers]

    ## Check incoming IP against the list:
    if IP in aa:
        header = {'label': 'Authent:',
                  'text': '%s %s CSV PASS' % (IP, helo)}
        return ('ACCEPT', (250, '', 'Sender CSV OK'), [header])

    SMTP_reply = (550, '5.7.1',
                  "'%s' not authorized by '%s'" % (IP, helo))
    return ('REJECT', SMTP_reply, [])

if __name__ == '__main__':
    # When executed as a script, run this module's docstring examples
    # through doctest with verbose output.
    import doctest
    import sys

    # Sample connection data, kept as module globals so the docstring
    # examples can refer to them by name (e.g. csv(IP, 'yahoo.com')).
    helo = 'harry.mail-abuse.org'
    IP = '168.61.5.27'

    doctest.testmod(sys.modules['__main__'], verbose=True)



*************************************************************     *
* David MacQuigg, PhD              * email:  dmq'at'gci-net.com   *  *
* IC Design Engineer               * phone:  USA 520-721-4583  *  *  *
* Analog Design Methodologies                                  *  *  *
*                                  * 9320 East Mikelyn Lane     * * *
* VRS Consulting, P.C.             * Tucson, Arizona 85710        *
*************************************************************     *


<Prev in Thread] Current Thread [Next in Thread>