728x90
개요
업무중에 단축 URL 이나 URL을 많이 봐야 하는데, 사람이 일일이 보는게 너무 힘들다고 느껴져서 만들어 볼까 하고 시작함. 단축 기준 접속시 최종으로 접속되는 URL이 필요하고, 해당 URL에 도메인, 접속 되는지에 대한 상태코드 정도 필요 하다 싶어서 해당 기준으로 잡고 시작함.
ShortURL_Redirect
비번 : cago
입력값은 txt 파일로 받아 출력 값으로는 입력 URL, 리다이렉트 URL, 리다이렉트 도메인, 리다이렉트 도메인 상태코드
리다이렉트 도메인 값은 입력 URL 과 리다이렉트 URL이 다를때만 나오게 된다.
실행 순서
1. input.txt 에 확인 할 URL 넣는다
2. ShortURL_Redirect_flow.exe 실행
3. CSV 파일 떨어지면 확인
2024/01/18 병렬처리 방식으로 바꿈
2024/01/19 테스트 500개 돌리면 메모리 99MB 정도 사용 쓰레드 수 2~3배정도 늘려도 될꺼 같음. 회사에서 동작안됨....
2024/01/21 회사에서 동작안되는거는 쓰레드가 멈춤 현상이 있음
2024/01/22 retrun 말고 raise 이용하니 쓰레드 멈추는거 해결 하긴함. 하지만 접속은 되는데 HTTPConnectionPool 에러 뜨는 현상 발견..
더보기
#-*- coding: utf-8 -*-
import requests
import csv
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 4.4.2; Nexus 4 Build/KOT49H) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.114 Mobile Safari/537.36'}
def get_final_redirected_url(url):
url_in = url
try:
with requests.Session() as session:
session.headers.update(headers)
response = session.get(url, allow_redirects=False, verify=False, timeout=10)
# 최종 리다이렉트된 URL 가져오기
while response.headers.get('Location'):
if("." in response.headers.get('Location')):
url = response.headers['Location']
response = session.get(url, allow_redirects=False,verify=False, timeout=10, )
else: return url
return url
except requests.exceptions.Timeout as errc:
domain_c = domain_cutting(url_in, url)
if("wsgadd.kt.com") in url:
state_code = "KT 스미싱"
Csv_Writer(url_in, url, domain_c, state_code)
raise
Csv_Writer(url_in, url, domain_c, "확인")
raise
except requests.RequestException as e:
return url
def domain_cutting(final_redirected_url:str, original_url:str):
final_redirected_url = final_redirected_url.replace('https://', '').replace('http://', '')
original_url = original_url.replace('https://', '').replace('http://', '')
final_redirected_url2 = final_redirected_url.split('/')
original_url2 = original_url.split('/')
if(final_redirected_url2[0] == original_url2[0]):
return None
else:
final_domain_url = final_redirected_url2[0]
return final_domain_url
def check_status_code(final_redirected_url):
try:
with requests.Session() as session:
session.headers.update(headers)
response = session.get(final_redirected_url)
response.raise_for_status() # 이 부분에서 상태 코드를 체크하여 에러가 있으면 예외 발생
return response.status_code
except requests.exceptions.HTTPError as errh:
return("HTTP error")
except requests.exceptions.ConnectionError as errc:
return("error Connecting")
except requests.exceptions.Timeout as errt:
return("Timeout error:")
except requests.exceptions.RequestException as err:
return ("Request error")
def Csv_Writer(original_url, final_redirected_url, final_domain, final_status_code):
with csv_lock:
with open(output_filename, 'a', newline='') as outfile:
csv_writer = csv.writer(outfile)
csv_writer.writerow([original_url, final_redirected_url, final_domain, final_status_code])
def process_url(original_url:str):
if "https://" in original_url:
pass
else:
original_url = original_url.replace("http://", "https://")
try:
final_redirected_url = get_final_redirected_url(original_url)
final_status_code = check_status_code(final_redirected_url)
final_domain = domain_cutting(final_redirected_url, original_url)
Csv_Writer(original_url, final_redirected_url, final_domain, final_status_code)
except Exception as e:
Csv_Writer(original_url, url, final_domain, final_status_code)
print(f"An error occurred for URL {original_url}: {e}")
raise
if __name__ == "__main__":
input_filename = "input.txt" # 입력 파일명을 적절히 수정
output_filename = "output.csv" # 출력 파일명을 적절히 수정
csv_lock = threading.Lock()
count_time = time.time()
with open(output_filename, 'w', newline='',encoding="utf-8") as outfile:
csv_writer = csv.writer(outfile)
csv_writer.writerow(['Original URL', 'Final Redirected URL', 'Final_domain', 'Fianl_status_code']) # CSV 헤더 작성
# 세션 객체 생성 및 헤더 설정
with open(input_filename, 'r', encoding="utf-8") as infile, ThreadPoolExecutor(max_workers=200) as executor:
futures = {executor.submit(process_url, line.strip()): line for line in infile}
for future in concurrent.futures.as_completed(futures):
try:
url = futures[future]
future.result()
except Exception as e:
print(f"An error occurred for URL {url}: {e}")
print(time.time() - count_time)
print(f"Processing completed. Output saved to {output_filename}")
728x90
'Basic > Python' 카테고리의 다른 글
pobfs to dex 이름 바꾸기 (0) | 2023.11.07 |
---|---|
[Python] 딕셔너리(Dictionary) 문제 풀어보기 (0) | 2021.06.28 |
[Python] 튜플 문제 풀어 보기 (0) | 2021.06.22 |
[Python] 리스트 문제 풀기 (0) | 2021.06.18 |
[Python] 문자열 연습 문제 풀기 (0) | 2021.06.02 |