summary | refs | log | tree | commit | diff
path: root/test/test_downloader_http.py
blob: 7504722810b4e706f6b1143c7a36208ee0478749 (plain)
    1 #!/usr/bin/env python
    2 # coding: utf-8
    3 from __future__ import unicode_literals
    4 
    5 # Allow direct execution
    6 import os
    7 import re
    8 import sys
    9 import unittest
   10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
   11 
   12 from test.helper import http_server_port, try_rm
   13 from youtube_dl import YoutubeDL
   14 from youtube_dl.compat import compat_http_server
   15 from youtube_dl.downloader.http import HttpFD
   16 from youtube_dl.utils import encodeFilename
   17 import threading
   18 
# Absolute path of the directory that contains this test module.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))


# Size in bytes of the payload the test server sends for a non-ranged request,
# and the size every downloaded file is expected to have.
TEST_SIZE = 10 * 1024
   23 
   24 
   25 class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
   26     def log_message(self, format, *args):
   27         pass
   28 
   29     def send_content_range(self, total=None):
   30         range_header = self.headers.get('Range')
   31         start = end = None
   32         if range_header:
   33             mobj = re.search(r'^bytes=(\d+)-(\d+)', range_header)
   34             if mobj:
   35                 start = int(mobj.group(1))
   36                 end = int(mobj.group(2))
   37         valid_range = start is not None and end is not None
   38         if valid_range:
   39             content_range = 'bytes %d-%d' % (start, end)
   40             if total:
   41                 content_range += '/%d' % total
   42             self.send_header('Content-Range', content_range)
   43         return (end - start + 1) if valid_range else total
   44 
   45     def serve(self, range=True, content_length=True):
   46         self.send_response(200)
   47         self.send_header('Content-Type', 'video/mp4')
   48         size = TEST_SIZE
   49         if range:
   50             size = self.send_content_range(TEST_SIZE)
   51         if content_length:
   52             self.send_header('Content-Length', size)
   53         self.end_headers()
   54         self.wfile.write(b'#' * size)
   55 
   56     def do_GET(self):
   57         if self.path == '/regular':
   58             self.serve()
   59         elif self.path == '/no-content-length':
   60             self.serve(content_length=False)
   61         elif self.path == '/no-range':
   62             self.serve(range=False)
   63         elif self.path == '/no-range-no-content-length':
   64             self.serve(range=False, content_length=False)
   65         else:
   66             assert False
   67 
   68 
   69 class FakeLogger(object):
   70     def debug(self, msg):
   71         pass
   72 
   73     def warning(self, msg):
   74         pass
   75 
   76     def error(self, msg):
   77         pass
   78 
   79 
   80 class TestHttpFD(unittest.TestCase):
   81     def setUp(self):
   82         self.httpd = compat_http_server.HTTPServer(
   83             ('127.0.0.1', 0), HTTPTestRequestHandler)
   84         self.port = http_server_port(self.httpd)
   85         self.server_thread = threading.Thread(target=self.httpd.serve_forever)
   86         self.server_thread.daemon = True
   87         self.server_thread.start()
   88 
   89     def download(self, params, ep):
   90         params['logger'] = FakeLogger()
   91         ydl = YoutubeDL(params)
   92         downloader = HttpFD(ydl, params)
   93         filename = 'testfile.mp4'
   94         try_rm(encodeFilename(filename))
   95         self.assertTrue(downloader.real_download(filename, {
   96             'url': 'http://127.0.0.1:%d/%s' % (self.port, ep),
   97         }))
   98         self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)
   99         try_rm(encodeFilename(filename))
  100 
  101     def download_all(self, params):
  102         for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
  103             self.download(params, ep)
  104 
  105     def test_regular(self):
  106         self.download_all({})
  107 
  108     def test_chunked(self):
  109         self.download_all({
  110             'http_chunk_size': 1000,
  111         })
  112 
  113 
# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()

Generated by cgit